1use std::{
5 any::Any,
6 collections::{HashMap, VecDeque},
7 env,
8 fmt::Debug,
9 iter,
10 ops::Range,
11 sync::Arc,
12 vec,
13};
14
15use crate::{
16 constants::{
17 STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
18 },
19 data::DictionaryDataBlock,
20 encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
21 format::{
22 ProtobufUtils21,
23 pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
24 },
25};
26use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
27use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
28use arrow_schema::{DataType, Field as ArrowField};
29use bytes::Bytes;
30use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
31use itertools::Itertools;
32use lance_arrow::DataTypeExt;
33use lance_arrow::deepcopy::deep_copy_nulls;
34use lance_core::{
35 cache::{CacheKey, Context, DeepSizeOf},
36 error::{Error, LanceOptionExt},
37 utils::bit::pad_bytes,
38};
39use log::trace;
40
41use crate::encodings::logical::primitive::miniblock::MiniBlockChunk;
42use crate::utils::bytepack::ByteUnpacker;
43use crate::{
44 compression::{
45 BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
46 },
47 data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
48 utils::bytepack::BytepackedIntegerEncoder,
49};
50use crate::{
51 compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
52 encodings::logical::primitive::fullzip::PerValueDataBlock,
53};
54use crate::{
55 encodings::logical::primitive::miniblock::MiniBlockCompressed,
56 statistics::{ComputeStat, GetStat, Stat},
57};
58use crate::{
59 repdef::{
60 CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
61 RepDefSlicer, SerializedRepDefs, StructuralPagePlan, build_control_word_iterator,
62 },
63 utils::accumulation::AccumulationQueue,
64};
65use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
66
67use crate::constants::{
68 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
69 DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
70 DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
71 DICT_VALUES_COMPRESSION_META_KEY,
72};
73use crate::version::LanceFileVersion;
74use crate::{
75 EncodingsIo,
76 buffer::LanceBuffer,
77 data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
78 decoder::{
79 ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
80 MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
81 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
82 StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
83 },
84 encoder::{
85 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
86 },
87 repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
88};
89
// Submodules of the primitive encoding; the imports above reach several of them
// through `crate::encodings::logical::primitive::{blob, fullzip, miniblock}`.
pub mod blob;
pub mod constant;
pub mod dict;
pub mod fullzip;
pub mod miniblock;
95
// Filler byte value. NOTE(review): the use site is outside this chunk — confirm what it pads.
const FILL_BYTE: u8 = 0xFE;
// Defaults for dictionary-encoding tuning.
// NOTE(review): presumably these are the fallbacks for the `DICT_*` metadata keys and
// environment variables imported above; the consuming code is not visible here — confirm.
const DEFAULT_DICT_DIVISOR: u64 = 2;
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
101
/// A unit of scheduled work returned by `StructuralPageScheduler::schedule_ranges`.
struct PageLoadTask {
    /// Future that resolves to the decoder for the scheduled rows.
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    /// Number of rows covered by this task (the schedulers in this file set it to the
    /// total length of the scheduled ranges).
    num_rows: u64,
}
106
/// Scheduler for a single page of structurally encoded data.
///
/// A scheduler is prepared either by `initialize` (which may perform I/O and returns
/// state that can be cached) or by `load` (which installs previously cached state).
/// Afterwards `schedule_ranges` can be used to request row ranges.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    /// Loads whatever page-level state the scheduler needs and returns it as
    /// cacheable data.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    /// Installs state that was previously produced by `initialize`.
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    /// Schedules the given row ranges and returns the tasks that will yield decoders
    /// for them.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}
131
/// Metadata describing one mini-block chunk.
#[derive(Debug)]
struct ChunkMeta {
    /// Number of values in the chunk; `MiniBlockRepIndex::default_from_chunks` uses it
    /// as the chunk's row count.
    num_values: u64,
    /// NOTE(review): presumably the chunk's size in bytes — not used in this chunk, confirm.
    chunk_size_bytes: u64,
    /// NOTE(review): presumably the chunk's byte offset within the page data — confirm.
    offset_bytes: u64,
}
139
/// A fully decoded mini-block chunk, as produced by
/// `DecodeMiniBlockTask::decode_miniblock_chunk`.
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    /// Decoded repetition levels, if the chunk has any.
    rep: Option<ScalarBuffer<u16>>,
    /// Decoded definition levels, if the chunk has any.
    def: Option<ScalarBuffer<u16>>,
    /// Decompressed values of the chunk.
    values: DataBlock,
}
147
/// Decode task for a batch of rows drained from a mini-block page.
#[derive(Debug)]
struct DecodeMiniBlockTask {
    /// Decompressor for repetition levels; when absent the chunk header carries no
    /// repetition size field.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels; when absent the chunk header carries no
    /// definition size field.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the value buffers of each chunk.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// When present, the decoded values are treated as indices into this dictionary.
    dictionary_data: Option<Arc<DataBlock>>,
    /// Interpretation of the definition levels, handed to the `RepDefUnraveler`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Number of value buffers stored in each chunk.
    num_buffers: u64,
    /// Definition levels at or below this value count as visible items.
    max_visible_level: u16,
    /// The chunks to decode, each paired with the instructions describing which rows
    /// to take from it.
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    /// Selects 4-byte (true) or 2-byte (false) buffer-size fields in the chunk header.
    has_large_chunk: bool,
}
167
168impl DecodeMiniBlockTask {
169 fn decode_levels(
170 rep_decompressor: &dyn BlockDecompressor,
171 levels: LanceBuffer,
172 num_levels: u16,
173 ) -> Result<ScalarBuffer<u16>> {
174 let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
175 let rep = rep.as_fixed_width().unwrap();
176 debug_assert_eq!(rep.num_values, num_levels as u64);
177 debug_assert_eq!(rep.bits_per_value, 16);
178 Ok(rep.data.borrow_to_typed_slice::<u16>())
179 }
180
181 fn extend_levels(
188 range: Range<u64>,
189 levels: &mut Option<LevelBuffer>,
190 level_buf: &Option<impl AsRef<[u16]>>,
191 dest_offset: usize,
192 ) {
193 if let Some(level_buf) = level_buf {
194 if levels.is_none() {
195 let mut new_levels_vec =
198 LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
199 new_levels_vec.extend(iter::repeat_n(0, dest_offset));
200 *levels = Some(new_levels_vec);
201 }
202 levels.as_mut().unwrap().extend(
203 level_buf.as_ref()[range.start as usize..range.end as usize]
204 .iter()
205 .copied(),
206 );
207 } else if let Some(levels) = levels {
208 let num_values = (range.end - range.start) as usize;
209 levels.extend(iter::repeat_n(0, num_values));
212 }
213 }
214
    /// Maps a range of rows within a chunk to the matching range of items and range of
    /// levels.
    ///
    /// A row starts at every level whose repetition value equals `max_rep`. Levels that
    /// come before the first such position form the chunk's "preamble". An item is
    /// visible when its definition level is `<= max_visible_def`; levels with a larger
    /// definition level occupy a level slot but no item slot.
    ///
    /// * `range` - rows to map, counted from the first row that starts in this chunk
    /// * `rep` / `def` - the chunk's repetition / definition levels, if any
    /// * `total_items` - number of items in the chunk
    /// * `preamble_action` - whether the preamble is skipped, taken, or absent
    ///
    /// Returns `(item_range, level_range)`. Without repetition levels rows, items and
    /// levels coincide, so `range` is returned for both.
    ///
    /// # Panics
    ///
    /// Panics if the chunk contains no row start and `preamble_action` is not `Take`,
    /// or if `range.start > range.end`.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // Number of visible items that belong to the preamble.
            let mut items_in_preamble = 0_u64;
            // Level index of the first row start in this chunk (0 if there is no preamble).
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            // Only visible levels contribute an item to the preamble.
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        // Without definition levels every level is one item, so the
                        // preamble item count equals the preamble level count.
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        // No row starts in this chunk: the whole chunk is preamble.
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    // From here on `rep` is indexed relative to the first row start.
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                // Empty row range: only the preamble is requested.
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                // Align the definition levels with the (already sliced) repetition levels.
                let def = &def.as_ref()[first_row_start as usize..];

                // Invisible levels seen before the start of the requested range.
                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    // `skip(1)` happens before `enumerate`, hence the `idx + 1` below.
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                // Account for the row that begins at `new_levels_start`.
                rows_seen += 1;

                // `u64::MAX` marks "end not found yet".
                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                // Invisible levels seen inside the requested range.
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        // The range ends where the row after the last requested row starts.
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                if new_end == u64::MAX {
                    // The range runs to the end of the chunk.
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                // The values so far are relative to the first row start; shift them back
                // to chunk coordinates.
                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    // Taking the preamble keeps the start at 0 and only moves the end.
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // No definition levels: every level is an item, so the item range and
                // the level range are identical.
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                // Only search for the end when `range.end < total_items`; otherwise the
                // range keeps running to the end of the levels.
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                // Shift back to chunk coordinates (see the branch above).
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition: rows, items and levels all share the same indices.
            (range.clone(), range)
        }
    }
436
437 fn read_buffer_sizes<const LARGE: bool>(
439 buf: &[u8],
440 offset: &mut usize,
441 num_buffers: u64,
442 ) -> Vec<u32> {
443 let read_size = if LARGE { 4 } else { 2 };
444 (0..num_buffers)
445 .map(|_| {
446 let bytes = &buf[*offset..*offset + read_size];
447 let size = if LARGE {
448 u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
449 } else {
450 u16::from_le_bytes([bytes[0], bytes[1]]) as u32
452 };
453 *offset += read_size;
454 size
455 })
456 .collect()
457 }
458
459 fn decode_miniblock_chunk(
461 &self,
462 buf: &LanceBuffer,
463 items_in_chunk: u64,
464 ) -> Result<DecodedMiniBlockChunk> {
465 let mut offset = 0;
466 let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
467 offset += 2;
468
469 let rep_size = if self.rep_decompressor.is_some() {
470 let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
471 offset += 2;
472 Some(rep_size)
473 } else {
474 None
475 };
476 let def_size = if self.def_decompressor.is_some() {
477 let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
478 offset += 2;
479 Some(def_size)
480 } else {
481 None
482 };
483
484 let buffer_sizes = if self.has_large_chunk {
485 Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
486 } else {
487 Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
488 };
489
490 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
491
492 let rep = rep_size.map(|rep_size| {
493 let rep = buf.slice_with_length(offset, rep_size as usize);
494 offset += rep_size as usize;
495 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
496 rep
497 });
498
499 let def = def_size.map(|def_size| {
500 let def = buf.slice_with_length(offset, def_size as usize);
501 offset += def_size as usize;
502 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
503 def
504 });
505
506 let buffers = buffer_sizes
507 .into_iter()
508 .map(|buf_size| {
509 let buf = buf.slice_with_length(offset, buf_size as usize);
510 offset += buf_size as usize;
511 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
512 buf
513 })
514 .collect::<Vec<_>>();
515
516 let values = self
517 .value_decompressor
518 .decompress(buffers, items_in_chunk)?;
519
520 let rep = rep
521 .map(|rep| {
522 Self::decode_levels(
523 self.rep_decompressor.as_ref().unwrap().as_ref(),
524 rep,
525 num_levels,
526 )
527 })
528 .transpose()?;
529 let def = def
530 .map(|def| {
531 Self::decode_levels(
532 self.def_decompressor.as_ref().unwrap().as_ref(),
533 def,
534 num_levels,
535 )
536 })
537 .transpose()?;
538
539 Ok(DecodedMiniBlockChunk { rep, def, values })
540 }
541}
542
543impl DecodePageTask for DecodeMiniBlockTask {
544 fn decode(self: Box<Self>) -> Result<DecodedPage> {
545 let mut repbuf: Option<LevelBuffer> = None;
547 let mut defbuf: Option<LevelBuffer> = None;
548
549 let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
550
551 let estimated_size_bytes = self
553 .instructions
554 .iter()
555 .map(|(_, chunk)| chunk.data.len())
556 .sum::<usize>()
557 * 2;
558 let mut data_builder =
559 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
560
561 let mut level_offset = 0;
563
564 let needs_caching: Vec<bool> = self
566 .instructions
567 .windows(2)
568 .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
569 .chain(std::iter::once(false)) .collect();
571
572 let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
574
575 for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
577 let should_cache_this_chunk = needs_caching[idx];
578
579 let decoded_chunk = match &chunk_cache {
580 Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
581 cached_chunk.clone()
583 }
584 _ => {
585 let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
587
588 if should_cache_this_chunk {
590 chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
591 }
592 decoded
593 }
594 };
595
596 let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
597
598 let row_range_start =
600 instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
601 let row_range_end = row_range_start + instructions.rows_to_take;
602
603 let (item_range, level_range) = Self::map_range(
605 row_range_start..row_range_end,
606 rep.as_ref(),
607 def.as_ref(),
608 max_rep,
609 self.max_visible_level,
610 chunk.items_in_chunk,
611 instructions.preamble_action,
612 );
613 if item_range.end - item_range.start > chunk.items_in_chunk {
614 return Err(lance_core::Error::internal(format!(
615 "Item range {:?} is greater than chunk items in chunk {:?}",
616 item_range, chunk.items_in_chunk
617 )));
618 }
619
620 Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
622 Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
623 level_offset += (level_range.end - level_range.start) as usize;
624 data_builder.append(&values, item_range);
625 }
626
627 let mut data = data_builder.finish();
628
629 let unraveler =
630 RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
631
632 if let Some(dictionary) = &self.dictionary_data {
633 let DataBlock::FixedWidth(indices) = data else {
635 return Err(lance_core::Error::internal(format!(
636 "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
637 data
638 )));
639 };
640 data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
641 indices,
642 dictionary.as_ref().clone(),
643 ));
644 }
645
646 Ok(DecodedPage {
647 data,
648 repdef: unraveler,
649 })
650 }
651}
652
/// A mini-block chunk whose bytes have been loaded and which is waiting to be decoded.
#[derive(Debug)]
struct LoadedChunk {
    /// The chunk's raw (still encoded) bytes.
    data: LanceBuffer,
    /// Number of items stored in the chunk.
    items_in_chunk: u64,
    /// NOTE(review): presumably the byte range the chunk was read from — not used in
    /// this chunk, confirm.
    byte_range: Range<u64>,
    /// Index of the chunk; used to match chunks to drain instructions and to detect
    /// consecutive instructions that share a chunk.
    chunk_idx: usize,
}
662
663impl Clone for LoadedChunk {
664 fn clone(&self) -> Self {
665 Self {
666 data: self.data.clone(),
668 items_in_chunk: self.items_in_chunk,
669 byte_range: self.byte_range.clone(),
670 chunk_idx: self.chunk_idx,
671 }
672 }
673}
674
/// Page decoder for mini-block pages. It holds the loaded chunks and the instructions
/// describing which rows to take from them, and hands out `DecodeMiniBlockTask`s.
#[derive(Debug)]
struct MiniBlockDecoder {
    /// Passed on to every decode task (repetition levels).
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Passed on to every decode task (definition levels).
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Passed on to every decode task (values).
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Interpretation of the definition levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Loaded chunks; chunks are popped from the front once the instructions have moved
    /// past them.
    loaded_chunks: VecDeque<LoadedChunk>,
    /// Instructions not yet fully drained, front first.
    instructions: VecDeque<ChunkInstructions>,
    /// Skip state carried over between calls to `drain`.
    offset_in_current_chunk: u64,
    /// Value reported by `num_rows`.
    num_rows: u64,
    /// Number of value buffers per chunk.
    num_buffers: u64,
    /// Optional dictionary handed to decode tasks.
    dictionary: Option<Arc<DataBlock>>,
    /// Whether chunk headers use 4-byte buffer sizes.
    has_large_chunk: bool,
}
691
692impl StructuralPageDecoder for MiniBlockDecoder {
695 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
696 let mut items_desired = num_rows;
697 let mut need_preamble = false;
698 let mut skip_in_chunk = self.offset_in_current_chunk;
699 let mut drain_instructions = Vec::new();
700 while items_desired > 0 || need_preamble {
701 let (instructions, consumed) = self
702 .instructions
703 .front()
704 .unwrap()
705 .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
706
707 while self.loaded_chunks.front().unwrap().chunk_idx
708 != instructions.chunk_instructions.chunk_idx
709 {
710 self.loaded_chunks.pop_front();
711 }
712 drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
713 if consumed {
714 self.instructions.pop_front();
715 }
716 }
717 self.offset_in_current_chunk = skip_in_chunk;
720
721 let max_visible_level = self
722 .def_meaning
723 .iter()
724 .take_while(|l| !l.is_list())
725 .map(|l| l.num_def_levels())
726 .sum::<u16>();
727
728 Ok(Box::new(DecodeMiniBlockTask {
729 instructions: drain_instructions,
730 def_decompressor: self.def_decompressor.clone(),
731 rep_decompressor: self.rep_decompressor.clone(),
732 value_decompressor: self.value_decompressor.clone(),
733 dictionary_data: self.dictionary.clone(),
734 def_meaning: self.def_meaning.clone(),
735 num_buffers: self.num_buffers,
736 max_visible_level,
737 has_large_chunk: self.has_large_chunk,
738 }))
739 }
740
741 fn num_rows(&self) -> u64 {
742 self.num_rows
743 }
744}
745
/// Cacheable state of a complex all-null page: its repetition and definition levels.
#[derive(Debug)]
struct CachedComplexAllNullState {
    /// Repetition levels, or `None` when the page's repetition buffer is empty.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels, or `None` when the page's definition buffer is empty.
    def: Option<ScalarBuffer<u16>>,
}
751
752impl DeepSizeOf for CachedComplexAllNullState {
753 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
754 self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
755 + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
756 }
757}
758
impl CachedPageData for CachedComplexAllNullState {
    /// Upcasts to `Arc<dyn Any>` so that the concrete type can be recovered with
    /// `downcast` (see `ComplexAllNullScheduler::load`).
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
764
/// Scheduler for all-null pages that carry repetition and/or definition levels.
///
/// The levels are read once (in `initialize`) and every scheduled range is then served
/// from memory.
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    /// `(position, size)` of the page buffers: index 0 holds the repetition levels and
    /// index 1 the definition levels. A size of 0 means the buffer is absent.
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// Interpretation of the definition levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Levels of the page; `None` until `initialize` or `load` has run.
    repdef: Option<Arc<CachedComplexAllNullState>>,
    /// Number of list layers in `def_meaning`.
    max_rep: u16,
    /// Sum of the definition levels of the layers ahead of the first list layer.
    max_visible_level: u16,
    /// Decompressor for the repetition levels; when absent the buffer is read as raw u16s.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the definition levels; when absent the buffer is read as raw u16s.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Expected number of repetition levels after decompression.
    num_rep_values: u64,
    /// Expected number of definition levels after decompression.
    num_def_values: u64,
}
786
787impl ComplexAllNullScheduler {
788 pub fn new(
789 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
790 def_meaning: Arc<[DefinitionInterpretation]>,
791 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
792 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
793 num_rep_values: u64,
794 num_def_values: u64,
795 ) -> Self {
796 let max_rep = def_meaning.iter().filter(|l| l.is_list()).count() as u16;
797 let max_visible_level = def_meaning
798 .iter()
799 .take_while(|l| !l.is_list())
800 .map(|l| l.num_def_levels())
801 .sum::<u16>();
802 Self {
803 buffer_offsets_and_sizes,
804 def_meaning,
805 repdef: None,
806 max_rep,
807 max_visible_level,
808 rep_decompressor,
809 def_decompressor,
810 num_rep_values,
811 num_def_values,
812 }
813 }
814}
815
816impl StructuralPageScheduler for ComplexAllNullScheduler {
817 fn initialize<'a>(
818 &'a mut self,
819 io: &Arc<dyn EncodingsIo>,
820 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
821 let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
823 let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
824 let has_rep = rep_size > 0;
825 let has_def = def_size > 0;
826
827 let mut reads = Vec::with_capacity(2);
828 if has_rep {
829 reads.push(rep_pos..rep_pos + rep_size);
830 }
831 if has_def {
832 reads.push(def_pos..def_pos + def_size);
833 }
834
835 let data = io.submit_request(reads, 0);
836 let rep_decompressor = self.rep_decompressor.clone();
837 let def_decompressor = self.def_decompressor.clone();
838 let num_rep_values = self.num_rep_values;
839 let num_def_values = self.num_def_values;
840
841 async move {
842 let data = data.await?;
843 let mut data_iter = data.into_iter();
844
845 let decompress_levels = |compressed_bytes: Bytes,
846 decompressor: &Arc<dyn BlockDecompressor>,
847 num_values: u64,
848 level_type: &str|
849 -> Result<ScalarBuffer<u16>> {
850 let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
851 let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
852 match decompressed {
853 DataBlock::FixedWidth(block) => {
854 if block.num_values != num_values {
855 return Err(Error::invalid_input_source(format!(
856 "Unexpected {} level count after decompression: expected {}, got {}",
857 level_type, num_values, block.num_values
858 )
859 .into()));
860 }
861 if block.bits_per_value != 16 {
862 return Err(Error::invalid_input_source(format!(
863 "Unexpected {} level bit width after decompression: expected 16, got {}",
864 level_type, block.bits_per_value
865 )
866 .into()));
867 }
868 Ok(block.data.borrow_to_typed_slice::<u16>())
869 }
870 _ => Err(Error::invalid_input_source(format!(
871 "Expected fixed-width data block for {} levels",
872 level_type
873 )
874 .into())),
875 }
876 };
877
878 let rep = if has_rep {
879 let rep = data_iter.next().unwrap();
880 if let Some(rep_decompressor) = rep_decompressor.as_ref() {
881 Some(decompress_levels(
882 rep,
883 rep_decompressor,
884 num_rep_values,
885 "repetition",
886 )?)
887 } else {
888 let rep = LanceBuffer::from_bytes(rep, 2);
889 let rep = rep.borrow_to_typed_slice::<u16>();
890 Some(rep)
891 }
892 } else {
893 None
894 };
895
896 let def = if has_def {
897 let def = data_iter.next().unwrap();
898 if let Some(def_decompressor) = def_decompressor.as_ref() {
899 Some(decompress_levels(
900 def,
901 def_decompressor,
902 num_def_values,
903 "definition",
904 )?)
905 } else {
906 let def = LanceBuffer::from_bytes(def, 2);
907 let def = def.borrow_to_typed_slice::<u16>();
908 Some(def)
909 }
910 } else {
911 None
912 };
913
914 let repdef = Arc::new(CachedComplexAllNullState { rep, def });
915
916 self.repdef = Some(repdef.clone());
917
918 Ok(repdef as Arc<dyn CachedPageData>)
919 }
920 .boxed()
921 }
922
923 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
924 self.repdef = Some(
925 data.clone()
926 .as_arc_any()
927 .downcast::<CachedComplexAllNullState>()
928 .unwrap(),
929 );
930 }
931
932 fn schedule_ranges(
933 &self,
934 ranges: &[Range<u64>],
935 _io: &Arc<dyn EncodingsIo>,
936 ) -> Result<Vec<PageLoadTask>> {
937 let ranges = VecDeque::from_iter(ranges.iter().cloned());
938 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
939 let decoder = Box::new(ComplexAllNullPageDecoder {
940 ranges,
941 rep: self.repdef.as_ref().unwrap().rep.clone(),
942 def: self.repdef.as_ref().unwrap().def.clone(),
943 num_rows,
944 def_meaning: self.def_meaning.clone(),
945 max_rep: self.max_rep,
946 max_visible_level: self.max_visible_level,
947 cursor_row: 0,
948 cursor_level: 0,
949 }) as Box<dyn StructuralPageDecoder>;
950 let page_load_task = PageLoadTask {
951 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
952 num_rows,
953 };
954 Ok(vec![page_load_task])
955 }
956}
957
/// Page decoder for complex all-null pages. It walks the repetition levels row by row
/// to work out which levels belong to the requested rows.
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    /// Row ranges that have not been drained yet, front first.
    ranges: VecDeque<Range<u64>>,
    /// Repetition levels of the whole page, if any.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels of the whole page, if any.
    def: Option<ScalarBuffer<u16>>,
    /// Value reported by `num_rows`.
    num_rows: u64,
    /// Interpretation of the definition levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Repetition level that marks the start of a row.
    max_rep: u16,
    /// Definition levels at or below this value count as visible items.
    max_visible_level: u16,
    /// Number of rows consumed so far, i.e. the index of the next row.
    cursor_row: u64,
    /// Index into the level buffers at which the next row starts.
    cursor_level: usize,
}
970
971impl ComplexAllNullPageDecoder {
972 fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
973 let mut rows_desired = num_rows;
974 let mut ranges = Vec::with_capacity(self.ranges.len());
975 while rows_desired > 0 {
976 let front = self.ranges.front_mut().unwrap();
977 let avail = front.end - front.start;
978 if avail > rows_desired {
979 ranges.push(front.start..front.start + rows_desired);
980 front.start += rows_desired;
981 rows_desired = 0;
982 } else {
983 ranges.push(self.ranges.pop_front().unwrap());
984 rows_desired -= avail;
985 }
986 }
987 ranges
988 }
989
990 fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
991 let start = self.cursor_level;
992 let end = if let Some(rep) = &self.rep {
993 if start >= rep.len() {
994 return Err(Error::internal(
995 "Invalid complex all-null layout: repetition buffer too short",
996 ));
997 }
998 if rep[start] != self.max_rep {
999 return Err(Error::internal(
1000 "Invalid complex all-null layout: row did not start at max repetition level",
1001 ));
1002 }
1003 let mut end = start + 1;
1004 while end < rep.len() && rep[end] != self.max_rep {
1005 end += 1;
1006 }
1007 end
1008 } else {
1009 start + 1
1010 };
1011
1012 let visible = if let Some(def) = &self.def {
1013 if end > def.len() {
1014 return Err(Error::internal(
1015 "Invalid complex all-null layout: definition buffer too short",
1016 ));
1017 }
1018 def[start..end]
1019 .iter()
1020 .filter(|d| **d <= self.max_visible_level)
1021 .count() as u64
1022 } else {
1023 (end - start) as u64
1024 };
1025
1026 self.cursor_level = end;
1027 self.cursor_row += 1;
1028 Ok((start..end, visible))
1029 }
1030
1031 fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
1032 while self.cursor_row < target_row {
1033 self.take_row()?;
1034 }
1035 Ok(())
1036 }
1037}
1038
1039impl StructuralPageDecoder for ComplexAllNullPageDecoder {
1040 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1041 let drained_ranges = self.drain_ranges(num_rows);
1042 let mut level_slices: Vec<Range<usize>> = Vec::new();
1043 let mut visible_items_total = 0;
1044
1045 for range in drained_ranges {
1046 self.skip_to_row(range.start)?;
1047 for _ in range.start..range.end {
1048 let (level_range, visible) = self.take_row()?;
1049 visible_items_total += visible;
1050 if let Some(last) = level_slices.last_mut()
1051 && last.end == level_range.start
1052 {
1053 last.end = level_range.end;
1054 continue;
1055 }
1056 level_slices.push(level_range);
1057 }
1058 }
1059
1060 Ok(Box::new(DecodeComplexAllNullTask {
1061 level_slices,
1062 visible_items_total,
1063 rep: self.rep.clone(),
1064 def: self.def.clone(),
1065 def_meaning: self.def_meaning.clone(),
1066 max_visible_level: self.max_visible_level,
1067 }))
1068 }
1069
1070 fn num_rows(&self) -> u64 {
1071 self.num_rows
1072 }
1073}
1074
/// Decode task for rows drained from a complex all-null page.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    /// Ranges of the level buffers that belong to the drained rows.
    level_slices: Vec<Range<usize>>,
    /// Number of visible items counted while draining; used as the value count when
    /// there are no definition levels.
    visible_items_total: u64,
    /// Repetition levels of the whole page, if any.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels of the whole page, if any.
    def: Option<ScalarBuffer<u16>>,
    /// Interpretation of the definition levels.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Definition levels at or below this value count as visible items.
    max_visible_level: u16,
}
1086
1087impl DecodeComplexAllNullTask {
1088 fn decode_level(&self, levels: &Option<ScalarBuffer<u16>>) -> Option<Vec<u16>> {
1089 levels.as_ref().map(|levels| {
1090 let num_levels = self
1091 .level_slices
1092 .iter()
1093 .map(|range| range.end - range.start)
1094 .sum();
1095 let mut referenced_levels = Vec::with_capacity(num_levels);
1096 for range in &self.level_slices {
1097 referenced_levels.extend(levels[range.start..range.end].iter().copied());
1098 }
1099 referenced_levels
1100 })
1101 }
1102}
1103
1104impl DecodePageTask for DecodeComplexAllNullTask {
1105 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1106 let rep = self.decode_level(&self.rep);
1107 let def = self.decode_level(&self.def);
1108
1109 let num_values = if let Some(def) = &def {
1113 def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
1114 } else {
1115 self.visible_items_total
1116 };
1117
1118 let data = DataBlock::AllNull(AllNullDataBlock { num_values });
1119 let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
1120 Ok(DecodedPage {
1121 data,
1122 repdef: unraveler,
1123 })
1124 }
1125}
1126
/// Scheduler for all-null pages that need no repetition or definition buffers. It
/// performs no I/O and has no state to cache.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}
1133
1134impl StructuralPageScheduler for SimpleAllNullScheduler {
1135 fn initialize<'a>(
1136 &'a mut self,
1137 _io: &Arc<dyn EncodingsIo>,
1138 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1139 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
1140 }
1141
1142 fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
1143
1144 fn schedule_ranges(
1145 &self,
1146 ranges: &[Range<u64>],
1147 _io: &Arc<dyn EncodingsIo>,
1148 ) -> Result<Vec<PageLoadTask>> {
1149 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
1150 let decoder =
1151 Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
1152 let page_load_task = PageLoadTask {
1153 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
1154 num_rows,
1155 };
1156 Ok(vec![page_load_task])
1157 }
1158}
1159
/// Decode task for a simple all-null page.
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    /// Number of null values to produce.
    num_values: u64,
}
1166impl DecodePageTask for SimpleAllNullDecodePageTask {
1167 fn decode(self: Box<Self>) -> Result<DecodedPage> {
1168 let unraveler = RepDefUnraveler::new(
1169 None,
1170 Some(vec![1; self.num_values as usize]),
1171 Arc::new([DefinitionInterpretation::NullableItem]),
1172 self.num_values,
1173 );
1174 Ok(DecodedPage {
1175 data: DataBlock::AllNull(AllNullDataBlock {
1176 num_values: self.num_values,
1177 }),
1178 repdef: unraveler,
1179 })
1180 }
1181}
1182
/// Page decoder for simple all-null pages.
#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    /// Value reported by `num_rows`.
    num_rows: u64,
}
1187
1188impl StructuralPageDecoder for SimpleAllNullPageDecoder {
1189 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
1190 Ok(Box::new(SimpleAllNullDecodePageTask {
1191 num_values: num_rows,
1192 }))
1193 }
1194
1195 fn num_rows(&self) -> u64 {
1196 self.num_rows
1197 }
1198}
1199
/// Description of the dictionary attached to a mini-block page.
///
/// NOTE(review): the code that reads these fields is outside this chunk; the field
/// notes below are derived from the names and types only — confirm against the scheduler.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    /// Decompressor for the dictionary buffer.
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    /// Presumably the `(position, size)` of the dictionary buffer.
    dictionary_buf_position_and_size: (u64, u64),
    /// Presumably the alignment of the dictionary data.
    dictionary_data_alignment: u64,
    /// Presumably the number of entries in the dictionary.
    num_dictionary_items: u64,
}
1208
/// Repetition index entry for one mini-block chunk.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    /// Running total of `starts_including_trailer` over all preceding chunks.
    first_row: u64,
    /// Number of rows that start in this chunk. When decoded from bytes this is the
    /// stored count plus one if the chunk has a trailer, minus one if it has a preamble.
    starts_including_trailer: u64,
    /// True when the previous chunk ended with a trailer, i.e. this chunk begins in the
    /// middle of a row.
    has_preamble: bool,
    /// True when the chunk's stored partial count is non-zero.
    has_trailer: bool,
}
1224
impl DeepSizeOf for MiniBlockRepIndexBlock {
    /// The block only has inline fields (integers and booleans), so it owns no heap
    /// memory.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}
1230
/// Repetition index of a mini-block page: one entry per chunk, in chunk order.
#[derive(Debug)]
struct MiniBlockRepIndex {
    /// Per-chunk entries.
    blocks: Vec<MiniBlockRepIndexBlock>,
}
1239
impl DeepSizeOf for MiniBlockRepIndex {
    /// Delegates to the size accounting of the `blocks` vector.
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}
1245
1246impl MiniBlockRepIndex {
1247 pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
1252 let mut blocks = Vec::with_capacity(chunks.len());
1253 let mut offset: u64 = 0;
1254
1255 for c in chunks {
1256 blocks.push(MiniBlockRepIndexBlock {
1257 first_row: offset,
1258 starts_including_trailer: c.num_values,
1259 has_preamble: false,
1260 has_trailer: false,
1261 });
1262
1263 offset += c.num_values;
1264 }
1265
1266 Self { blocks }
1267 }
1268
1269 pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
1275 let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
1277 let u64_slice = buffer.borrow_to_typed_slice::<u64>();
1278 let n = u64_slice.len() / stride;
1279
1280 let mut blocks = Vec::with_capacity(n);
1281 let mut chunk_has_preamble = false;
1282 let mut offset: u64 = 0;
1283
1284 for i in 0..n {
1286 let base_idx = i * stride;
1287 let ends = u64_slice[base_idx];
1288 let partial = u64_slice[base_idx + 1];
1289
1290 let has_trailer = partial > 0;
1291 let starts_including_trailer =
1293 ends + (has_trailer as u64) - (chunk_has_preamble as u64);
1294
1295 blocks.push(MiniBlockRepIndexBlock {
1296 first_row: offset,
1297 starts_including_trailer,
1298 has_preamble: chunk_has_preamble,
1299 has_trailer,
1300 });
1301
1302 chunk_has_preamble = has_trailer;
1303 offset += starts_including_trailer;
1304 }
1305
1306 Self { blocks }
1307 }
1308}
1309
/// Per-page state produced by `MiniBlockScheduler::initialize` and returned as cacheable
/// page data, so that a later `load` can restore it without repeating the I/O.
#[derive(Debug)]
struct MiniBlockCacheableState {
    /// Size and location of every chunk, in chunk order.
    chunk_meta: Vec<ChunkMeta>,
    /// Repetition index: decoded from the page when present, otherwise derived from
    /// `chunk_meta`.
    rep_index: MiniBlockRepIndex,
    /// Decompressed dictionary, present only when the layout declares one.
    dictionary: Option<Arc<DataBlock>>,
}
1320
1321impl DeepSizeOf for MiniBlockCacheableState {
1322 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1323 self.rep_index.deep_size_of_children(context)
1324 + self
1325 .dictionary
1326 .as_ref()
1327 .map(|dict| dict.data_size() as usize)
1328 .unwrap_or(0)
1329 }
1330}
1331
impl CachedPageData for MiniBlockCacheableState {
    /// Upcasts to `Arc<dyn Any>` so that `load` can downcast back to the concrete type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
1337
/// Scheduler for a page stored with the mini-block layout.
///
/// `initialize` (or `load`) must populate `page_meta` before `schedule_ranges` is called;
/// `schedule_ranges` unwraps it.
#[derive(Debug)]
pub struct MiniBlockScheduler {
    /// `(position, size)` of each page buffer. Index 0 is the chunk metadata, index 1 the
    /// value data, index 2 the dictionary (when present), and the last entry is the
    /// repetition index (when `repetition_index_depth > 0`).
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    /// I/O priority used when fetching chunk data.
    priority: u64,
    /// Total number of items in the page; used to size the final chunk.
    items_in_page: u64,
    /// Depth of the stored repetition index; 0 means no repetition index buffer.
    repetition_index_depth: u16,
    /// Copied from the layout's `num_buffers` and forwarded to the decoder.
    num_buffers: u64,
    /// Decompressor for repetition levels, when the layout has rep compression.
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for definition levels, when the layout has def compression.
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    /// Decompressor for the value data.
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    /// Interpretation of each layer in the layout.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Dictionary location and decompressor, when the layout declares a dictionary.
    dictionary: Option<MiniBlockSchedulerDictionary>,
    /// Page metadata; `None` until `initialize` or `load` has run.
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    /// Selects 32-bit (true) or 16-bit (false) chunk metadata words.
    has_large_chunk: bool,
}
1381
1382impl MiniBlockScheduler {
1383 fn try_new(
1384 buffer_offsets_and_sizes: &[(u64, u64)],
1385 priority: u64,
1386 items_in_page: u64,
1387 layout: &pb21::MiniBlockLayout,
1388 decompressors: &dyn DecompressionStrategy,
1389 ) -> Result<Self> {
1390 let rep_decompressor = layout
1391 .rep_compression
1392 .as_ref()
1393 .map(|rep_compression| {
1394 decompressors
1395 .create_block_decompressor(rep_compression)
1396 .map(Arc::from)
1397 })
1398 .transpose()?;
1399 let def_decompressor = layout
1400 .def_compression
1401 .as_ref()
1402 .map(|def_compression| {
1403 decompressors
1404 .create_block_decompressor(def_compression)
1405 .map(Arc::from)
1406 })
1407 .transpose()?;
1408 let def_meaning = layout
1409 .layers
1410 .iter()
1411 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
1412 .collect::<Vec<_>>();
1413 let value_decompressor = decompressors.create_miniblock_decompressor(
1414 layout.value_compression.as_ref().unwrap(),
1415 decompressors,
1416 )?;
1417
1418 let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1419 let num_dictionary_items = layout.num_dictionary_items;
1420 let dictionary_decompressor = decompressors
1421 .create_block_decompressor(dictionary_encoding)?
1422 .into();
1423 let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
1424 {
1425 Compression::Variable(_) => 4,
1426 Compression::Flat(_) => 16,
1427 Compression::General(_) => 1,
1428 Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
1429 crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
1430 }
1431 _ => {
1432 return Err(Error::invalid_input_source(
1433 format!(
1434 "Unsupported mini-block dictionary encoding: {:?}",
1435 dictionary_encoding.compression.as_ref().unwrap()
1436 )
1437 .into(),
1438 ));
1439 }
1440 };
1441 Some(MiniBlockSchedulerDictionary {
1442 dictionary_decompressor,
1443 dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1444 dictionary_data_alignment,
1445 num_dictionary_items,
1446 })
1447 } else {
1448 None
1449 };
1450
1451 Ok(Self {
1452 buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1453 rep_decompressor,
1454 def_decompressor,
1455 value_decompressor: value_decompressor.into(),
1456 repetition_index_depth: layout.repetition_index_depth as u16,
1457 num_buffers: layout.num_buffers,
1458 priority,
1459 items_in_page,
1460 dictionary,
1461 def_meaning: def_meaning.into(),
1462 page_meta: None,
1463 has_large_chunk: layout.has_large_chunk,
1464 })
1465 }
1466
1467 fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1468 let page_meta = self.page_meta.as_ref().unwrap();
1469 chunk_indices
1470 .iter()
1471 .map(|&chunk_idx| {
1472 let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1473 let bytes_start = chunk_meta.offset_bytes;
1474 let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1475 LoadedChunk {
1476 byte_range: bytes_start..bytes_end,
1477 items_in_chunk: chunk_meta.num_values,
1478 chunk_idx,
1479 data: LanceBuffer::empty(),
1480 }
1481 })
1482 .collect()
1483 }
1484}
1485
/// What to do with the preamble at the start of a chunk.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    /// The chunk has a preamble and it is needed (the previous chunk's trailer was taken).
    Take,
    /// The chunk has a preamble but it is not needed.
    Skip,
    /// The chunk has no preamble.
    Absent,
}
1492
/// Describes which part of one chunk must be read to satisfy the requested row ranges.
///
/// Produced by `schedule_instructions`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    /// Index of the chunk (and of its repetition index block).
    chunk_idx: usize,
    /// Whether the chunk's preamble is taken, skipped, or absent.
    preamble: PreambleAction,
    /// Rows to skip at the start of the chunk before taking any.
    rows_to_skip: u64,
    /// Rows to take after skipping. May be 0 when only the preamble is needed.
    rows_to_take: u64,
    /// Whether the chunk's trailer is taken.
    take_trailer: bool,
}
1552
/// One drain step against a scheduled chunk, produced by
/// `ChunkInstructions::drain_from_instruction`.
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    /// Copy of the scheduled instruction this step draws from.
    chunk_instructions: ChunkInstructions,
    /// Rows of the instruction already consumed by earlier drain steps.
    rows_to_skip: u64,
    /// Rows taken by this drain step.
    rows_to_take: u64,
    /// How the chunk's preamble is treated by this drain step.
    preamble_action: PreambleAction,
}
1577
1578impl ChunkInstructions {
1579 fn schedule_instructions(
1585 rep_index: &MiniBlockRepIndex,
1586 user_ranges: &[Range<u64>],
1587 ) -> Vec<Self> {
1588 let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1592
1593 for user_range in user_ranges {
1594 let mut rows_needed = user_range.end - user_range.start;
1595 let mut need_preamble = false;
1596
1597 let mut block_index = match rep_index
1600 .blocks
1601 .binary_search_by_key(&user_range.start, |block| block.first_row)
1602 {
1603 Ok(idx) => {
1604 let mut idx = idx;
1607 while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1608 idx -= 1;
1609 }
1610 idx
1611 }
1612 Err(idx) => idx - 1,
1614 };
1615
1616 let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1617
1618 while rows_needed > 0 || need_preamble {
1619 if block_index >= rep_index.blocks.len() {
1621 log::warn!(
1622 "schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
1623 );
1624 break;
1625 }
1626
1627 let chunk = &rep_index.blocks[block_index];
1628 let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
1629
1630 if rows_avail == 0 && to_skip == 0 {
1634 if chunk.has_preamble && need_preamble {
1636 chunk_instructions.push(Self {
1637 chunk_idx: block_index,
1638 preamble: PreambleAction::Take,
1639 rows_to_skip: 0,
1640 rows_to_take: 0,
1641 take_trailer: chunk.has_trailer,
1645 });
1646 if chunk.starts_including_trailer > 0
1650 || block_index == rep_index.blocks.len() - 1
1651 {
1652 need_preamble = false;
1653 }
1654 }
1655 block_index += 1;
1657 continue;
1658 }
1659
1660 if rows_avail == 0 && to_skip > 0 {
1664 to_skip -= chunk.starts_including_trailer;
1667 block_index += 1;
1668 continue;
1669 }
1670
1671 let rows_to_take = rows_avail.min(rows_needed);
1672 rows_needed -= rows_to_take;
1673
1674 let mut take_trailer = false;
1675 let preamble = if chunk.has_preamble {
1676 if need_preamble {
1677 PreambleAction::Take
1678 } else {
1679 PreambleAction::Skip
1680 }
1681 } else {
1682 PreambleAction::Absent
1683 };
1684
1685 if rows_to_take == rows_avail && chunk.has_trailer {
1687 take_trailer = true;
1688 need_preamble = true;
1689 } else {
1690 need_preamble = false;
1691 };
1692
1693 chunk_instructions.push(Self {
1694 preamble,
1695 chunk_idx: block_index,
1696 rows_to_skip: to_skip,
1697 rows_to_take,
1698 take_trailer,
1699 });
1700
1701 to_skip = 0;
1702 block_index += 1;
1703 }
1704 }
1705
1706 if user_ranges.len() > 1 {
1710 let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1712 let mut instructions_iter = chunk_instructions.into_iter();
1713 merged_instructions.push(instructions_iter.next().unwrap());
1714 for instruction in instructions_iter {
1715 let last = merged_instructions.last_mut().unwrap();
1716 if last.chunk_idx == instruction.chunk_idx
1717 && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1718 {
1719 last.rows_to_take += instruction.rows_to_take;
1720 last.take_trailer |= instruction.take_trailer;
1721 } else {
1722 merged_instructions.push(instruction);
1723 }
1724 }
1725 merged_instructions
1726 } else {
1727 chunk_instructions
1728 }
1729 }
1730
1731 fn drain_from_instruction(
1732 &self,
1733 rows_desired: &mut u64,
1734 need_preamble: &mut bool,
1735 skip_in_chunk: &mut u64,
1736 ) -> (ChunkDrainInstructions, bool) {
1737 debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1739 let rows_avail = self.rows_to_take - *skip_in_chunk;
1740 let has_preamble = self.preamble != PreambleAction::Absent;
1741 let preamble_action = match (*need_preamble, has_preamble) {
1742 (true, true) => PreambleAction::Take,
1743 (true, false) => panic!("Need preamble but there isn't one"),
1744 (false, true) => PreambleAction::Skip,
1745 (false, false) => PreambleAction::Absent,
1746 };
1747
1748 let rows_taking = if *rows_desired >= rows_avail {
1751 *need_preamble = self.take_trailer;
1759 rows_avail
1760 } else {
1761 *need_preamble = false;
1764 *rows_desired
1765 };
1766 let rows_skipped = *skip_in_chunk;
1767
1768 let consumed_chunk = if *rows_desired >= rows_avail {
1770 *rows_desired -= rows_avail;
1771 *skip_in_chunk = 0;
1772 true
1773 } else {
1774 *skip_in_chunk += *rows_desired;
1775 *rows_desired = 0;
1776 false
1777 };
1778
1779 (
1780 ChunkDrainInstructions {
1781 chunk_instructions: self.clone(),
1782 rows_to_skip: rows_skipped,
1783 rows_to_take: rows_taking,
1784 preamble_action,
1785 },
1786 consumed_chunk,
1787 )
1788 }
1789}
1790
/// Chunk metadata words of a mini-block page, stored at one of two widths.
enum Words {
    /// 16-bit words, used when the page has no large chunk.
    U16(ScalarBuffer<u16>),
    /// 32-bit words, used when the page has a large chunk.
    U32(ScalarBuffer<u32>),
}
1795
/// Iterator over a `Words` buffer that yields every word widened to `u32`.
struct WordsIter<'a> {
    /// Boxed so that both word widths share one iterator type.
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
1799
1800impl Words {
1801 pub fn len(&self) -> usize {
1802 match self {
1803 Self::U16(b) => b.len(),
1804 Self::U32(b) => b.len(),
1805 }
1806 }
1807
1808 pub fn iter(&self) -> WordsIter<'_> {
1809 match self {
1810 Self::U16(buf) => WordsIter {
1811 iter: Box::new(buf.iter().map(|&x| x as u32)),
1812 },
1813 Self::U32(buf) => WordsIter {
1814 iter: Box::new(buf.iter().copied()),
1815 },
1816 }
1817 }
1818
1819 pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
1820 let bytes_per_value = if has_large_chunk { 4 } else { 2 };
1821 assert_eq!(bytes.len() % bytes_per_value, 0);
1822 let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
1823 if has_large_chunk {
1824 Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
1825 } else {
1826 Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
1827 }
1828 }
1829}
1830
1831impl<'a> Iterator for WordsIter<'a> {
1832 type Item = u32;
1833
1834 fn next(&mut self) -> Option<Self::Item> {
1835 self.iter.next()
1836 }
1837}
1838
1839impl StructuralPageScheduler for MiniBlockScheduler {
    /// Fetches and decodes the page-level metadata for a mini-block page.
    ///
    /// One I/O request reads the chunk metadata buffer, the dictionary buffer (if the
    /// layout has a dictionary) and the repetition index buffer (if
    /// `repetition_index_depth > 0`). The decoded state is stored in `self.page_meta` and
    /// also returned so that the caller can cache it.
    ///
    /// # Panics
    ///
    /// Panics if the page has fewer than two buffers, if the I/O returns no metadata
    /// buffer, or if the repetition index length is not a multiple of 8 bytes.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        // Buffer 0 holds the chunk metadata words.
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        // Buffer 1 holds the value data; only its start is needed to compute chunk offsets.
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        // Ranges are pushed in the order metadata, dictionary, repetition index; the
        // results are consumed in the same order below.
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            // The repetition index is the last page buffer.
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        // The request is submitted eagerly, before the returned future is first polled.
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            // Only consume a buffer for the dictionary when one was requested.
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                // Low 4 bits: log2 of the chunk's value count. Remaining bits: the chunk's
                // size in units of MINIBLOCK_ALIGNMENT, minus one.
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    // The last chunk holds whatever is left of the page, which need not be
                    // a power of two.
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                // Chunks are laid out back to back in the value buffer.
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                // The index is a sequence of u64 words, `stride` words per chunk.
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                // The dictionary is decompressed once here and shared by later decoders.
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }
1941
1942 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1943 self.page_meta = Some(
1944 data.clone()
1945 .as_arc_any()
1946 .downcast::<MiniBlockCacheableState>()
1947 .unwrap(),
1948 );
1949 }
1950
1951 fn schedule_ranges(
1952 &self,
1953 ranges: &[Range<u64>],
1954 io: &Arc<dyn EncodingsIo>,
1955 ) -> Result<Vec<PageLoadTask>> {
1956 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1957
1958 let page_meta = self.page_meta.as_ref().unwrap();
1959
1960 let chunk_instructions =
1961 ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1962
1963 debug_assert_eq!(
1964 num_rows,
1965 chunk_instructions
1966 .iter()
1967 .map(|ci| ci.rows_to_take)
1968 .sum::<u64>()
1969 );
1970
1971 let chunks_needed = chunk_instructions
1972 .iter()
1973 .map(|ci| ci.chunk_idx)
1974 .unique()
1975 .collect::<Vec<_>>();
1976
1977 let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1978 let chunk_ranges = loaded_chunks
1979 .iter()
1980 .map(|c| c.byte_range.clone())
1981 .collect::<Vec<_>>();
1982 let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1983
1984 let rep_decompressor = self.rep_decompressor.clone();
1985 let def_decompressor = self.def_decompressor.clone();
1986 let value_decompressor = self.value_decompressor.clone();
1987 let num_buffers = self.num_buffers;
1988 let has_large_chunk = self.has_large_chunk;
1989 let dictionary = page_meta
1990 .dictionary
1991 .as_ref()
1992 .map(|dictionary| dictionary.clone());
1993 let def_meaning = self.def_meaning.clone();
1994
1995 let res = async move {
1996 let loaded_chunk_data = loaded_chunk_data.await?;
1997 for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1998 loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1999 }
2000
2001 Ok(Box::new(MiniBlockDecoder {
2002 rep_decompressor,
2003 def_decompressor,
2004 value_decompressor,
2005 def_meaning,
2006 loaded_chunks: VecDeque::from_iter(loaded_chunks),
2007 instructions: VecDeque::from(chunk_instructions),
2008 offset_in_current_chunk: 0,
2009 dictionary,
2010 num_rows,
2011 num_buffers,
2012 has_large_chunk,
2013 }) as Box<dyn StructuralPageDecoder>)
2014 }
2015 .boxed();
2016 let page_load_task = PageLoadTask {
2017 decoder_fut: res,
2018 num_rows,
2019 };
2020 Ok(vec![page_load_task])
2021 }
2022}
2023
/// Location and entry width of a full-zip page's repetition index.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    /// File position of the repetition index buffer.
    buf_position: u64,
    /// Width in bytes of one index entry; the scheduler expects 1, 2, 4 or 8.
    bytes_per_value: u64,
}
2029
/// Value decompressor of a full-zip page, for either fixed-width or variable-width values.
#[derive(Debug)]
enum PerValueDecompressor {
    /// Used when the layout's details give bits-per-value.
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    /// Used when the layout's details give bits-per-offset.
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
2035
/// Decode-time configuration shared, behind an `Arc`, by the full-zip scheduler and the
/// decoders it creates.
#[derive(Debug)]
struct FullZipDecodeDetails {
    /// Decompressor for the values.
    value_decompressor: PerValueDecompressor,
    /// Interpretation of each layer in the layout.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Parser for control words, built from the layout's `bits_rep` and `bits_def`.
    ctrl_word_parser: ControlWordParser,
    /// Number of list layers in `def_meaning`.
    max_rep: u16,
    /// Sum of the definition levels of all non-list layers in `def_meaning`.
    max_visible_def: u16,
}
2044
/// Where full-zip byte ranges are read from.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    /// Ranges are requested from the I/O service.
    Remote(Arc<dyn EncodingsIo>),
    /// Ranges are sliced out of page data already in memory. `base_offset` is the file
    /// position of the first byte of `data`.
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
2063
2064impl FullZipReadSource {
2065 fn fetch(
2069 &self,
2070 ranges: &[Range<u64>],
2071 priority: u64,
2072 ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
2073 match self {
2074 Self::Remote(io) => {
2075 let io = io.clone();
2076 let ranges = ranges.to_vec();
2077 async move {
2078 let data = io.submit_request(ranges, priority).await?;
2079 Ok(data
2080 .into_iter()
2081 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2082 .collect::<VecDeque<_>>())
2083 }
2084 .boxed()
2085 }
2086 Self::PrefetchedPage { base_offset, data } => {
2087 let base_offset = *base_offset;
2088 let data = data.clone();
2089 let page_end = base_offset + data.len() as u64;
2090 std::future::ready(
2091 ranges
2092 .iter()
2093 .map(|range| {
2094 if range.start > range.end
2095 || range.start < base_offset
2096 || range.end > page_end
2097 {
2098 return Err(Error::internal(format!(
2099 "Requested range {:?} is outside page range {}..{}",
2100 range, base_offset, page_end
2101 )));
2102 }
2103 let start = (range.start - base_offset) as usize;
2104 let len = (range.end - range.start) as usize;
2105 Ok(data.slice_with_length(start, len))
2106 })
2107 .collect::<Result<VecDeque<_>>>(),
2108 )
2109 .boxed()
2110 }
2111 }
2112 }
2113}
2114
/// Scheduler for a page stored with the full-zip layout.
#[derive(Debug)]
pub struct FullZipScheduler {
    /// File position of the data buffer (page buffer 0).
    data_buf_position: u64,
    /// Size in bytes of the data buffer.
    data_buf_size: u64,
    /// Repetition index details, present when the page has a second buffer.
    rep_index: Option<FullZipRepIndexDetails>,
    /// I/O priority for data requests.
    priority: u64,
    /// Number of rows in the page.
    rows_in_page: u64,
    /// 32 for fixed-width values; otherwise the bits-per-offset given by the layout.
    bits_per_offset: u8,
    /// Decode configuration shared with the decoders this scheduler creates.
    details: Arc<FullZipDecodeDetails>,
    /// Cached repetition index, set by `initialize` (when caching is enabled) or `load`.
    cached_state: Option<Arc<FullZipCacheableState>>,
    /// When true, `initialize` reads the whole repetition index so it can be cached.
    /// `try_new` sets this to false.
    enable_cache: bool,
}
2136
2137impl FullZipScheduler {
2138 fn try_new(
2139 buffer_offsets_and_sizes: &[(u64, u64)],
2140 priority: u64,
2141 rows_in_page: u64,
2142 layout: &pb21::FullZipLayout,
2143 decompressors: &dyn DecompressionStrategy,
2144 ) -> Result<Self> {
2145 let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
2146 let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
2147 let num_reps = rows_in_page + 1;
2148 let bytes_per_rep = len / num_reps;
2149 debug_assert_eq!(len % num_reps, 0);
2150 debug_assert!(
2151 bytes_per_rep == 1
2152 || bytes_per_rep == 2
2153 || bytes_per_rep == 4
2154 || bytes_per_rep == 8
2155 );
2156 FullZipRepIndexDetails {
2157 buf_position: *pos,
2158 bytes_per_value: bytes_per_rep,
2159 }
2160 });
2161
2162 let value_decompressor = match layout.details {
2163 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
2164 let decompressor = decompressors.create_fixed_per_value_decompressor(
2165 layout.value_compression.as_ref().unwrap(),
2166 )?;
2167 PerValueDecompressor::Fixed(decompressor.into())
2168 }
2169 Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
2170 let decompressor = decompressors.create_variable_per_value_decompressor(
2171 layout.value_compression.as_ref().unwrap(),
2172 )?;
2173 PerValueDecompressor::Variable(decompressor.into())
2174 }
2175 None => {
2176 panic!("Full-zip layout must have a `details` field");
2177 }
2178 };
2179 let ctrl_word_parser = ControlWordParser::new(
2180 layout.bits_rep.try_into().unwrap(),
2181 layout.bits_def.try_into().unwrap(),
2182 );
2183 let def_meaning = layout
2184 .layers
2185 .iter()
2186 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
2187 .collect::<Vec<_>>();
2188
2189 let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
2190 let max_visible_def = def_meaning
2191 .iter()
2192 .filter(|d| !d.is_list())
2193 .map(|d| d.num_def_levels())
2194 .sum();
2195
2196 let bits_per_offset = match layout.details {
2197 Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
2198 Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
2199 bits_per_offset as u8
2200 }
2201 None => panic!("Full-zip layout must have a `details` field"),
2202 };
2203
2204 let details = Arc::new(FullZipDecodeDetails {
2205 value_decompressor,
2206 def_meaning: def_meaning.into(),
2207 ctrl_word_parser,
2208 max_rep,
2209 max_visible_def,
2210 });
2211 Ok(Self {
2212 data_buf_position,
2213 data_buf_size,
2214 rep_index,
2215 details,
2216 priority,
2217 rows_in_page,
2218 bits_per_offset,
2219 cached_state: None,
2220 enable_cache: false,
2221 })
2222 }
2223
2224 fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
2225 if ranges.is_empty() {
2226 return false;
2227 }
2228 let mut expected_start = 0;
2229 for range in ranges {
2230 if range.start != expected_start || range.end > rows_in_page || range.end < range.start
2231 {
2232 return false;
2233 }
2234 expected_start = range.end;
2235 }
2236 expected_start == rows_in_page
2237 }
2238
2239 fn create_page_load_task(
2240 io_future: BoxFuture<'static, Result<Vec<Bytes>>>,
2241 num_rows: u64,
2242 details: Arc<FullZipDecodeDetails>,
2243 bits_per_offset: u8,
2244 ) -> PageLoadTask {
2245 let load_task = async move {
2246 let buffers = io_future.await?;
2247 let data = buffers
2248 .into_iter()
2249 .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
2250 .collect::<VecDeque<_>>();
2251 Self::create_decoder(details, data, num_rows, bits_per_offset)
2252 }
2253 .boxed();
2254 PageLoadTask {
2255 decoder_fut: load_task,
2256 num_rows,
2257 }
2258 }
2259
2260 fn create_decoder(
2262 details: Arc<FullZipDecodeDetails>,
2263 data: VecDeque<LanceBuffer>,
2264 num_rows: u64,
2265 bits_per_offset: u8,
2266 ) -> Result<Box<dyn StructuralPageDecoder>> {
2267 match &details.value_decompressor {
2268 PerValueDecompressor::Fixed(decompressor) => {
2269 let bits_per_value = decompressor.bits_per_value();
2270 if bits_per_value % 8 != 0 {
2271 return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
2272 }
2273 let bytes_per_value = bits_per_value / 8;
2274 let total_bytes_per_value =
2275 bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
2276 if total_bytes_per_value == 0 {
2277 return Err(lance_core::Error::internal(
2278 "Invalid encoding: per-row byte width must be greater than 0",
2279 ));
2280 }
2281 Ok(Box::new(FixedFullZipDecoder {
2282 details,
2283 data,
2284 num_rows,
2285 offset_in_current: 0,
2286 bytes_per_value: bytes_per_value as usize,
2287 total_bytes_per_value,
2288 }) as Box<dyn StructuralPageDecoder>)
2289 }
2290 PerValueDecompressor::Variable(_decompressor) => {
2291 Ok(Box::new(VariableFullZipDecoder::new(
2292 details,
2293 data,
2294 num_rows,
2295 bits_per_offset,
2296 bits_per_offset,
2297 )))
2298 }
2299 }
2300 }
2301
2302 fn extract_byte_ranges_from_pairs(
2305 buffer: LanceBuffer,
2306 bytes_per_value: u64,
2307 data_buf_position: u64,
2308 ) -> Vec<Range<u64>> {
2309 ByteUnpacker::new(buffer, bytes_per_value as usize)
2310 .chunks(2)
2311 .into_iter()
2312 .map(|mut c| {
2313 let start = c.next().unwrap() + data_buf_position;
2314 let end = c.next().unwrap() + data_buf_position;
2315 start..end
2316 })
2317 .collect::<Vec<_>>()
2318 }
2319
2320 fn extract_byte_ranges_from_cached(
2323 buffer: &LanceBuffer,
2324 ranges: &[Range<u64>],
2325 bytes_per_value: u64,
2326 data_buf_position: u64,
2327 ) -> Vec<Range<u64>> {
2328 ranges
2329 .iter()
2330 .map(|r| {
2331 let start_offset = (r.start * bytes_per_value) as usize;
2332 let end_offset = (r.end * bytes_per_value) as usize;
2333
2334 let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
2335 let start_val =
2336 ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
2337 .next()
2338 .unwrap();
2339
2340 let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
2341 let end_val =
2342 ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
2343 .next()
2344 .unwrap();
2345
2346 (data_buf_position + start_val)..(data_buf_position + end_val)
2347 })
2348 .collect()
2349 }
2350
2351 fn compute_rep_index_ranges(
2353 ranges: &[Range<u64>],
2354 rep_index: &FullZipRepIndexDetails,
2355 ) -> Vec<Range<u64>> {
2356 ranges
2357 .iter()
2358 .flat_map(|r| {
2359 let first_val_start =
2360 rep_index.buf_position + (r.start * rep_index.bytes_per_value);
2361 let first_val_end = first_val_start + rep_index.bytes_per_value;
2362 let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
2363 let last_val_end = last_val_start + rep_index.bytes_per_value;
2364 [first_val_start..first_val_end, last_val_start..last_val_end]
2365 })
2366 .collect()
2367 }
2368
    /// Schedules row ranges for a page that has a repetition index.
    ///
    /// One of three strategies is used:
    /// 1. If the ranges cover the whole page, the entire data buffer is read in one request.
    /// 2. Otherwise, if a cached repetition index is available, the byte ranges are looked
    ///    up in it and requested directly.
    /// 3. Otherwise the needed index entries are read first, and the data ranges they
    ///    describe are requested once that read completes.
    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: FullZipRepIndexDetails,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        // Copy what the futures need so that they do not borrow `self`.
        let data_buf_position = self.data_buf_position;
        let priority = self.priority;
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;

        if Self::covers_entire_page(ranges, self.rows_in_page) {
            // Whole-page read: no repetition index lookup is needed.
            let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
            // Submitted eagerly, before the returned future is polled.
            let page_data = io.submit_single(full_range.clone(), priority);
            let load_task = async move {
                let page_data = page_data.await?;
                let source = FullZipReadSource::PrefetchedPage {
                    base_offset: full_range.start,
                    data: LanceBuffer::from_bytes(page_data, 1),
                };
                let read_ranges = vec![full_range];
                let data = source.fetch(&read_ranges, priority).await?;
                Self::create_decoder(details, data, num_rows, bits_per_offset)
            }
            .boxed();
            let page_load_task = PageLoadTask {
                decoder_fut: load_task,
                num_rows,
            };
            return Ok(vec![page_load_task]);
        }

        if let Some(cached_state) = &self.cached_state {
            // The index is already in memory, so the data ranges are known immediately.
            let byte_ranges = Self::extract_byte_ranges_from_cached(
                &cached_state.rep_index_buffer,
                ranges,
                rep_index.bytes_per_value,
                data_buf_position,
            );
            let io_future = io.submit_request(byte_ranges, priority);
            let page_load_task =
                Self::create_page_load_task(io_future, num_rows, details, bits_per_offset);
            return Ok(vec![page_load_task]);
        }

        // Two-step read: the index entries are requested now; the data request is issued
        // inside the future, after the entries have arrived.
        let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
        let rep_data = io.submit_request(rep_ranges, priority);
        let io_clone = io.clone();
        let load_task = async move {
            let rep_data = rep_data.await?;
            // Concatenate the entries so that they can be unpacked as (start, end) pairs.
            let rep_buffer = LanceBuffer::concat(
                &rep_data
                    .into_iter()
                    .map(|d| LanceBuffer::from_bytes(d, 1))
                    .collect::<Vec<_>>(),
            );
            let byte_ranges = Self::extract_byte_ranges_from_pairs(
                rep_buffer,
                rep_index.bytes_per_value,
                data_buf_position,
            );
            let source = FullZipReadSource::Remote(io_clone);
            let data = source.fetch(&byte_ranges, priority).await?;
            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
2443
2444 fn schedule_ranges_simple(
2448 &self,
2449 ranges: &[Range<u64>],
2450 io: &Arc<dyn EncodingsIo>,
2451 ) -> Result<Vec<PageLoadTask>> {
2452 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
2454
2455 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
2456 unreachable!()
2457 };
2458
2459 let bits_per_value = decompressor.bits_per_value();
2461 if !bits_per_value.is_multiple_of(8) {
2462 return Err(Error::invalid_input_source(
2463 format!(
2464 "Full-zip fixed-width values must be byte aligned, got {} bits per value",
2465 bits_per_value
2466 )
2467 .into(),
2468 ));
2469 }
2470 let bytes_per_value = bits_per_value / 8;
2471 let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
2472 let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
2473 let byte_ranges = ranges
2474 .iter()
2475 .map(|r| {
2476 debug_assert!(r.end <= self.rows_in_page);
2477 let start = self.data_buf_position + r.start * total_bytes_per_value;
2478 let end = self.data_buf_position + r.end * total_bytes_per_value;
2479 start..end
2480 })
2481 .collect::<Vec<_>>();
2482
2483 let io_future = io.submit_request(byte_ranges, self.priority);
2484 let page_load_task = Self::create_page_load_task(
2485 io_future,
2486 num_rows,
2487 self.details.clone(),
2488 self.bits_per_offset,
2489 );
2490 Ok(vec![page_load_task])
2491 }
2492}
2493
/// Cacheable state of a full-zip page: its complete repetition index.
#[derive(Debug)]
struct FullZipCacheableState {
    /// Raw bytes of the repetition index, `rows_in_page + 1` entries long.
    rep_index_buffer: LanceBuffer,
}
2500
impl DeepSizeOf for FullZipCacheableState {
    /// Reports the byte length of the cached repetition index buffer.
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}
2506
impl CachedPageData for FullZipCacheableState {
    /// Upcasts to `Arc<dyn Any>` so that `load` can downcast back to the concrete type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
2512
2513impl StructuralPageScheduler for FullZipScheduler {
2514 fn initialize<'a>(
2515 &'a mut self,
2516 io: &Arc<dyn EncodingsIo>,
2517 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2518 if self.enable_cache
2519 && let Some(rep_index) = self.rep_index
2520 {
2521 let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2522 let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2523 let io_clone = io.clone();
2524 return async move {
2525 let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2526 let state = Arc::new(FullZipCacheableState {
2527 rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
2528 });
2529 self.cached_state = Some(state.clone());
2530 Ok(state as Arc<dyn CachedPageData>)
2531 }
2532 .boxed();
2533 }
2534 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2535 }
2536
2537 fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2541 if let Ok(cached_state) = cache
2543 .clone()
2544 .as_arc_any()
2545 .downcast::<FullZipCacheableState>()
2546 {
2547 self.cached_state = Some(cached_state);
2549 }
2550 }
2551
2552 fn schedule_ranges(
2553 &self,
2554 ranges: &[Range<u64>],
2555 io: &Arc<dyn EncodingsIo>,
2556 ) -> Result<Vec<PageLoadTask>> {
2557 if let Some(rep_index) = self.rep_index {
2558 self.schedule_ranges_rep(ranges, io, rep_index)
2559 } else {
2560 self.schedule_ranges_simple(ranges, io)
2561 }
2562 }
2563}
2564
/// Page decoder for full-zip data whose values have a fixed width.
///
/// Buffers are consumed from the front of `data`; each `drain` slices off enough bytes to
/// cover the requested rows without decoding them.
#[derive(Debug)]
struct FixedFullZipDecoder {
    /// Shared decode settings (control word parser, decompressor, rep/def limits).
    details: Arc<FullZipDecodeDetails>,
    /// Loaded buffers that have not been fully consumed yet, in order.
    data: VecDeque<LanceBuffer>,
    /// Byte offset into the front buffer of `data` where the next slice starts.
    offset_in_current: usize,
    /// Width in bytes of a value on its own; reported downstream as `bits_per_value / 8`.
    bytes_per_value: usize,
    /// Bytes to advance per visible item in the zipped buffer.
    /// NOTE(review): presumably control word + value bytes — confirm where this is set.
    total_bytes_per_value: usize,
    /// Number of rows reported by `num_rows()`.
    num_rows: u64,
}
2581
2582impl FixedFullZipDecoder {
2583 fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2584 debug_assert!(num_rows > 0);
2585 let cur_buf = self.data.front_mut().unwrap();
2586 let start = self.offset_in_current;
2587 if self.details.ctrl_word_parser.has_rep() {
2588 let mut rows_started = 0;
2591 let mut num_items = 0;
2594 while self.offset_in_current < cur_buf.len() {
2595 let control = self.details.ctrl_word_parser.parse_desc(
2596 &cur_buf[self.offset_in_current..],
2597 self.details.max_rep,
2598 self.details.max_visible_def,
2599 );
2600 if control.is_new_row {
2601 if rows_started == num_rows {
2602 break;
2603 }
2604 rows_started += 1;
2605 }
2606 num_items += 1;
2607 if control.is_visible {
2608 self.offset_in_current += self.total_bytes_per_value;
2609 } else {
2610 self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2611 }
2612 }
2613
2614 let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2615 if self.offset_in_current == cur_buf.len() {
2616 self.data.pop_front();
2617 self.offset_in_current = 0;
2618 }
2619
2620 FullZipDecodeTaskItem {
2621 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2622 data: task_slice,
2623 bits_per_value: self.bytes_per_value as u64 * 8,
2624 num_values: num_items,
2625 block_info: BlockInfo::new(),
2626 }),
2627 rows_in_buf: rows_started,
2628 }
2629 } else {
2630 let cur_buf = self.data.front_mut().unwrap();
2633 let bytes_avail = cur_buf.len() - self.offset_in_current;
2634 let offset_in_cur = self.offset_in_current;
2635
2636 let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2637 let mut rows_taken = num_rows;
2638 let task_slice = if bytes_needed >= bytes_avail {
2639 self.offset_in_current = 0;
2640 rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2641 self.data
2642 .pop_front()
2643 .unwrap()
2644 .slice_with_length(offset_in_cur, bytes_avail)
2645 } else {
2646 self.offset_in_current += bytes_needed;
2647 cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2648 };
2649 FullZipDecodeTaskItem {
2650 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2651 data: task_slice,
2652 bits_per_value: self.bytes_per_value as u64 * 8,
2653 num_values: rows_taken,
2654 block_info: BlockInfo::new(),
2655 }),
2656 rows_in_buf: rows_taken,
2657 }
2658 }
2659 }
2660}
2661
2662impl StructuralPageDecoder for FixedFullZipDecoder {
2663 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2664 let mut task_data = Vec::with_capacity(self.data.len());
2665 let mut remaining = num_rows;
2666 while remaining > 0 {
2667 let task_item = self.slice_next_task(remaining);
2668 remaining -= task_item.rows_in_buf;
2669 task_data.push(task_item);
2670 }
2671 Ok(Box::new(FixedFullZipDecodeTask {
2672 details: self.details.clone(),
2673 data: task_data,
2674 bytes_per_value: self.bytes_per_value,
2675 num_rows: num_rows as usize,
2676 }))
2677 }
2678
2679 fn num_rows(&self) -> u64 {
2680 self.num_rows
2681 }
2682}
2683
/// Page decoder for full-zip data whose values have a variable width.
///
/// The zipped buffers are unzipped once, at construction time, into separate
/// data / offsets / rep / def buffers. Per-row start positions are recorded so that `drain`
/// only has to slice.
#[derive(Debug)]
struct VariableFullZipDecoder {
    /// Shared decode settings (control word parser, decompressor, rep/def limits).
    details: Arc<FullZipDecodeDetails>,
    /// Decompressor applied to each drained variable-width block.
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Concatenated value bytes of all valid items.
    data: LanceBuffer,
    /// Little-endian offsets into `data`, `bits_per_offset` wide: one per visible item plus a
    /// trailing end offset.
    offsets: LanceBuffer,
    /// Repetition levels collected by the control word parser (may be empty).
    rep: ScalarBuffer<u16>,
    /// Definition levels collected by the control word parser (may be empty).
    def: ScalarBuffer<u16>,
    /// For each row (plus a final sentinel), the index into `rep`/`def` where the row starts.
    repdef_starts: Vec<usize>,
    /// For each row (plus a final sentinel), the byte position in `data` where the row starts.
    data_starts: Vec<usize>,
    /// For each row (plus a final sentinel), the byte position in `offsets` where the row starts.
    offset_starts: Vec<usize>,
    /// For each row (plus a final sentinel), the number of visible items seen before the row.
    visible_item_counts: Vec<u64>,
    /// Width of the entries in `offsets`, in bits (32 or 64).
    bits_per_offset: u8,
    /// Index of the next row to drain.
    current_idx: usize,
    /// Number of rows reported by `num_rows()`.
    num_rows: u64,
}
2704
2705impl VariableFullZipDecoder {
2706 fn new(
2707 details: Arc<FullZipDecodeDetails>,
2708 data: VecDeque<LanceBuffer>,
2709 num_rows: u64,
2710 in_bits_per_length: u8,
2711 out_bits_per_offset: u8,
2712 ) -> Self {
2713 let decompressor = match details.value_decompressor {
2714 PerValueDecompressor::Variable(ref d) => d.clone(),
2715 _ => unreachable!(),
2716 };
2717
2718 assert_eq!(in_bits_per_length % 8, 0);
2719 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2720
2721 let mut decoder = Self {
2722 details,
2723 decompressor,
2724 data: LanceBuffer::empty(),
2725 offsets: LanceBuffer::empty(),
2726 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2727 def: LanceBuffer::empty().borrow_to_typed_slice(),
2728 bits_per_offset: out_bits_per_offset,
2729 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2730 data_starts: Vec::with_capacity(num_rows as usize + 1),
2731 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2732 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2733 current_idx: 0,
2734 num_rows,
2735 };
2736
2737 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2758
2759 decoder
2760 }
2761
2762 fn slice_batch_data_and_rebase_offsets_typed<T>(
2763 data: &LanceBuffer,
2764 offsets: &LanceBuffer,
2765 ) -> Result<(LanceBuffer, LanceBuffer)>
2766 where
2767 T: arrow_buffer::ArrowNativeType
2768 + Copy
2769 + PartialOrd
2770 + std::ops::Sub<Output = T>
2771 + std::fmt::Display
2772 + TryInto<usize>,
2773 {
2774 let offsets_slice = offsets.borrow_to_typed_slice::<T>();
2775 let offsets_slice = offsets_slice.as_ref();
2776 if offsets_slice.is_empty() {
2777 return Err(Error::internal(
2778 "Variable offsets cannot be empty".to_string(),
2779 ));
2780 }
2781
2782 let base = offsets_slice[0];
2783 let end = *offsets_slice.last().unwrap();
2784 if end < base {
2785 return Err(Error::internal(format!(
2786 "Invalid variable offsets: end ({end}) is less than base ({base})"
2787 )));
2788 }
2789
2790 let data_start = base.try_into().map_err(|_| {
2791 Error::internal(format!("Variable offset ({base}) does not fit into usize"))
2792 })?;
2793 let data_end = end.try_into().map_err(|_| {
2794 Error::internal(format!("Variable offset ({end}) does not fit into usize"))
2795 })?;
2796 if data_end > data.len() {
2797 return Err(Error::internal(format!(
2798 "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
2799 data.len()
2800 )));
2801 }
2802
2803 let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
2804 for &offset in offsets_slice {
2805 if offset < base {
2806 return Err(Error::internal(format!(
2807 "Invalid variable offsets: offset ({offset}) is less than base ({base})"
2808 )));
2809 }
2810 rebased_offsets.push(offset - base);
2811 }
2812
2813 let sliced_data = data.slice_with_length(data_start, data_end - data_start);
2814 let sliced_data = LanceBuffer::copy_slice(&sliced_data);
2816 let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
2817 Ok((sliced_data, rebased_offsets))
2818 }
2819
2820 fn slice_batch_data_and_rebase_offsets(
2821 data: &LanceBuffer,
2822 offsets: &LanceBuffer,
2823 bits_per_offset: u8,
2824 ) -> Result<(LanceBuffer, LanceBuffer)> {
2825 match bits_per_offset {
2826 32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
2827 64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
2828 _ => Err(Error::internal(format!(
2829 "Unsupported bits_per_offset={bits_per_offset}"
2830 ))),
2831 }
2832 }
2833
2834 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2835 match bits_per_offset {
2836 8 => *data.get_unchecked(0) as u64,
2837 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2838 32 => u32::from_le_bytes([
2839 *data.get_unchecked(0),
2840 *data.get_unchecked(1),
2841 *data.get_unchecked(2),
2842 *data.get_unchecked(3),
2843 ]) as u64,
2844 64 => u64::from_le_bytes([
2845 *data.get_unchecked(0),
2846 *data.get_unchecked(1),
2847 *data.get_unchecked(2),
2848 *data.get_unchecked(3),
2849 *data.get_unchecked(4),
2850 *data.get_unchecked(5),
2851 *data.get_unchecked(6),
2852 *data.get_unchecked(7),
2853 ]),
2854 _ => unreachable!(),
2855 }
2856 }
2857
2858 fn unzip(
2859 &mut self,
2860 data: VecDeque<LanceBuffer>,
2861 in_bits_per_length: u8,
2862 out_bits_per_offset: u8,
2863 num_rows: u64,
2864 ) {
2865 let mut rep = Vec::with_capacity(num_rows as usize);
2867 let mut def = Vec::with_capacity(num_rows as usize);
2868 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2869
2870 let bytes_per_offset = out_bits_per_offset as usize / 8;
2873 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2874 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2875
2876 let bytes_per_length = in_bits_per_length as usize / 8;
2877 let bytes_lengths = bytes_per_length * num_rows as usize;
2878
2879 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2880 let mut unzipped_data =
2883 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2884
2885 let mut current_offset = 0_u64;
2886 let mut visible_item_count = 0_u64;
2887 for databuf in data.into_iter() {
2888 let mut databuf = databuf.as_ref();
2889 while !databuf.is_empty() {
2890 let data_start = unzipped_data.len();
2891 let offset_start = offsets_data.len();
2892 let repdef_start = rep.len().max(def.len());
2895 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2897 databuf,
2898 self.details.max_rep,
2899 self.details.max_visible_def,
2900 );
2901 self.details
2902 .ctrl_word_parser
2903 .parse(databuf, &mut rep, &mut def);
2904 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2905
2906 if ctrl_desc.is_new_row {
2907 self.repdef_starts.push(repdef_start);
2908 self.data_starts.push(data_start);
2909 self.offset_starts.push(offset_start);
2910 self.visible_item_counts.push(visible_item_count);
2911 }
2912 if ctrl_desc.is_visible {
2913 visible_item_count += 1;
2914 if ctrl_desc.is_valid_item {
2915 debug_assert!(databuf.len() >= bytes_per_length);
2917 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2918 match out_bits_per_offset {
2919 32 => offsets_data
2920 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2921 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2922 _ => unreachable!(),
2923 };
2924 databuf = &databuf[bytes_per_offset..];
2925 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2926 databuf = &databuf[length as usize..];
2927 current_offset += length;
2928 } else {
2929 match out_bits_per_offset {
2931 32 => offsets_data
2932 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2933 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2934 _ => unreachable!(),
2935 }
2936 }
2937 }
2938 }
2939 }
2940 self.repdef_starts.push(rep.len().max(def.len()));
2941 self.data_starts.push(unzipped_data.len());
2942 self.offset_starts.push(offsets_data.len());
2943 self.visible_item_counts.push(visible_item_count);
2944 match out_bits_per_offset {
2945 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2946 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2947 _ => unreachable!(),
2948 };
2949 self.rep = ScalarBuffer::from(rep);
2950 self.def = ScalarBuffer::from(def);
2951 self.data = LanceBuffer::from(unzipped_data);
2952 self.offsets = LanceBuffer::from(offsets_data);
2953 }
2954}
2955
2956impl StructuralPageDecoder for VariableFullZipDecoder {
2957 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2958 let start = self.current_idx;
2959 let end = start + num_rows as usize;
2960
2961 let offset_start = self.offset_starts[start];
2962 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2963 let offsets = self
2964 .offsets
2965 .slice_with_length(offset_start, offset_end - offset_start);
2966 let (data, offsets) =
2968 Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
2969
2970 let repdef_start = self.repdef_starts[start];
2971 let repdef_end = self.repdef_starts[end];
2972 let rep = if self.rep.is_empty() {
2973 self.rep.clone()
2974 } else {
2975 self.rep.slice(repdef_start, repdef_end - repdef_start)
2976 };
2977 let def = if self.def.is_empty() {
2978 self.def.clone()
2979 } else {
2980 self.def.slice(repdef_start, repdef_end - repdef_start)
2981 };
2982
2983 let visible_item_counts_start = self.visible_item_counts[start];
2984 let visible_item_counts_end = self.visible_item_counts[end];
2985 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2986
2987 self.current_idx += num_rows as usize;
2988
2989 Ok(Box::new(VariableFullZipDecodeTask {
2990 details: self.details.clone(),
2991 decompressor: self.decompressor.clone(),
2992 data,
2993 offsets,
2994 bits_per_offset: self.bits_per_offset,
2995 num_visible_items,
2996 rep,
2997 def,
2998 }))
2999 }
3000
3001 fn num_rows(&self) -> u64 {
3002 self.num_rows
3003 }
3004}
3005
/// Decode task produced by `VariableFullZipDecoder::drain` for one batch of rows.
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    /// Shared decode settings; `def_meaning` is read when building the unraveler.
    details: Arc<FullZipDecodeDetails>,
    /// Decompressor applied to the variable-width block.
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    /// Value bytes for this batch.
    data: LanceBuffer,
    /// Offsets into `data`, rebased to start at zero.
    offsets: LanceBuffer,
    /// Width of the entries in `offsets`, in bits.
    bits_per_offset: u8,
    /// Number of visible items in this batch.
    num_visible_items: u64,
    /// Repetition levels for this batch; empty when there are none.
    rep: ScalarBuffer<u16>,
    /// Definition levels for this batch; empty when there are none.
    def: ScalarBuffer<u16>,
}
3017
3018impl DecodePageTask for VariableFullZipDecodeTask {
3019 fn decode(self: Box<Self>) -> Result<DecodedPage> {
3020 let block = VariableWidthBlock {
3021 data: self.data,
3022 offsets: self.offsets,
3023 bits_per_offset: self.bits_per_offset,
3024 num_values: self.num_visible_items,
3025 block_info: BlockInfo::new(),
3026 };
3027 let decomopressed = self.decompressor.decompress(block)?;
3028 let rep = if self.rep.is_empty() {
3029 None
3030 } else {
3031 Some(self.rep.to_vec())
3032 };
3033 let def = if self.def.is_empty() {
3034 None
3035 } else {
3036 Some(self.def.to_vec())
3037 };
3038 let unraveler = RepDefUnraveler::new(
3039 rep,
3040 def,
3041 self.details.def_meaning.clone(),
3042 self.num_visible_items,
3043 );
3044 Ok(DecodedPage {
3045 data: decomopressed,
3046 repdef: unraveler,
3047 })
3048 }
3049}
3050
/// One contiguous slice of zipped data handed to a fixed-width full-zip decode task.
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    /// The still-zipped bytes together with the item count and value width.
    data: PerValueDataBlock,
    /// Number of rows covered by `data`.
    rows_in_buf: u64,
}
3056
/// Decode task produced by `FixedFullZipDecoder::drain` for one batch of rows.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    /// Shared decode settings (control word parser, decompressor, definition meaning).
    details: Arc<FullZipDecodeDetails>,
    /// Zipped slices to decode, in order.
    data: Vec<FullZipDecodeTaskItem>,
    /// Number of rows requested for this task; passed to the rep/def unraveler.
    num_rows: usize,
    /// Width in bytes of a single value, excluding any control word.
    bytes_per_value: usize,
}
3066
3067impl DecodePageTask for FixedFullZipDecodeTask {
3068 fn decode(self: Box<Self>) -> Result<DecodedPage> {
3069 let estimated_size_bytes = self
3071 .data
3072 .iter()
3073 .map(|task_item| task_item.data.data_size() as usize)
3074 .sum::<usize>()
3075 * 2;
3076 let mut data_builder =
3077 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
3078
3079 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
3080 for task_item in self.data.into_iter() {
3084 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3085 unreachable!()
3086 };
3087 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3088 else {
3089 unreachable!()
3090 };
3091 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
3092 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
3093 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
3094 }
3095
3096 let unraveler = RepDefUnraveler::new(
3097 None,
3098 None,
3099 self.details.def_meaning.clone(),
3100 self.num_rows as u64,
3101 );
3102
3103 Ok(DecodedPage {
3104 data: data_builder.finish(),
3105 repdef: unraveler,
3106 })
3107 } else {
3108 let mut rep = Vec::with_capacity(self.num_rows);
3110 let mut def = Vec::with_capacity(self.num_rows);
3111
3112 for task_item in self.data.into_iter() {
3113 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
3114 unreachable!()
3115 };
3116 let mut buf_slice = fixed_data.data.as_ref();
3117 let num_values = fixed_data.num_values as usize;
3118 let mut values = Vec::with_capacity(
3121 fixed_data.data.len()
3122 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
3123 );
3124 let mut visible_items = 0;
3125 for _ in 0..num_values {
3126 self.details
3128 .ctrl_word_parser
3129 .parse(buf_slice, &mut rep, &mut def);
3130 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
3131
3132 let is_visible = def
3133 .last()
3134 .map(|d| *d <= self.details.max_visible_def)
3135 .unwrap_or(true);
3136 if is_visible {
3137 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
3139 buf_slice = &buf_slice[self.bytes_per_value..];
3140 visible_items += 1;
3141 }
3142 }
3143
3144 let values_buf = LanceBuffer::from(values);
3146 let fixed_data = FixedWidthDataBlock {
3147 bits_per_value: self.bytes_per_value as u64 * 8,
3148 block_info: BlockInfo::new(),
3149 data: values_buf,
3150 num_values: visible_items,
3151 };
3152 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
3153 else {
3154 unreachable!()
3155 };
3156 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
3157 data_builder.append(&decompressed, 0..visible_items);
3158 }
3159
3160 let repetition = if rep.is_empty() { None } else { Some(rep) };
3161 let definition = if def.is_empty() { None } else { Some(def) };
3162
3163 let unraveler = RepDefUnraveler::new(
3164 repetition,
3165 definition,
3166 self.details.def_meaning.clone(),
3167 self.num_rows as u64,
3168 );
3169 let data = data_builder.finish();
3170
3171 Ok(DecodedPage {
3172 data,
3173 repdef: unraveler,
3174 })
3175 }
3176 }
3177}
3178
/// Scheduling job that walks a list of row ranges across the pages of one primitive field.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    /// Field scheduler that owns the per-page schedulers.
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    /// Row ranges to schedule, in field-global row numbers.
    ranges: Vec<Range<u64>>,
    /// Index of the page the next `schedule_next` call starts at.
    page_idx: usize,
    /// Index of the range the next `schedule_next` call starts at.
    range_idx: usize,
    /// Field-global row number of the first row of page `page_idx`.
    global_row_offset: u64,
}

impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
    /// Creates a job positioned at the first page and the first range.
    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            global_row_offset: 0,
        }
    }
}
3199
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules the requested rows that fall into the next relevant page.
    ///
    /// Each call handles exactly one page: it skips pages that end before the current range,
    /// collects every (part of a) range that overlaps the page, asks the page scheduler to
    /// schedule them, and then advances to the following page. Returns an empty vector once
    /// all ranges have been consumed.
    ///
    /// # Panics
    ///
    /// Indexing `page_schedulers` panics if the ranges extend past the last page.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        let mut range = self.ranges[self.range_idx].clone();
        // The start of the first range handled by this call; only used for tracing here.
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip pages that end at or before the start of the current range.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Collect, in page-local row numbers, every range piece that lies in this page.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // A range continued from the previous page starts at this page's first row.
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // The range ends in this page; move on to the next range, if any.
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range spills into the next page; `range_idx` stays put so the next
                // call picks up the remainder.
                break;
            }
        }

        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is done; the next call starts at the following page.
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        // Wrap each load task in a scan line whose decoder future carries the current path.
        let cur_path = context.current_path();
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}
3284
/// A page scheduler together with the page facts the field-level job needs.
#[derive(Debug)]
struct PageInfoAndScheduler {
    /// Position of the page within its column.
    page_index: usize,
    /// Number of rows in the page.
    num_rows: u64,
    /// Layout-specific scheduler for the page.
    scheduler: Box<dyn StructuralPageScheduler>,
}
3291
/// Field scheduler for a primitive column stored with a structural page layout.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    /// One scheduler per page, in page order.
    page_schedulers: Vec<PageInfoAndScheduler>,
    /// Index of the column this field reads from.
    column_index: u32,
    /// Debug rendering of the target field's data type; part of the cache key so that
    /// different views of the same column get separate cache entries.
    view_tag: String,
}
3307
3308impl StructuralPrimitiveFieldScheduler {
3309 pub fn try_new(
3310 column_info: &ColumnInfo,
3311 decompressors: &dyn DecompressionStrategy,
3312 cache_repetition_index: bool,
3313 target_field: &Field,
3314 ) -> Result<Self> {
3315 let page_schedulers = column_info
3316 .page_infos
3317 .iter()
3318 .enumerate()
3319 .map(|(page_index, page_info)| {
3320 Self::page_info_to_scheduler(
3321 page_info,
3322 page_index,
3323 decompressors,
3324 cache_repetition_index,
3325 target_field,
3326 )
3327 })
3328 .collect::<Result<Vec<_>>>()?;
3329 Ok(Self {
3330 page_schedulers,
3331 column_index: column_info.index,
3332 view_tag: format!("{:?}", target_field.data_type()),
3333 })
3334 }
3335
3336 fn page_layout_to_scheduler(
3337 page_info: &PageInfo,
3338 page_layout: &PageLayout,
3339 decompressors: &dyn DecompressionStrategy,
3340 cache_repetition_index: bool,
3341 target_field: &Field,
3342 ) -> Result<Box<dyn StructuralPageScheduler>> {
3343 use pb21::page_layout::Layout;
3344 Ok(match page_layout.layout.as_ref().expect_ok()? {
3345 Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
3346 &page_info.buffer_offsets_and_sizes,
3347 page_info.priority,
3348 mini_block.num_items,
3349 mini_block,
3350 decompressors,
3351 )?),
3352 Layout::FullZipLayout(full_zip) => {
3353 let mut scheduler = FullZipScheduler::try_new(
3354 &page_info.buffer_offsets_and_sizes,
3355 page_info.priority,
3356 page_info.num_rows,
3357 full_zip,
3358 decompressors,
3359 )?;
3360 scheduler.enable_cache = cache_repetition_index;
3361 Box::new(scheduler)
3362 }
3363 Layout::ConstantLayout(constant_layout) => {
3364 let def_meaning = constant_layout
3365 .layers
3366 .iter()
3367 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3368 .collect::<Vec<_>>();
3369 let has_scalar_value = constant_layout.inline_value.is_some()
3370 || page_info.buffer_offsets_and_sizes.len() == 1
3371 || page_info.buffer_offsets_and_sizes.len() == 3;
3372 if has_scalar_value {
3373 Box::new(constant::ConstantPageScheduler::try_new(
3374 page_info.buffer_offsets_and_sizes.clone(),
3375 constant_layout.inline_value.clone(),
3376 target_field.data_type(),
3377 def_meaning.into(),
3378 )?) as Box<dyn StructuralPageScheduler>
3379 } else if def_meaning.len() == 1
3380 && def_meaning[0] == DefinitionInterpretation::NullableItem
3381 {
3382 Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
3383 } else {
3384 let rep_decompressor = constant_layout
3385 .rep_compression
3386 .as_ref()
3387 .map(|encoding| decompressors.create_block_decompressor(encoding))
3388 .transpose()?
3389 .map(Arc::from);
3390
3391 let def_decompressor = constant_layout
3392 .def_compression
3393 .as_ref()
3394 .map(|encoding| decompressors.create_block_decompressor(encoding))
3395 .transpose()?
3396 .map(Arc::from);
3397
3398 Box::new(ComplexAllNullScheduler::new(
3399 page_info.buffer_offsets_and_sizes.clone(),
3400 def_meaning.into(),
3401 rep_decompressor,
3402 def_decompressor,
3403 constant_layout.num_rep_values,
3404 constant_layout.num_def_values,
3405 )) as Box<dyn StructuralPageScheduler>
3406 }
3407 }
3408 Layout::BlobLayout(blob) => {
3409 let inner_scheduler = Self::page_layout_to_scheduler(
3410 page_info,
3411 blob.inner_layout.as_ref().expect_ok()?.as_ref(),
3412 decompressors,
3413 cache_repetition_index,
3414 target_field,
3415 )?;
3416 let def_meaning = blob
3417 .layers
3418 .iter()
3419 .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
3420 .collect::<Vec<_>>();
3421 if matches!(target_field.data_type(), DataType::Struct(_)) {
3422 Box::new(BlobDescriptionPageScheduler::new(
3424 inner_scheduler,
3425 def_meaning.into(),
3426 ))
3427 } else {
3428 Box::new(BlobPageScheduler::new(
3430 inner_scheduler,
3431 page_info.priority,
3432 page_info.num_rows,
3433 def_meaning.into(),
3434 ))
3435 }
3436 }
3437 })
3438 }
3439
3440 fn page_info_to_scheduler(
3441 page_info: &PageInfo,
3442 page_index: usize,
3443 decompressors: &dyn DecompressionStrategy,
3444 cache_repetition_index: bool,
3445 target_field: &Field,
3446 ) -> Result<PageInfoAndScheduler> {
3447 let page_layout = page_info.encoding.as_structural();
3448 let scheduler = Self::page_layout_to_scheduler(
3449 page_info,
3450 page_layout,
3451 decompressors,
3452 cache_repetition_index,
3453 target_field,
3454 )?;
3455 Ok(PageInfoAndScheduler {
3456 page_index,
3457 num_rows: page_info.num_rows,
3458 scheduler,
3459 })
3460 }
3461}
3462
/// Per-page state that a page scheduler wants kept in the field-level cache.
///
/// Implementors must be sizeable (`DeepSizeOf`) and shareable across threads.
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    /// Upcasts to `Arc<dyn Any>` so a scheduler can downcast back to its concrete state type.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}
3466
/// Placeholder returned by page schedulers that have nothing to cache.
pub struct NoCachedPageData;

impl DeepSizeOf for NoCachedPageData {
    /// The placeholder owns no heap data.
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        0
    }
}
impl CachedPageData for NoCachedPageData {
    /// Upcasts to `Arc<dyn Any>`.
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
3479
/// Cached state for a whole field: one entry per page, in page order.
pub struct CachedFieldData {
    /// Cached state of each page, as returned by the page scheduler's `initialize`.
    pages: Vec<Arc<dyn CachedPageData>>,
}

impl DeepSizeOf for CachedFieldData {
    /// Delegates to the size of the `pages` vector and its contents.
    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
        self.pages.deep_size_of_children(ctx)
    }
}
3489
/// Cache key under which a field's [`CachedFieldData`] is stored.
#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    /// Index of the column the cached data belongs to.
    pub column_index: u32,
    /// Tag distinguishing different views of the same column (the field scheduler uses the
    /// debug rendering of the target data type).
    pub view_tag: String,
}

impl CacheKey for FieldDataCacheKey {
    type ValueType = CachedFieldData;

    /// Renders the key as `"<column_index>:<view_tag>"`.
    fn key(&self) -> std::borrow::Cow<'_, str> {
        format!("{}:{}", self.column_index, self.view_tag).into()
    }

    /// Name identifying this kind of cache entry.
    fn type_name() -> &'static str {
        "FieldData"
    }
}
3516
3517impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
3518 fn initialize<'a>(
3519 &'a mut self,
3520 _filter: &'a FilterExpression,
3521 context: &'a SchedulerContext,
3522 ) -> BoxFuture<'a, Result<()>> {
3523 let cache_key = FieldDataCacheKey {
3524 column_index: self.column_index,
3525 view_tag: self.view_tag.clone(),
3526 };
3527 let cache = context.cache().clone();
3528
3529 async move {
3530 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
3531 self.page_schedulers
3532 .iter_mut()
3533 .zip(cached_data.pages.iter())
3534 .for_each(|(page_scheduler, cached_data)| {
3535 page_scheduler.scheduler.load(cached_data);
3536 });
3537 return Ok(());
3538 }
3539
3540 let page_data = self
3541 .page_schedulers
3542 .iter_mut()
3543 .map(|s| s.scheduler.initialize(context.io()))
3544 .collect::<FuturesOrdered<_>>();
3545
3546 let page_data = page_data.try_collect::<Vec<_>>().await?;
3547 let cached_data = Arc::new(CachedFieldData { pages: page_data });
3548 cache.insert_with_key(&cache_key, cached_data).await;
3549 Ok(())
3550 }
3551 .boxed()
3552 }
3553
3554 fn schedule_ranges<'a>(
3555 &'a self,
3556 ranges: &[Range<u64>],
3557 _filter: &FilterExpression,
3558 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
3559 let ranges = ranges.to_vec();
3560 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
3561 self, ranges,
3562 )))
3563 }
3564}
3565
/// Decode task that runs several page decode tasks and concatenates their results into a
/// single array.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    /// Page-level tasks to run, in order.
    tasks: Vec<Box<dyn DecodePageTask>>,
    /// Forwarded to `into_arrow` when converting decoded blocks to Arrow data.
    should_validate: bool,
    /// Arrow data type of the resulting array.
    data_type: DataType,
}
3574
3575impl StructuralCompositeDecodeArrayTask {
3576 fn restore_validity(
3577 array: Arc<dyn Array>,
3578 unraveler: &mut CompositeRepDefUnraveler,
3579 ) -> Arc<dyn Array> {
3580 let validity = unraveler.unravel_validity(array.len());
3581 let Some(validity) = validity else {
3582 return array;
3583 };
3584 if array.data_type() == &DataType::Null {
3585 return array;
3587 }
3588 assert_eq!(validity.len(), array.len());
3589 make_array(unsafe {
3592 array
3593 .to_data()
3594 .into_builder()
3595 .nulls(Some(validity))
3596 .build_unchecked()
3597 })
3598 }
3599}
3600
3601impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3602 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3603 let mut arrays = Vec::with_capacity(self.tasks.len());
3604 let mut unravelers = Vec::with_capacity(self.tasks.len());
3605 let mut data_size = 0u64;
3606 for task in self.tasks {
3607 let decoded = task.decode()?;
3608 data_size += decoded.data.data_size();
3609 unravelers.push(decoded.repdef);
3610
3611 let array = make_array(
3612 decoded
3613 .data
3614 .into_arrow(self.data_type.clone(), self.should_validate)?,
3615 );
3616
3617 arrays.push(array);
3618 }
3619 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3620 let array = arrow_select::concat::concat(&array_refs)?;
3621 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3622
3623 let array = Self::restore_validity(array, &mut repdef);
3624
3625 Ok(DecodedArray {
3626 array,
3627 repdef,
3628 data_size,
3629 })
3630 }
3631}
3632
/// Field decoder for a primitive column: queues loaded page decoders and drains rows from
/// them in order.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    /// Arrow field being decoded; supplies the output data type.
    field: Arc<ArrowField>,
    /// Loaded pages that still have rows to drain, oldest first.
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    /// Forwarded to the decode tasks this decoder creates.
    should_validate: bool,
    /// Number of rows already drained from the front page decoder.
    rows_drained_in_current: u64,
}

impl StructuralPrimitiveFieldDecoder {
    /// Creates a decoder with no pages queued.
    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
        Self {
            field: field.clone(),
            page_decoders: VecDeque::new(),
            should_validate,
            rows_drained_in_current: 0,
        }
    }
}
3651
3652impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3653 fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
3654 assert!(child.path.is_empty());
3655 self.page_decoders.push_back(child.decoder);
3656 Ok(())
3657 }
3658
3659 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3660 let mut remaining = num_rows;
3661 let mut tasks = Vec::new();
3662 while remaining > 0 {
3663 let cur_page = self.page_decoders.front_mut().unwrap();
3664 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3665 let to_take = num_in_page.min(remaining);
3666
3667 let task = cur_page.drain(to_take)?;
3668 tasks.push(task);
3669
3670 if to_take == num_in_page {
3671 self.page_decoders.pop_front();
3672 self.rows_drained_in_current = 0;
3673 } else {
3674 self.rows_drained_in_current += to_take;
3675 }
3676
3677 remaining -= to_take;
3678 }
3679 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3680 tasks,
3681 should_validate: self.should_validate,
3682 data_type: self.field.data_type().clone(),
3683 }))
3684 }
3685
3686 fn data_type(&self) -> &DataType {
3687 self.field.data_type()
3688 }
3689}
3690
/// Output of serializing a page with the full-zip layout.
/// NOTE(review): producer and consumer are outside this view — field notes below are
/// inferred from names; confirm.
struct SerializedFullZip {
    /// The zipped value buffer.
    values: LanceBuffer,
    /// Repetition index buffer, when one was produced.
    repetition_index: Option<LanceBuffer>,
}
3698
// NOTE(review): presumably the byte alignment applied to mini-block buffers — confirm at
// the use sites.
const MINIBLOCK_ALIGNMENT: usize = 8;
3714
/// Structural encoder for a primitive column.
///
/// Construction (`try_new`) wires the accumulation queue to the column index and to the
/// `cache_bytes_per_column` / `keep_original_array` encoding options.
pub struct PrimitiveStructuralEncoder {
    /// Buffers incoming arrays for this column before they are encoded.
    accumulation_queue: AccumulationQueue,

    /// Copied from `EncodingOptions::keep_original_array`.
    keep_original_array: bool,
    /// Copied from `EncodingOptions::support_large_chunk()`.
    support_large_chunk: bool,
    /// Rep/def builders accumulated so far; starts empty.
    /// NOTE(review): presumably one per accumulated array — confirm.
    accumulated_repdefs: Vec<RepDefBuilder>,
    /// Strategy used to choose compression.
    compression_strategy: Arc<dyn CompressionStrategy>,
    /// Index of the column being written.
    column_index: u32,
    /// Field being encoded.
    field: Field,
    /// Encoding-related metadata key/value pairs supplied at construction.
    encoding_metadata: Arc<HashMap<String, String>>,
    /// Lance file version being written.
    version: LanceFileVersion,
}
3755
/// Compressed repetition or definition levels belonging to one mini-block chunk.
struct CompressedLevelsChunk {
    /// The compressed level bytes for this chunk.
    data: LanceBuffer,
    /// Number of 16-bit levels that were compressed into `data`.
    num_levels: u16,
}
3760
/// Compressed levels for a whole page, split to line up with the value chunks.
struct CompressedLevels {
    /// One entry per mini-block chunk, in chunk order.
    data: Vec<CompressedLevelsChunk>,
    /// Description of the block compressor that produced every entry in `data`.
    compression: CompressiveEncoding,
    /// Two `u64` values per chunk (row count, leftover level count), reinterpreted as
    /// bytes. `None` when no entries were recorded (for example when `max_rep` is 0).
    rep_index: Option<LanceBuffer>,
}
3766
/// Output of `serialize_miniblocks`: the page's chunk data plus its chunk metadata.
struct SerializedMiniBlockPage {
    /// Number of value buffers per chunk (does not count the rep/def level buffers).
    num_buffers: u64,
    /// All chunks laid out back to back, each padded to `MINIBLOCK_ALIGNMENT`.
    data: LanceBuffer,
    /// One little-endian metadata word per chunk (chunk size and log2 value count).
    metadata: LanceBuffer,
}
3772
/// Limits attached to a decision to dictionary-encode a block.
///
/// NOTE(review): the code that consumes these limits is not visible from here; the
/// field descriptions below are inferred from the names — confirm against the
/// dictionary encoder.
#[derive(Debug, Clone, Copy)]
struct DictEncodingBudget {
    /// Presumably the maximum number of distinct dictionary entries allowed.
    max_dict_entries: u32,
    /// Presumably the maximum size, in bytes, the dictionary-encoded form may reach.
    max_encoded_size: usize,
}
3778
/// Inputs gathered for encoding one page of a primitive column.
///
/// NOTE(review): the producer and consumer of this struct are outside the visible
/// code; the field notes are inferred from names and types — confirm before relying
/// on them.
struct PrimitivePageData {
    /// The Arrow arrays that make up the page.
    arrays: Vec<ArrayRef>,
    /// Serialized repetition/definition levels for the page.
    repdef: SerializedRepDefs,
    /// Presumably the row number of the first row in the page.
    row_number: u64,
    /// Presumably the number of top-level rows in the page.
    num_rows: u64,
    /// NOTE(review): meaning not derivable from the visible code — TODO document.
    unsplittable_miniblock_levels: Option<u64>,
}
3792
/// Cloneable bundle of the per-column settings needed to encode a page.
///
/// NOTE(review): most fields mirror `PrimitiveStructuralEncoder`; where this context
/// is built and consumed is not visible here, so the notes on the last two fields are
/// inferred from their names.
#[derive(Clone)]
struct PrimitiveEncodeContext {
    /// Index of the column being encoded.
    column_idx: u32,
    /// The field being encoded.
    field: Field,
    /// Strategy used to create compressors.
    compression_strategy: Arc<dyn CompressionStrategy>,
    /// String key/value encoding metadata.
    encoding_metadata: Arc<HashMap<String, String>>,
    /// Whether large mini-block chunks may be written.
    support_large_chunk: bool,
    /// File format version being written.
    version: LanceFileVersion,
    /// Presumably true when the rep/def information is plain validity only — confirm.
    is_simple_validity: bool,
    /// Presumably true when any rep/def information exists for the page — confirm.
    has_repdef_info: bool,
}
3816
3817impl PrimitiveStructuralEncoder {
3818 pub fn try_new(
3819 options: &EncodingOptions,
3820 compression_strategy: Arc<dyn CompressionStrategy>,
3821 column_index: u32,
3822 field: Field,
3823 encoding_metadata: Arc<HashMap<String, String>>,
3824 ) -> Result<Self> {
3825 Ok(Self {
3826 accumulation_queue: AccumulationQueue::new(
3827 options.cache_bytes_per_column,
3828 column_index,
3829 options.keep_original_array,
3830 ),
3831 support_large_chunk: options.support_large_chunk(),
3832 keep_original_array: options.keep_original_array,
3833 accumulated_repdefs: Vec::new(),
3834 column_index,
3835 compression_strategy,
3836 field,
3837 encoding_metadata,
3838 version: options.version,
3839 })
3840 }
3841
3842 fn is_narrow(data_block: &DataBlock) -> bool {
3850 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3851
3852 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3853 let max_len_array = max_len_array
3854 .as_any()
3855 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3856 .unwrap();
3857 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3858 return true;
3859 }
3860 }
3861 false
3862 }
3863
3864 fn prefers_miniblock(
3865 data_block: &DataBlock,
3866 encoding_metadata: &HashMap<String, String>,
3867 ) -> bool {
3868 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3870 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3871 }
3872 Self::is_narrow(data_block)
3874 }
3875
3876 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3877 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3881 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3882 }
3883 true
3884 }
3885
    /// Lays out compressed mini-block chunks, together with their optional repetition
    /// and definition levels, into one data buffer and one metadata buffer.
    ///
    /// Each chunk is written as:
    ///
    /// 1. `num_levels` as a little-endian `u16` (taken from the rep chunk, else the
    ///    def chunk, else 0)
    /// 2. the rep byte length as a little-endian `u16`, if rep levels are present
    /// 3. the def byte length as a little-endian `u16`, if def levels are present
    /// 4. the size of each value buffer (as stored when `support_large_chunk` is set,
    ///    otherwise narrowed to `u16`)
    /// 5. padding up to `MINIBLOCK_ALIGNMENT`
    /// 6. the rep bytes, the def bytes and then each value buffer's slice for this
    ///    chunk, each followed by padding up to `MINIBLOCK_ALIGNMENT`
    ///
    /// The metadata buffer receives one little-endian word per chunk, `u32` when
    /// `support_large_chunk` is set and `u16` otherwise, holding
    /// `((chunk_bytes / MINIBLOCK_ALIGNMENT - 1) << 4) | log_num_values`.
    ///
    /// # Errors
    ///
    /// Returns an internal error when a rep/def buffer or (in 16-bit mode) a value
    /// buffer is too large for a `u16` size, when a chunk is empty or exceeds the size
    /// limit, when a chunk is not aligned, or when `log_num_values` exceeds 15.
    ///
    /// # Panics
    ///
    /// Panics if `rep` or `def` has fewer entries than `miniblocks.chunks`, or if the
    /// chunk buffer sizes run past the end of a buffer in `miniblocks.data`.
    fn serialize_miniblocks(
        miniblocks: MiniBlockCompressed,
        rep: Option<Vec<CompressedLevelsChunk>>,
        def: Option<Vec<CompressedLevelsChunk>>,
        support_large_chunk: bool,
    ) -> Result<SerializedMiniBlockPage> {
        let bytes_rep = rep
            .as_ref()
            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_def = def
            .as_ref()
            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
        // Counts the level buffers too; only used for the capacity estimate below.
        let mut num_buffers = miniblocks.data.len();
        if rep.is_some() {
            num_buffers += 1;
        }
        if def.is_some() {
            num_buffers += 1;
        }
        // Capacity hint only. NOTE(review): 9 bytes per buffer presumably budgets a
        // 2-byte size field plus up to 7 bytes of padding — confirm.
        let max_extra = 9 * num_buffers;
        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
        // Width in bytes of one per-chunk metadata word.
        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);

        let mut rep_iter = rep.map(|r| r.into_iter());
        let mut def_iter = def.map(|d| d.into_iter());

        // Read position within each value buffer; advances as chunks consume bytes.
        let mut buffer_offsets = vec![0; miniblocks.data.len()];
        for chunk in miniblocks.chunks {
            let start_pos = data_buffer.len();
            // Every chunk ends with padding, so each new chunk must start aligned.
            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);

            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
            let def = def_iter.as_mut().map(|d| d.next().unwrap());

            // Prefer the rep chunk's level count, then the def chunk's, else zero.
            let num_levels = rep
                .as_ref()
                .map(|r| r.num_levels)
                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
            data_buffer.extend_from_slice(&num_levels.to_le_bytes());

            if let Some(rep) = rep.as_ref() {
                let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
                    Error::internal(format!(
                        "Repetition buffer size ({} bytes) too large",
                        rep.data.len()
                    ))
                })?;
                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
            }
            if let Some(def) = def.as_ref() {
                let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
                    Error::internal(format!(
                        "Definition buffer size ({} bytes) too large",
                        def.data.len()
                    ))
                })?;
                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
            }

            if support_large_chunk {
                // Sizes are written at the width they are stored in the chunk.
                for &buffer_size in &chunk.buffer_sizes {
                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
                }
            } else {
                // 16-bit mode: every buffer size must fit in a u16.
                for &buffer_size in &chunk.buffer_sizes {
                    let buffer_size = u16::try_from(buffer_size).map_err(|_| {
                        Error::internal(format!(
                            "Mini-block buffer size ({} bytes) too large for 16-bit metadata",
                            buffer_size
                        ))
                    })?;
                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
                }
            }

            // Appends fill bytes until the buffer length is a multiple of the alignment.
            let add_padding = |data_buffer: &mut Vec<u8>| {
                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
            };
            // Pad the chunk header.
            add_padding(&mut data_buffer);

            if let Some(rep) = rep.as_ref() {
                data_buffer.extend_from_slice(&rep.data);
                add_padding(&mut data_buffer);
            }
            if let Some(def) = def.as_ref() {
                data_buffer.extend_from_slice(&def.data);
                add_padding(&mut data_buffer);
            }
            // Copy this chunk's slice out of each value buffer and advance its offset.
            for (buffer_size, (buffer, buffer_offset)) in chunk
                .buffer_sizes
                .iter()
                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
            {
                let start = *buffer_offset;
                let end = start + *buffer_size as usize;
                *buffer_offset += *buffer_size as usize;
                data_buffer.extend_from_slice(&buffer[start..end]);
                add_padding(&mut data_buffer);
            }

            let chunk_bytes = data_buffer.len() - start_pos;
            // Largest chunk, in bytes, allowed for the selected metadata width.
            let max_chunk_size = if support_large_chunk {
                1_u64 << 31
            } else {
                32 * 1024
            };
            if chunk_bytes == 0 || chunk_bytes as u64 > max_chunk_size {
                return Err(Error::internal(format!(
                    "Mini-block chunk size {} bytes exceeds the {} byte metadata limit",
                    chunk_bytes, max_chunk_size
                )));
            }
            if chunk_bytes % MINIBLOCK_ALIGNMENT != 0 {
                return Err(Error::internal(format!(
                    "Mini-block chunk size {} bytes is not aligned to {} bytes",
                    chunk_bytes, MINIBLOCK_ALIGNMENT
                )));
            }
            if chunk.log_num_values > 15 {
                return Err(Error::internal(format!(
                    "Mini-block log_num_values {} exceeds the 4-bit metadata limit",
                    chunk.log_num_values
                )));
            }
            // The size is stored in alignment units, minus one (a chunk is never empty).
            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
            let divided_bytes_minus_one = (divided_bytes - 1) as u64;

            // Low 4 bits: log2 of the value count. Remaining bits: the chunk size.
            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
            if support_large_chunk {
                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
            } else {
                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
            }
        }

        let data_buffer = LanceBuffer::from(data_buffer);
        let metadata_buffer = LanceBuffer::from(meta_buffer);

        Ok(SerializedMiniBlockPage {
            // Only the value buffers are counted here, not the rep/def buffers.
            num_buffers: miniblocks.data.len() as u64,
            data: data_buffer,
            metadata: metadata_buffer,
        })
    }
4090
    /// Compresses repetition or definition levels, split so that each piece lines up
    /// with one mini-block chunk of values.
    ///
    /// A single block compressor is created from statistics computed over all levels
    /// and is then applied to each chunk's slice. When `max_rep > 0` a repetition
    /// index is also built with two `u64` entries per chunk: the number of rows
    /// counted for the chunk and the number of trailing "leftover" levels.
    ///
    /// * `levels` - slicer over the 16-bit levels of the page
    /// * `num_elements` - total number of values, used to size each chunk
    /// * `chunks` - the value chunks the levels must line up with
    /// * `max_rep` - the level value that the index counts; pass 0 to skip building
    ///   the repetition index
    ///
    /// # Errors
    ///
    /// Fails if the compressor cannot be created, if compression fails, or if a
    /// chunk holds more levels than fit in a `u16`.
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let levels_buf = levels.all_levels().clone();

        // Statistics over all levels drive the choice of compressor.
        let mut fixed_width_block = FixedWidthDataBlock {
            data: levels_buf,
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        };
        fixed_width_block.compute_stat();

        let levels_block = DataBlock::FixedWidth(fixed_width_block);
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            debug_assert!(chunk_num_values > 0);
            values_counter += chunk_num_values;
            // The last chunk takes whatever levels remain.
            let chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            // Each level is 2 bytes.
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();

                // Count levels equal to `max_rep`, ignoring the first level of the chunk.
                // NOTE(review): a level equal to `max_rep` presumably marks the start
                // of a new top-level row — confirm against the rep/def definition.
                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                // For non-final chunks: the number of levels from the last `max_rep`
                // level (inclusive) to the end of the chunk, or the whole chunk when it
                // contains none. The final chunk has no leftovers.
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };

                // If this chunk begins with `max_rep`, the previous chunk's leftovers
                // did not continue into this chunk: fold them into the previous
                // chunk's row count and clear them.
                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }

                // The final chunk counts one additional row.
                if chunk_idx == chunks.len() - 1 {
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let mut chunk_fixed_width = FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            };
            chunk_fixed_width.compute_stat();
            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            // The per-chunk level count is stored as a u16 in the serialized page.
            let num_levels = u16::try_from(num_chunk_levels).map_err(|_| {
                Error::invalid_input_source(
                    format!(
                        "Mini-block cannot encode {} rep/def levels in one chunk. \
                         This usually means a top-level row contains too much nested structure \
                         for the current layout.",
                        num_chunk_levels
                    )
                    .into(),
                )
            })?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }
4224
4225 fn encode_simple_all_null(
4226 column_idx: u32,
4227 num_rows: u64,
4228 row_number: u64,
4229 ) -> Result<EncodedPage> {
4230 let description =
4231 ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
4232 Ok(EncodedPage {
4233 column_idx,
4234 data: vec![],
4235 description: PageEncoding::Structural(description),
4236 num_rows,
4237 row_number,
4238 })
4239 }
4240
4241 fn encode_complex_all_null_vals(
4242 data: &Arc<[u16]>,
4243 compression_strategy: &dyn CompressionStrategy,
4244 ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
4245 let buffer = LanceBuffer::reinterpret_slice(data.clone());
4246 let mut fixed_width_block = FixedWidthDataBlock {
4247 data: buffer,
4248 bits_per_value: 16,
4249 num_values: data.len() as u64,
4250 block_info: BlockInfo::new(),
4251 };
4252 fixed_width_block.compute_stat();
4253
4254 let levels_block = DataBlock::FixedWidth(fixed_width_block);
4255 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
4256 let (compressor, encoding) =
4257 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
4258 let compressed_buffer = compressor.compress(levels_block)?;
4259 Ok((compressed_buffer, encoding))
4260 }
4261
4262 fn encode_complex_all_null(
4266 column_idx: u32,
4267 repdef: crate::repdef::SerializedRepDefs,
4268 row_number: u64,
4269 num_rows: u64,
4270 version: LanceFileVersion,
4271 compression_strategy: &dyn CompressionStrategy,
4272 ) -> Result<EncodedPage> {
4273 if version.resolve() < LanceFileVersion::V2_2 {
4274 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
4275 LanceBuffer::reinterpret_slice(rep.clone())
4276 } else {
4277 LanceBuffer::empty()
4278 };
4279
4280 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
4281 LanceBuffer::reinterpret_slice(def.clone())
4282 } else {
4283 LanceBuffer::empty()
4284 };
4285
4286 let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
4287 return Ok(EncodedPage {
4288 column_idx,
4289 data: vec![rep_bytes, def_bytes],
4290 description: PageEncoding::Structural(description),
4291 num_rows,
4292 row_number,
4293 });
4294 }
4295
4296 let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
4297 repdef.repetition_levels.as_ref()
4298 {
4299 let num_values = rep.len() as u64;
4300 let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
4301 (buffer, Some(encoding), num_values)
4302 } else {
4303 (LanceBuffer::empty(), None, 0)
4304 };
4305
4306 let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
4307 repdef.definition_levels.as_ref()
4308 {
4309 let num_values = def.len() as u64;
4310 let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
4311 (buffer, Some(encoding), num_values)
4312 } else {
4313 (LanceBuffer::empty(), None, 0)
4314 };
4315
4316 let description = ProtobufUtils21::compressed_all_null_constant_layout(
4317 &repdef.def_meaning,
4318 rep_encoding,
4319 def_encoding,
4320 num_rep_values,
4321 num_def_values,
4322 );
4323 Ok(EncodedPage {
4324 column_idx,
4325 data: vec![rep_bytes, def_bytes],
4326 description: PageEncoding::Structural(description),
4327 num_rows,
4328 row_number,
4329 })
4330 }
4331
4332 fn leaf_validity(
4333 repdef: &crate::repdef::SerializedRepDefs,
4334 num_values: usize,
4335 ) -> Result<Option<BooleanBuffer>> {
4336 let rep = repdef
4337 .repetition_levels
4338 .as_ref()
4339 .map(|rep| rep.as_ref().to_vec());
4340 let def = repdef
4341 .definition_levels
4342 .as_ref()
4343 .map(|def| def.as_ref().to_vec());
4344 let mut unraveler = RepDefUnraveler::new(
4345 rep,
4346 def,
4347 repdef.def_meaning.clone().into(),
4348 num_values as u64,
4349 );
4350 if unraveler.is_all_valid() {
4351 return Ok(None);
4352 }
4353 let mut validity = BooleanBufferBuilder::new(num_values);
4354 unraveler.unravel_validity(&mut validity);
4355 Ok(Some(validity.finish()))
4356 }
4357
4358 fn is_constant_values(
4359 arrays: &[ArrayRef],
4360 scalar: &ArrayRef,
4361 validity: Option<&BooleanBuffer>,
4362 ) -> Result<bool> {
4363 debug_assert_eq!(scalar.len(), 1);
4364 debug_assert_eq!(scalar.null_count(), 0);
4365
4366 match scalar.data_type() {
4367 DataType::Boolean => {
4368 let mut global_idx = 0usize;
4369 let scalar_val = scalar.as_boolean().value(0);
4370 for arr in arrays {
4371 let bool_arr = arr.as_boolean();
4372 for i in 0..arr.len() {
4373 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4374 global_idx += 1;
4375 if !is_valid {
4376 continue;
4377 }
4378 if bool_arr.value(i) != scalar_val {
4379 return Ok(false);
4380 }
4381 }
4382 }
4383 Ok(true)
4384 }
4385 DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
4386 DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
4387 DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
4388 DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
4389 data_type => {
4390 let mut global_idx = 0usize;
4391 let Some(byte_width) = data_type.byte_width_opt() else {
4392 return Ok(false);
4393 };
4394 let scalar_data = scalar.to_data();
4395 if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
4396 return Ok(false);
4397 }
4398 let scalar_bytes = scalar_data.buffers()[0].as_slice();
4399 if scalar_bytes.len() != byte_width {
4400 return Ok(false);
4401 }
4402
4403 for arr in arrays {
4404 let data = arr.to_data();
4405 if data.buffers().is_empty() {
4406 return Ok(false);
4407 }
4408 let buf = data.buffers()[0].as_slice();
4409 let base = data.offset();
4410 for i in 0..arr.len() {
4411 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4412 global_idx += 1;
4413 if !is_valid {
4414 continue;
4415 }
4416 let start = (base + i) * byte_width;
4417 if buf[start..start + byte_width] != scalar_bytes[..] {
4418 return Ok(false);
4419 }
4420 }
4421 }
4422 Ok(true)
4423 }
4424 }
4425 }
4426
4427 fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
4428 arrays: &[ArrayRef],
4429 scalar: &ArrayRef,
4430 validity: Option<&BooleanBuffer>,
4431 ) -> Result<bool> {
4432 debug_assert_eq!(scalar.len(), 1);
4433 let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
4434 let mut global_idx = 0usize;
4435 for arr in arrays {
4436 let str_arr = arr.as_string::<O>();
4437 for i in 0..arr.len() {
4438 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4439 global_idx += 1;
4440 if !is_valid {
4441 continue;
4442 }
4443 if str_arr.value(i).as_bytes() != scalar_val {
4444 return Ok(false);
4445 }
4446 }
4447 }
4448 Ok(true)
4449 }
4450
4451 fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
4452 arrays: &[ArrayRef],
4453 scalar: &ArrayRef,
4454 validity: Option<&BooleanBuffer>,
4455 ) -> Result<bool> {
4456 debug_assert_eq!(scalar.len(), 1);
4457 let scalar_val = scalar.as_binary::<O>().value(0);
4458 let mut global_idx = 0usize;
4459 for arr in arrays {
4460 let bin_arr = arr.as_binary::<O>();
4461 for i in 0..arr.len() {
4462 let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
4463 global_idx += 1;
4464 if !is_valid {
4465 continue;
4466 }
4467 if bin_arr.value(i) != scalar_val {
4468 return Ok(false);
4469 }
4470 }
4471 }
4472 Ok(true)
4473 }
4474
4475 fn find_constant_scalar(
4476 arrays: &[ArrayRef],
4477 validity: Option<&BooleanBuffer>,
4478 ) -> Result<Option<ArrayRef>> {
4479 if arrays.is_empty() {
4480 return Ok(None);
4481 }
4482
4483 let global_scalar_idx = if let Some(validity) = validity {
4484 let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
4485 return Ok(None);
4486 };
4487 idx
4488 } else {
4489 0
4490 };
4491
4492 let mut idx_remaining = global_scalar_idx;
4493 let mut scalar_arr_idx = 0usize;
4494 while scalar_arr_idx < arrays.len() {
4495 let len = arrays[scalar_arr_idx].len();
4496 if idx_remaining < len {
4497 break;
4498 }
4499 idx_remaining -= len;
4500 scalar_arr_idx += 1;
4501 }
4502
4503 if scalar_arr_idx >= arrays.len() {
4504 return Ok(None);
4505 }
4506
4507 let scalar =
4508 lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
4509 if scalar.null_count() != 0 {
4510 return Ok(None);
4511 }
4512 if !Self::is_constant_values(arrays, &scalar, validity)? {
4513 return Ok(None);
4514 }
4515 Ok(Some(scalar))
4516 }
4517
4518 fn resolve_dict_values_compression_metadata(
4519 field_metadata: &HashMap<String, String>,
4520 env_compression: Option<String>,
4521 env_compression_level: Option<String>,
4522 ) -> HashMap<String, String> {
4523 let mut metadata = HashMap::new();
4524
4525 let compression = field_metadata
4526 .get(DICT_VALUES_COMPRESSION_META_KEY)
4527 .cloned()
4528 .or(env_compression)
4529 .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
4530 metadata.insert(COMPRESSION_META_KEY.to_string(), compression);
4531
4532 if let Some(compression_level) = field_metadata
4533 .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
4534 .cloned()
4535 .or(env_compression_level)
4536 {
4537 metadata.insert(COMPRESSION_LEVEL_META_KEY.to_string(), compression_level);
4538 }
4539
4540 metadata
4541 }
4542
4543 fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
4544 let mut dict_values_field = Field::new_arrow("", DataType::UInt16, false)?;
4549 dict_values_field.metadata = Self::resolve_dict_values_compression_metadata(
4550 &field.metadata,
4551 env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
4552 env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
4553 );
4554 Ok(dict_values_field)
4555 }
4556
    /// Encodes one page with the mini-block layout.
    ///
    /// The values are compressed into chunks, the repetition and definition levels
    /// (when present) are compressed chunk-by-chunk to match, and everything is
    /// serialized with `serialize_miniblocks`.
    ///
    /// The resulting page buffers are, in order:
    ///
    /// 1. the chunk metadata
    /// 2. the chunk data
    /// 3. the compressed dictionary (only when `dictionary_data` is given)
    /// 4. the repetition index (only when one was built)
    ///
    /// # Panics
    ///
    /// Reaches `unreachable!()` if `data` is an all-null block.
    #[allow(clippy::too_many_arguments)]
    fn encode_miniblock(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdef: crate::repdef::SerializedRepDefs,
        row_number: u64,
        dictionary_data: Option<DataBlock>,
        num_rows: u64,
        support_large_chunk: bool,
    ) -> Result<EncodedPage> {
        // NOTE(review): callers are presumably expected to route all-null data to a
        // different encoder before reaching this point — confirm.
        if let DataBlock::AllNull(_null_block) = data {
            unreachable!()
        }

        let num_items = data.num_values();

        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        // One repetition level per list layer in the definition meaning.
        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let mut compressed_rep = repdef
            .rep_slicer()
            .map(|rep_slicer| {
                Self::compress_levels(
                    rep_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    max_rep,
                )
            })
            .transpose()?;

        // The repetition index depth is 1 when an index was built and 0 otherwise.
        let (rep_index, rep_index_depth) =
            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
                Some(rep_index) => (Some(rep_index.clone()), 1),
                None => (None, 0),
            };

        // Definition levels are compressed with max_rep = 0, so no index is built.
        let mut compressed_def = repdef
            .def_slicer()
            .map(|def_slicer| {
                Self::compress_levels(
                    def_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    0,
                )
            })
            .transpose()?;

        // Move the chunk vectors out while leaving the compression descriptions in
        // place; those are still needed for the layout below.
        let rep_data = compressed_rep
            .as_mut()
            .map(|cr| std::mem::take(&mut cr.data));
        let def_data = compressed_def
            .as_mut()
            .map(|cd| std::mem::take(&mut cd.data));

        let serialized =
            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;

        // At most four buffers: metadata, data, dictionary, repetition index.
        let mut data = Vec::with_capacity(4);
        data.push(serialized.metadata);
        data.push(serialized.data);

        if let Some(dictionary_data) = dictionary_data {
            let num_dictionary_items = dictionary_data.num_values();
            let dict_values_field = Self::build_dict_values_compressor_field(field)?;

            let (compressor, dictionary_encoding) = compression_strategy
                .create_block_compressor(&dict_values_field, &dictionary_data)?;
            let dictionary_buffer = compressor.compress(dictionary_data)?;

            // The dictionary goes before the repetition index.
            data.push(dictionary_buffer);
            if let Some(rep_index) = rep_index {
                data.push(rep_index);
            }

            let description = ProtobufUtils21::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                Some((dictionary_encoding, num_dictionary_items)),
                &repdef.def_meaning,
                num_items,
                support_large_chunk,
            );
            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        } else {
            let description = ProtobufUtils21::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                None,
                &repdef.def_meaning,
                num_items,
                support_large_chunk,
            );

            if let Some(rep_index) = rep_index {
                // Debug-only sanity check: the per-chunk row counts (the first entry
                // of each pair) must add up to the number of rows in the page.
                let view = rep_index.borrow_to_typed_slice::<u64>();
                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
                debug_assert_eq!(total, num_rows);

                data.push(rep_index);
            }

            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        }
    }
4695
    /// Zips fixed-width values together with their control words.
    ///
    /// For each item the control word is written first and then, if the item is
    /// visible, the next fixed-width value. Each time a new row begins, the byte
    /// offset at which that item starts is added to the repetition index; the final
    /// buffer length is added at the end.
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error if `bits_per_value` is not a multiple of 8.
    ///
    /// # Panics
    ///
    /// Panics if a visible item has no corresponding value left in `fixed`.
    fn serialize_full_zip_fixed(
        fixed: FixedWidthDataBlock,
        mut repdef: ControlWordIterator,
        num_values: u64,
    ) -> Result<SerializedFullZip> {
        if !fixed.bits_per_value.is_multiple_of(8) {
            return Err(Error::invalid_input_source(
                format!(
                    "Full-zip fixed-width values must be byte aligned, got {} bits per value",
                    fixed.bits_per_value
                )
                .into(),
            ));
        }

        // Expected output size: all value bytes plus one control word per item.
        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
        let mut zipped_data = Vec::with_capacity(len);

        // Without repetition the maximum is 0.
        // NOTE(review): a zero maximum presumably makes the encoder emit no bytes,
        // which is what yields `repetition_index: None` below — confirm against
        // `BytepackedIntegerEncoder`.
        let max_rep_index_val = if repdef.has_repetition() {
            len as u64
        } else {
            0
        };
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);

        let bytes_per_value = fixed.bits_per_value as usize / 8;
        // Start offset of the item being processed (buffer length before its control word).
        let mut offset = 0;

        if bytes_per_value == 0 {
            // Zero-width values: only control words are written.
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    // NOTE(review): the safety contract of `append` is not visible
                    // here; presumably the value must not exceed the maximum passed to
                    // `with_capacity` — confirm and record it as a SAFETY comment.
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                offset = zipped_data.len();
            }
        } else {
            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    // NOTE(review): see the note on the `append` call above.
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                if control.is_visible {
                    let value = data_iter.next().unwrap();
                    zipped_data.extend_from_slice(value);
                }
                offset = zipped_data.len();
            }
        }

        debug_assert_eq!(zipped_data.len(), len);
        // Final entry: the end offset of the zipped buffer.
        // NOTE(review): see the note on the first `append` call above.
        unsafe {
            rep_index_builder.append(zipped_data.len() as u64);
        }

        let zipped_data = LanceBuffer::from(zipped_data);
        let rep_index = rep_index_builder.into_data();
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::from(rep_index))
        };
        Ok(SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        })
    }
4775
    /// Zips variable-width values together with their control words.
    ///
    /// For each item the control word is written first. A visible item consumes the
    /// next pair of offsets; if the item is also valid, its length (little-endian, at
    /// the offset width) and its bytes are written. Each time a new row begins, the
    /// byte offset at which that item starts is added to the repetition index; the
    /// final buffer length is added at the end. A repetition index is always returned.
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error if `bits_per_offset` is not a multiple of 8 or
    /// is neither 32 nor 64.
    ///
    /// # Panics
    ///
    /// Panics if a visible item has no corresponding offsets left in `variable`.
    fn serialize_full_zip_variable(
        variable: VariableWidthBlock,
        mut repdef: ControlWordIterator,
        num_items: u64,
    ) -> Result<SerializedFullZip> {
        let bytes_per_offset = variable.bits_per_offset as usize / 8;
        if !variable.bits_per_offset.is_multiple_of(8) {
            return Err(Error::invalid_input_source(
                format!(
                    "Full-zip variable-width offsets must be byte aligned, got {} bits per offset",
                    variable.bits_per_offset
                )
                .into(),
            ));
        }
        // Upper bound on the output size: value bytes, one control word per item and
        // one length prefix per value (invalid items write no length, so the actual
        // size can be smaller).
        let len = variable.data.len()
            + repdef.bytes_per_word() * num_items as usize
            + bytes_per_offset * variable.num_values as usize;
        let mut buf = Vec::with_capacity(len);

        let max_rep_index_val = len as u64;
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

        // The two arms are identical apart from the offset integer type.
        match bytes_per_offset {
            4 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
                // Start offset of the item being processed.
                let mut rep_offset = 0;
                // Each window is the (start, end) offset pair of one value.
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // NOTE(review): the safety contract of `append` is not visible
                        // here; presumably the value must not exceed the maximum passed
                        // to `with_capacity` — confirm and record it as a SAFETY comment.
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            8 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
                // Start offset of the item being processed.
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // NOTE(review): see the note on the `append` call in the 4-byte arm.
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            _ => {
                return Err(Error::invalid_input_source(
                    format!(
                        "Full-zip variable-width offsets must be 32 or 64 bits, got {} bits",
                        variable.bits_per_offset
                    )
                    .into(),
                ));
            }
        }

        debug_assert!(buf.len() <= len);
        // Final entry: the end offset of the zipped buffer.
        // NOTE(review): see the note on the `append` call in the 4-byte arm.
        unsafe {
            rep_index_builder.append(buf.len() as u64);
        }

        let zipped_data = LanceBuffer::from(buf);
        let rep_index = rep_index_builder.into_data();
        debug_assert!(!rep_index.is_empty());
        let rep_index = Some(LanceBuffer::from(rep_index));
        Ok(SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        })
    }
4880
4881 fn serialize_full_zip(
4884 compressed_data: PerValueDataBlock,
4885 repdef: ControlWordIterator,
4886 num_items: u64,
4887 ) -> Result<SerializedFullZip> {
4888 match compressed_data {
4889 PerValueDataBlock::Fixed(fixed) => {
4890 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
4891 }
4892 PerValueDataBlock::Variable(var) => {
4893 Self::serialize_full_zip_variable(var, repdef, num_items)
4894 }
4895 }
4896 }
4897
4898 fn expand_boolean_to_bytes(fixed: FixedWidthDataBlock) -> FixedWidthDataBlock {
4899 debug_assert_eq!(fixed.bits_per_value, 1);
4900 let num_values = fixed.num_values as usize;
4901 let bool_buf = BooleanBuffer::new(fixed.data.into_buffer(), 0, num_values);
4902 let expanded: Vec<u8> = (0..num_values).map(|i| bool_buf.value(i) as u8).collect();
4903 FixedWidthDataBlock {
4904 data: LanceBuffer::from(expanded),
4905 bits_per_value: 8,
4906 num_values: fixed.num_values,
4907 block_info: BlockInfo::new(),
4908 }
4909 }
4910
4911 fn encode_full_zip(
4912 column_idx: u32,
4913 field: &Field,
4914 compression_strategy: &dyn CompressionStrategy,
4915 data: DataBlock,
4916 repdef: crate::repdef::SerializedRepDefs,
4917 row_number: u64,
4918 num_lists: u64,
4919 ) -> Result<EncodedPage> {
4920 let max_rep = repdef
4921 .repetition_levels
4922 .as_ref()
4923 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
4924 let max_def = repdef
4925 .definition_levels
4926 .as_ref()
4927 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
4928
4929 let (num_items, num_visible_items) =
4933 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
4934 (rep_levels.len() as u64, data.num_values())
4937 } else {
4938 (data.num_values(), data.num_values())
4940 };
4941
4942 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
4943
4944 let repdef_iter = build_control_word_iterator(
4945 repdef.repetition_levels.as_deref(),
4946 max_rep,
4947 repdef.definition_levels.as_deref(),
4948 max_def,
4949 max_visible_def,
4950 num_items as usize,
4951 );
4952 let bits_rep = repdef_iter.bits_rep();
4953 let bits_def = repdef_iter.bits_def();
4954
4955 let data = match data {
4957 DataBlock::FixedWidth(fixed) if fixed.bits_per_value == 1 => {
4958 DataBlock::FixedWidth(Self::expand_boolean_to_bytes(fixed))
4959 }
4960 other => other,
4961 };
4962
4963 let compressor = compression_strategy.create_per_value(field, &data)?;
4964 let (compressed_data, value_encoding) = compressor.compress(data)?;
4965
4966 let description = match &compressed_data {
4967 PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
4968 bits_rep,
4969 bits_def,
4970 fixed.bits_per_value as u32,
4971 value_encoding,
4972 &repdef.def_meaning,
4973 num_items as u32,
4974 num_visible_items as u32,
4975 ),
4976 PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
4977 bits_rep,
4978 bits_def,
4979 variable.bits_per_offset as u32,
4980 value_encoding,
4981 &repdef.def_meaning,
4982 num_items as u32,
4983 num_visible_items as u32,
4984 ),
4985 };
4986
4987 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items)?;
4988
4989 let data = if let Some(repindex) = zipped.repetition_index {
4990 vec![zipped.values, repindex]
4991 } else {
4992 vec![zipped.values]
4993 };
4994
4995 Ok(EncodedPage {
4996 num_rows: num_lists,
4997 column_idx,
4998 data,
4999 description: PageEncoding::Structural(description),
5000 row_number,
5001 })
5002 }
5003
5004 fn should_dictionary_encode(
5005 data_block: &DataBlock,
5006 field: &Field,
5007 version: LanceFileVersion,
5008 ) -> Option<DictEncodingBudget> {
5009 const DEFAULT_SAMPLE_SIZE: usize = 4096;
5010 const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
5011
5012 match data_block {
5015 DataBlock::FixedWidth(fixed) => {
5016 if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
5017 return None;
5018 }
5019 if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
5020 return None;
5021 }
5022 if fixed.bits_per_value % 8 != 0 {
5023 return None;
5024 }
5025 }
5026 DataBlock::VariableWidth(var) => {
5027 if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
5028 return None;
5029 }
5030 }
5031 _ => return None,
5032 }
5033
5034 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
5036 .ok()
5037 .and_then(|val| val.parse().ok())
5038 .unwrap_or(100);
5039 if data_block.num_values() < too_small {
5040 return None;
5041 }
5042
5043 let num_values = data_block.num_values();
5044
5045 let divisor: u64 = field
5048 .metadata
5049 .get(DICT_DIVISOR_META_KEY)
5050 .and_then(|val| val.parse().ok())
5051 .or_else(|| {
5052 env::var("LANCE_ENCODING_DICT_DIVISOR")
5053 .ok()
5054 .and_then(|val| val.parse().ok())
5055 })
5056 .unwrap_or(DEFAULT_DICT_DIVISOR);
5057
5058 let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
5059 .ok()
5060 .and_then(|val| val.parse().ok())
5061 .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
5062
5063 let threshold_cardinality = num_values
5064 .checked_div(divisor.max(1))
5065 .unwrap_or(0)
5066 .min(max_cardinality);
5067 if threshold_cardinality == 0 {
5068 return None;
5069 }
5070
5071 let threshold_ratio = field
5073 .metadata
5074 .get(DICT_SIZE_RATIO_META_KEY)
5075 .and_then(|val| val.parse::<f64>().ok())
5076 .or_else(|| {
5077 env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
5078 .ok()
5079 .and_then(|val| val.parse().ok())
5080 })
5081 .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
5082
5083 if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
5084 panic!(
5085 "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
5086 threshold_ratio
5087 );
5088 }
5089
5090 let data_size = data_block.data_size();
5091 if data_size == 0 {
5092 return None;
5093 }
5094
5095 let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
5096 let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
5097
5098 if let Some(sample_unique_ratio) =
5101 Self::sample_unique_ratio(data_block, DEFAULT_SAMPLE_SIZE)?
5102 {
5103 if sample_unique_ratio >= DEFAULT_SAMPLE_UNIQUE_RATIO {
5104 return None;
5105 }
5106
5107 let projected_cardinality = (sample_unique_ratio * num_values as f64).ceil() as u64;
5108 if projected_cardinality > threshold_cardinality {
5109 return None;
5110 }
5111 }
5112
5113 let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
5114 Some(DictEncodingBudget {
5115 max_dict_entries,
5116 max_encoded_size,
5117 })
5118 }
5119
5120 fn sample_unique_ratio(data_block: &DataBlock, max_samples: usize) -> Option<Option<f64>> {
5128 use std::collections::HashSet;
5129
5130 const NUM_SAMPLE_BLOCKS: usize = 32;
5131 const MIN_RELIABLE_SAMPLES: usize = 1024;
5132
5133 let num_values = usize::try_from(data_block.num_values()).ok()?;
5134 if num_values == 0 {
5135 return Some(None);
5136 }
5137
5138 let sample_count = num_values.min(max_samples).max(1);
5139 if sample_count < MIN_RELIABLE_SAMPLES {
5140 return Some(None);
5141 }
5142
5143 let block_count = NUM_SAMPLE_BLOCKS.min(sample_count).min(num_values).max(1);
5144 let samples_per_block = (sample_count / block_count).max(1);
5145 let mut indices = Vec::with_capacity(sample_count);
5146 for block_idx in 0..block_count {
5147 let block_start = block_idx * num_values / block_count;
5148 let next_block_start = ((block_idx + 1) * num_values / block_count).min(num_values);
5149 let block_len = next_block_start.saturating_sub(block_start);
5150 let samples_in_block = samples_per_block.min(block_len);
5151 indices.extend((0..samples_in_block).map(|offset| block_start + offset));
5152 }
5153
5154 if indices.len() < MIN_RELIABLE_SAMPLES {
5155 return Some(None);
5156 }
5157
5158 let ratio = match data_block {
5159 DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
5160 64 => {
5161 let values = fixed.data.borrow_to_typed_slice::<u64>();
5162 let values = values.as_ref();
5163 let mut unique: HashSet<u64> =
5164 HashSet::with_capacity(indices.len().min(MIN_RELIABLE_SAMPLES));
5165 for idx in indices.iter().copied() {
5166 unique.insert(values.get(idx).copied()?);
5167 }
5168 unique.len() as f64 / indices.len() as f64
5169 }
5170 128 => {
5171 let values = fixed.data.borrow_to_typed_slice::<u128>();
5172 let values = values.as_ref();
5173 let mut unique: HashSet<u128> =
5174 HashSet::with_capacity(indices.len().min(MIN_RELIABLE_SAMPLES));
5175 for idx in indices.iter().copied() {
5176 unique.insert(values.get(idx).copied()?);
5177 }
5178 unique.len() as f64 / indices.len() as f64
5179 }
5180 _ => return Some(None),
5181 },
5182 DataBlock::VariableWidth(var) => {
5183 use xxhash_rust::xxh3::xxh3_64;
5184
5185 let mut unique: HashSet<u64> =
5187 HashSet::with_capacity(indices.len().min(MIN_RELIABLE_SAMPLES));
5188 match var.bits_per_offset {
5189 32 => {
5190 let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
5191 let offsets: &[u32] = offsets_ref.as_ref();
5192 for i in indices.iter().copied() {
5193 let start = usize::try_from(*offsets.get(i)?).ok()?;
5194 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5195 if start > end || end > var.data.len() {
5196 return None;
5197 }
5198 unique.insert(xxh3_64(&var.data[start..end]));
5199 }
5200 }
5201 64 => {
5202 let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
5203 let offsets: &[u64] = offsets_ref.as_ref();
5204 for i in indices.iter().copied() {
5205 let start = usize::try_from(*offsets.get(i)?).ok()?;
5206 let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
5207 if start > end || end > var.data.len() {
5208 return None;
5209 }
5210 unique.insert(xxh3_64(&var.data[start..end]));
5211 }
5212 }
5213 _ => return Some(None),
5214 }
5215 unique.len() as f64 / indices.len() as f64
5216 }
5217 _ => return Some(None),
5218 };
5219
5220 Some(Some(ratio))
5221 }
5222
5223 fn slice_repdef(repdef: &SerializedRepDefs, range: Range<usize>) -> SerializedRepDefs {
5224 let repetition_levels = repdef
5225 .repetition_levels
5226 .as_ref()
5227 .map(|levels| levels[range.clone()].to_vec());
5228 let definition_levels = repdef
5229 .definition_levels
5230 .as_ref()
5231 .map(|levels| levels[range].to_vec());
5232 SerializedRepDefs::new_with_fixed_size_list_levels(
5233 repetition_levels,
5234 definition_levels,
5235 repdef.def_meaning.clone(),
5236 repdef.has_fixed_size_list_levels(),
5237 )
5238 }
5239
5240 fn slice_arrays(
5241 arrays: &[ArrayRef],
5242 value_start: u64,
5243 num_values: u64,
5244 ) -> Result<Vec<ArrayRef>> {
5245 if num_values == 0 {
5246 return Ok(Vec::new());
5247 }
5248
5249 let mut values_to_skip = usize::try_from(value_start).map_err(|_| {
5250 Error::invalid_input(format!("Value start {} is too large", value_start))
5251 })?;
5252 let mut values_remaining = usize::try_from(num_values).map_err(|_| {
5253 Error::invalid_input(format!("Value count {} is too large", num_values))
5254 })?;
5255 let mut sliced = Vec::new();
5256
5257 for array in arrays {
5258 if values_to_skip >= array.len() {
5259 values_to_skip -= array.len();
5260 continue;
5261 }
5262
5263 let offset = values_to_skip;
5264 let len = (array.len() - offset).min(values_remaining);
5265 sliced.push(array.slice(offset, len));
5266 values_remaining -= len;
5267 values_to_skip = 0;
5268
5269 if values_remaining == 0 {
5270 break;
5271 }
5272 }
5273
5274 if values_remaining != 0 {
5275 return Err(Error::internal(format!(
5276 "Page split requested {} values starting at {}, but the page did not contain enough values",
5277 num_values, value_start
5278 )));
5279 }
5280
5281 Ok(sliced)
5282 }
5283
5284 fn split_structural_pages_for_miniblock_budget(
5285 arrays: Vec<ArrayRef>,
5286 repdef: SerializedRepDefs,
5287 plan: StructuralPagePlan,
5288 row_number: u64,
5289 num_rows: u64,
5290 ) -> Result<Vec<PrimitivePageData>> {
5291 if plan == StructuralPagePlan::Fits {
5292 return Ok(vec![PrimitivePageData {
5293 arrays,
5294 repdef,
5295 row_number,
5296 num_rows,
5297 unsplittable_miniblock_levels: None,
5298 }]);
5299 }
5300 if let StructuralPagePlan::UnsplittableOverBudget(num_levels) = plan {
5301 return Ok(vec![PrimitivePageData {
5302 arrays,
5303 repdef,
5304 row_number,
5305 num_rows,
5306 unsplittable_miniblock_levels: Some(num_levels),
5307 }]);
5308 }
5309
5310 let StructuralPagePlan::Split(splits) = plan else {
5311 unreachable!();
5312 };
5313
5314 let mut pages = Vec::with_capacity(splits.len());
5315 for split in splits {
5316 let arrays = Self::slice_arrays(&arrays, split.value_start, split.num_values)?;
5317 let repdef = Self::slice_repdef(&repdef, split.level_range);
5318 pages.push(PrimitivePageData {
5319 arrays,
5320 repdef,
5321 row_number: row_number + split.row_start,
5322 num_rows: split.num_rows,
5323 unsplittable_miniblock_levels: None,
5324 });
5325 }
5326 Ok(pages)
5327 }
5328
5329 fn encode_page(ctx: PrimitiveEncodeContext, page: PrimitivePageData) -> Result<EncodedPage> {
5330 let PrimitiveEncodeContext {
5331 column_idx,
5332 field,
5333 compression_strategy,
5334 encoding_metadata,
5335 support_large_chunk,
5336 version,
5337 is_simple_validity,
5338 has_repdef_info,
5339 } = ctx;
5340 let PrimitivePageData {
5341 arrays,
5342 repdef,
5343 row_number,
5344 num_rows,
5345 unsplittable_miniblock_levels,
5346 } = page;
5347 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5348
5349 if num_values == 0 {
5350 log::debug!(
5353 "Encoding column {} with {} items ({} rows) using complex-null layout",
5354 column_idx,
5355 num_values,
5356 num_rows
5357 );
5358 return Self::encode_complex_all_null(
5359 column_idx,
5360 repdef,
5361 row_number,
5362 num_rows,
5363 version,
5364 compression_strategy.as_ref(),
5365 );
5366 }
5367
5368 let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
5369 let all_null = leaf_validity
5370 .as_ref()
5371 .map(|validity| validity.count_set_bits() == 0)
5372 .unwrap_or(false);
5373
5374 if all_null {
5375 return if is_simple_validity {
5376 log::debug!(
5377 "Encoding column {} with {} items ({} rows) using simple-null layout",
5378 column_idx,
5379 num_values,
5380 num_rows
5381 );
5382 Self::encode_simple_all_null(column_idx, num_values, row_number)
5383 } else {
5384 log::debug!(
5385 "Encoding column {} with {} items ({} rows) using complex-null layout",
5386 column_idx,
5387 num_values,
5388 num_rows
5389 );
5390 Self::encode_complex_all_null(
5391 column_idx,
5392 repdef,
5393 row_number,
5394 num_rows,
5395 version,
5396 compression_strategy.as_ref(),
5397 )
5398 };
5399 }
5400
5401 if let DataType::Struct(fields) = &field.data_type()
5402 && fields.is_empty()
5403 {
5404 if has_repdef_info {
5405 return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
5406 }
5407 return Self::encode_simple_all_null(column_idx, num_values, row_number);
5410 }
5411
5412 let data_block = DataBlock::from_arrays(&arrays, num_values);
5413
5414 if version.resolve() >= LanceFileVersion::V2_2
5415 && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
5416 {
5417 log::debug!(
5418 "Encoding column {} with {} items ({} rows) using constant layout",
5419 column_idx,
5420 num_values,
5421 num_rows
5422 );
5423 return constant::encode_constant_page(
5424 column_idx, scalar, repdef, row_number, num_rows,
5425 );
5426 }
5427
5428 if let Some(num_levels) = unsplittable_miniblock_levels {
5429 let requested_encoding = encoding_metadata
5430 .get(STRUCTURAL_ENCODING_META_KEY)
5431 .map(|requested| requested.to_lowercase());
5432 let fullzip_error = match &data_block {
5433 DataBlock::FixedWidth(fixed) if !fixed.bits_per_value.is_multiple_of(8) => {
5434 Some(format!(
5435 "Full-zip fixed-width values must be byte aligned, got {} bits per value",
5436 fixed.bits_per_value
5437 ))
5438 }
5439 DataBlock::VariableWidth(variable)
5440 if !variable.bits_per_offset.is_multiple_of(8) =>
5441 {
5442 Some(format!(
5443 "Full-zip variable-width offsets must be byte aligned, got {} bits per offset",
5444 variable.bits_per_offset
5445 ))
5446 }
5447 DataBlock::VariableWidth(variable)
5448 if variable.bits_per_offset != 32 && variable.bits_per_offset != 64 =>
5449 {
5450 Some(format!(
5451 "Full-zip variable-width offsets must be 32 or 64 bits, got {} bits",
5452 variable.bits_per_offset
5453 ))
5454 }
5455 DataBlock::Struct(struct_data_block)
5456 if !struct_data_block.has_variable_width_child() =>
5457 {
5458 Some(
5459 "Full-zip packed struct requires at least one variable-width child"
5460 .to_string(),
5461 )
5462 }
5463 DataBlock::Dictionary(_) => {
5464 Some("Full-zip does not encode dictionary data blocks directly".to_string())
5465 }
5466 DataBlock::FixedSizeList(fsl) => match fsl.clone().try_into_flat() {
5467 Some(flat) if flat.bits_per_value.is_multiple_of(8) => None,
5468 Some(flat) => Some(format!(
5469 "Full-zip fixed-size-list values must be byte aligned after flattening, got {} bits per value",
5470 flat.bits_per_value
5471 )),
5472 None => Some(
5473 "Full-zip fixed-size-list capability requires a flat fixed-width child"
5474 .to_string(),
5475 ),
5476 },
5477 DataBlock::FixedWidth(_) | DataBlock::VariableWidth(_) | DataBlock::Struct(_) => {
5478 None
5479 }
5480 other => Some(format!(
5481 "Full-zip does not support value block type {}",
5482 other.name()
5483 )),
5484 };
5485 match requested_encoding.as_deref() {
5486 Some(STRUCTURAL_ENCODING_FULLZIP) => {
5487 if let Some(reason) = fullzip_error {
5488 return Err(Error::invalid_input_source(reason.into()));
5489 }
5490 return Self::encode_full_zip(
5491 column_idx,
5492 &field,
5493 compression_strategy.as_ref(),
5494 data_block,
5495 repdef,
5496 row_number,
5497 num_rows,
5498 );
5499 }
5500 Some(STRUCTURAL_ENCODING_MINIBLOCK) | None => {
5501 if requested_encoding.is_none() && fullzip_error.is_none() {
5502 log::debug!(
5503 "Encoding column {} with {} items using full-zip layout because mini-block cannot split the structural page",
5504 column_idx,
5505 num_values
5506 );
5507 return Self::encode_full_zip(
5508 column_idx,
5509 &field,
5510 compression_strategy.as_ref(),
5511 data_block,
5512 repdef,
5513 row_number,
5514 num_rows,
5515 );
5516 }
5517 return Err(Error::invalid_input_source(
5518 format!(
5519 "Mini-block cannot encode {} rep/def levels in one top-level row. \
5520 This usually means the row contains too much nested structure \
5521 for the current layout.",
5522 num_levels
5523 )
5524 .into(),
5525 ));
5526 }
5527 _ => {}
5528 }
5529 }
5530
5531 let requires_full_zip_packed_struct =
5532 if let DataBlock::Struct(ref struct_data_block) = data_block {
5533 struct_data_block.has_variable_width_child()
5534 } else {
5535 false
5536 };
5537
5538 if requires_full_zip_packed_struct {
5539 log::debug!(
5540 "Encoding column {} with {} items using full-zip packed struct layout",
5541 column_idx,
5542 num_values
5543 );
5544 return Self::encode_full_zip(
5545 column_idx,
5546 &field,
5547 compression_strategy.as_ref(),
5548 data_block,
5549 repdef,
5550 row_number,
5551 num_rows,
5552 );
5553 }
5554
5555 if let DataBlock::Dictionary(dict) = data_block {
5556 log::debug!(
5557 "Encoding column {} with {} items using dictionary encoding (already dictionary encoded)",
5558 column_idx,
5559 num_values
5560 );
5561 let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
5562 indices_data_block.compute_stat();
5567 return Self::encode_miniblock(
5568 column_idx,
5569 &field,
5570 compression_strategy.as_ref(),
5571 indices_data_block,
5572 repdef,
5573 row_number,
5574 Some(dictionary_data_block),
5575 num_rows,
5576 support_large_chunk,
5577 );
5578 }
5579
5580 let dict_result = Self::should_dictionary_encode(&data_block, &field, version).and_then(|budget| {
5583 log::debug!(
5584 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
5585 column_idx,
5586 num_values
5587 );
5588 dict::dictionary_encode(
5589 &data_block,
5590 budget.max_dict_entries,
5591 budget.max_encoded_size,
5592 )
5593 });
5594
5595 if let Some((indices_data_block, dictionary_data_block)) = dict_result {
5596 Self::encode_miniblock(
5597 column_idx,
5598 &field,
5599 compression_strategy.as_ref(),
5600 indices_data_block,
5601 repdef,
5602 row_number,
5603 Some(dictionary_data_block),
5604 num_rows,
5605 support_large_chunk,
5606 )
5607 } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
5608 log::debug!(
5609 "Encoding column {} with {} items using mini-block layout",
5610 column_idx,
5611 num_values
5612 );
5613 Self::encode_miniblock(
5614 column_idx,
5615 &field,
5616 compression_strategy.as_ref(),
5617 data_block,
5618 repdef,
5619 row_number,
5620 None,
5621 num_rows,
5622 support_large_chunk,
5623 )
5624 } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
5625 log::debug!(
5626 "Encoding column {} with {} items using full-zip layout",
5627 column_idx,
5628 num_values
5629 );
5630 Self::encode_full_zip(
5631 column_idx,
5632 &field,
5633 compression_strategy.as_ref(),
5634 data_block,
5635 repdef,
5636 row_number,
5637 num_rows,
5638 )
5639 } else {
5640 Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
5641 }
5642 }
5643
5644 fn do_flush(
5646 &mut self,
5647 arrays: Vec<ArrayRef>,
5648 repdefs: Vec<RepDefBuilder>,
5649 row_number: u64,
5650 num_rows: u64,
5651 ) -> Result<Vec<EncodeTask>> {
5652 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
5653 let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
5654 let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
5655 let (repdef, structural_plan) = RepDefBuilder::serialize_with_structural_plan(
5656 repdefs,
5657 miniblock::max_repdef_levels_per_chunk,
5658 num_rows,
5659 num_values,
5660 )?;
5661 let pages = Self::split_structural_pages_for_miniblock_budget(
5662 arrays,
5663 repdef,
5664 structural_plan,
5665 row_number,
5666 num_rows,
5667 )?;
5668
5669 let mut tasks = Vec::with_capacity(pages.len());
5670 let ctx = PrimitiveEncodeContext {
5671 column_idx: self.column_index,
5672 field: self.field.clone(),
5673 compression_strategy: self.compression_strategy.clone(),
5674 encoding_metadata: self.encoding_metadata.clone(),
5675 support_large_chunk: self.support_large_chunk,
5676 version: self.version,
5677 is_simple_validity,
5678 has_repdef_info,
5679 };
5680 for page in pages {
5681 let ctx = ctx.clone();
5682 let task = spawn_cpu(move || Self::encode_page(ctx, page)).boxed();
5683 tasks.push(task);
5684 }
5685 Ok(tasks)
5686 }
5687
5688 fn extract_validity_buf(
5689 array: Arc<dyn Array>,
5690 repdef: &mut RepDefBuilder,
5691 keep_original_array: bool,
5692 ) -> Result<Arc<dyn Array>> {
5693 if let Some(validity) = array.nulls() {
5694 if keep_original_array {
5695 repdef.add_validity_bitmap(validity.clone());
5696 } else {
5697 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
5698 }
5699 let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
5700 Ok(make_array(data_no_nulls))
5701 } else {
5702 repdef.add_no_null(array.len());
5703 Ok(array)
5704 }
5705 }
5706
5707 fn extract_validity(
5708 mut array: Arc<dyn Array>,
5709 repdef: &mut RepDefBuilder,
5710 keep_original_array: bool,
5711 ) -> Result<Arc<dyn Array>> {
5712 match array.data_type() {
5713 DataType::Null => {
5714 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
5715 Ok(array)
5716 }
5717 DataType::Dictionary(_, _) => {
5718 array = dict::normalize_dict_nulls(array)?;
5719 Self::extract_validity_buf(array, repdef, keep_original_array)
5720 }
5721 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
5730 }
5731 }
5732}
5733
5734impl FieldEncoder for PrimitiveStructuralEncoder {
5735 fn maybe_encode(
5737 &mut self,
5738 array: ArrayRef,
5739 _external_buffers: &mut OutOfLineBuffers,
5740 mut repdef: RepDefBuilder,
5741 row_number: u64,
5742 num_rows: u64,
5743 ) -> Result<Vec<EncodeTask>> {
5744 let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
5745 self.accumulated_repdefs.push(repdef);
5746
5747 if let Some((arrays, row_number, num_rows)) =
5748 self.accumulation_queue.insert(array, row_number, num_rows)
5749 {
5750 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5751 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5752 } else {
5753 Ok(vec![])
5754 }
5755 }
5756
5757 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
5759 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
5760 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
5761 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
5762 } else {
5763 Ok(vec![])
5764 }
5765 }
5766
5767 fn num_columns(&self) -> u32 {
5768 1
5769 }
5770
5771 fn finish(
5772 &mut self,
5773 _external_buffers: &mut OutOfLineBuffers,
5774 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
5775 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
5776 }
5777}
5778
5779#[cfg(test)]
5780#[allow(clippy::single_range_in_vec_init)]
5781mod tests {
5782 use super::{
5783 ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
5784 FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
5785 FullZipRepIndexDetails, FullZipScheduler, MiniBlockChunk, MiniBlockCompressed,
5786 MiniBlockRepIndex, PerValueDecompressor, PreambleAction, StructuralPageScheduler,
5787 VariableFullZipDecoder,
5788 };
5789 use crate::buffer::LanceBuffer;
5790 use crate::compression::DefaultDecompressionStrategy;
5791 use crate::constants::{
5792 COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
5793 DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
5794 STRUCTURAL_ENCODING_MINIBLOCK,
5795 };
5796 use crate::data::BlockInfo;
5797 use crate::decoder::PageEncoding;
5798 use crate::encodings::logical::primitive::{
5799 ChunkDrainInstructions, PrimitiveStructuralEncoder,
5800 };
5801 use crate::format::ProtobufUtils21;
5802 use crate::format::pb21;
5803 use crate::format::pb21::compressive_encoding::Compression;
5804 use crate::repdef::build_control_word_iterator;
5805 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
5806 use crate::version::LanceFileVersion;
5807 use arrow_array::{ArrayRef, Int8Array, StringArray};
5808 use arrow_schema::DataType;
5809 use std::collections::HashMap;
5810 use std::{collections::VecDeque, sync::Arc};
5811
#[test]
fn test_is_narrow() {
    // Small fixed-width integers count as narrow.
    let ints: ArrayRef = Arc::new(Int8Array::from(vec![1, 2, 3]));
    let int_block = DataBlock::from_array(ints);
    assert!(PrimitiveStructuralEncoder::is_narrow(&int_block));

    // Short strings count as narrow too.
    let short_strings = StringArray::from(vec![Some("hello"), Some("world")]);
    let short_block = DataBlock::from_array(short_strings);
    assert!(PrimitiveStructuralEncoder::is_narrow(&short_block));

    // One long string is enough to make the block wide.
    let long_strings = StringArray::from(vec![
        Some("hello world".repeat(100)),
        Some("world".to_string()),
    ]);
    let long_block = DataBlock::from_array(long_strings);
    let is_narrow = PrimitiveStructuralEncoder::is_narrow(&long_block);
    assert!(!is_narrow);
}
5831
#[test]
fn test_fullzip_fixed_rejects_non_byte_aligned_values() {
    // Eight 1-bit values packed into a single byte.
    let one_bit_block = FixedWidthDataBlock {
        data: LanceBuffer::from(vec![0_u8]),
        bits_per_value: 1,
        num_values: 8,
        block_info: BlockInfo::new(),
    };
    let control_words = build_control_word_iterator(None, 0, None, 0, u16::MAX, 8);

    let outcome =
        PrimitiveStructuralEncoder::serialize_full_zip_fixed(one_bit_block, control_words, 8);
    let err = match outcome {
        Err(err) => err,
        Ok(_) => panic!("expected full-zip to reject 1-bit fixed-width values"),
    };

    let mentions_alignment = err.to_string().contains("byte aligned");
    assert!(mentions_alignment, "unexpected error: {err}");
}
5851
5852 #[test]
5853 fn test_map_range() {
5854 let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
5857 let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
5858 let max_visible_def = 0;
5859 let total_items = 8;
5860 let max_rep = 1;
5861
5862 let check = |range, expected_item_range, expected_level_range| {
5863 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5864 range,
5865 rep.as_ref(),
5866 def.as_ref(),
5867 max_rep,
5868 max_visible_def,
5869 total_items,
5870 PreambleAction::Absent,
5871 );
5872 assert_eq!(item_range, expected_item_range);
5873 assert_eq!(level_range, expected_level_range);
5874 };
5875
5876 check(0..1, 0..3, 0..3);
5877 check(1..2, 3..5, 3..5);
5878 check(2..3, 5..5, 5..6);
5879 check(3..4, 5..8, 6..9);
5880 check(0..2, 0..5, 0..5);
5881 check(1..3, 3..5, 3..6);
5882 check(2..4, 5..8, 5..9);
5883 check(0..3, 0..5, 0..6);
5884 check(1..4, 3..8, 3..9);
5885 check(0..4, 0..8, 0..9);
5886
5887 let rep = Some(vec![1, 1, 0, 1]);
5890 let def = Some(vec![1, 0, 0, 0]);
5891 let max_visible_def = 0;
5892 let total_items = 3;
5893
5894 let check = |range, expected_item_range, expected_level_range| {
5895 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5896 range,
5897 rep.as_ref(),
5898 def.as_ref(),
5899 max_rep,
5900 max_visible_def,
5901 total_items,
5902 PreambleAction::Absent,
5903 );
5904 assert_eq!(item_range, expected_item_range);
5905 assert_eq!(level_range, expected_level_range);
5906 };
5907
5908 check(0..1, 0..0, 0..1);
5909 check(1..2, 0..2, 1..3);
5910 check(2..3, 2..3, 3..4);
5911 check(0..2, 0..2, 0..3);
5912 check(1..3, 0..3, 1..4);
5913 check(0..3, 0..3, 0..4);
5914
5915 let rep = Some(vec![1, 1, 0, 1]);
5918 let def = Some(vec![0, 0, 0, 1]);
5919 let max_visible_def = 0;
5920 let total_items = 3;
5921
5922 let check = |range, expected_item_range, expected_level_range| {
5923 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5924 range,
5925 rep.as_ref(),
5926 def.as_ref(),
5927 max_rep,
5928 max_visible_def,
5929 total_items,
5930 PreambleAction::Absent,
5931 );
5932 assert_eq!(item_range, expected_item_range);
5933 assert_eq!(level_range, expected_level_range);
5934 };
5935
5936 check(0..1, 0..1, 0..1);
5937 check(1..2, 1..3, 1..3);
5938 check(2..3, 3..3, 3..4);
5939 check(0..2, 0..3, 0..3);
5940 check(1..3, 1..3, 1..4);
5941 check(0..3, 0..3, 0..4);
5942
5943 let rep = Some(vec![1, 0, 1, 0, 1, 0]);
5946 let def: Option<&[u16]> = None;
5947 let max_visible_def = 0;
5948 let total_items = 6;
5949
5950 let check = |range, expected_item_range, expected_level_range| {
5951 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5952 range,
5953 rep.as_ref(),
5954 def.as_ref(),
5955 max_rep,
5956 max_visible_def,
5957 total_items,
5958 PreambleAction::Absent,
5959 );
5960 assert_eq!(item_range, expected_item_range);
5961 assert_eq!(level_range, expected_level_range);
5962 };
5963
5964 check(0..1, 0..2, 0..2);
5965 check(1..2, 2..4, 2..4);
5966 check(2..3, 4..6, 4..6);
5967 check(0..2, 0..4, 0..4);
5968 check(1..3, 2..6, 2..6);
5969 check(0..3, 0..6, 0..6);
5970
5971 let rep: Option<&[u16]> = None;
5974 let def = Some(vec![0, 0, 1, 0]);
5975 let max_visible_def = 1;
5976 let total_items = 4;
5977
5978 let check = |range, expected_item_range, expected_level_range| {
5979 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
5980 range,
5981 rep.as_ref(),
5982 def.as_ref(),
5983 max_rep,
5984 max_visible_def,
5985 total_items,
5986 PreambleAction::Absent,
5987 );
5988 assert_eq!(item_range, expected_item_range);
5989 assert_eq!(level_range, expected_level_range);
5990 };
5991
5992 check(0..1, 0..1, 0..1);
5993 check(1..2, 1..2, 1..2);
5994 check(2..3, 2..3, 2..3);
5995 check(0..2, 0..2, 0..2);
5996 check(1..3, 1..3, 1..3);
5997 check(0..3, 0..3, 0..3);
5998
5999 let rep = Some(vec![0, 1, 0, 1]);
6004 let def = Some(vec![0, 0, 0, 1]);
6005 let max_visible_def = 0;
6006 let total_items = 3;
6007
6008 let check = |range, expected_item_range, expected_level_range| {
6009 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6010 range,
6011 rep.as_ref(),
6012 def.as_ref(),
6013 max_rep,
6014 max_visible_def,
6015 total_items,
6016 PreambleAction::Take,
6017 );
6018 assert_eq!(item_range, expected_item_range);
6019 assert_eq!(level_range, expected_level_range);
6020 };
6021
6022 check(0..1, 0..3, 0..3);
6024 check(0..2, 0..3, 0..4);
6025
6026 let check = |range, expected_item_range, expected_level_range| {
6027 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6028 range,
6029 rep.as_ref(),
6030 def.as_ref(),
6031 max_rep,
6032 max_visible_def,
6033 total_items,
6034 PreambleAction::Skip,
6035 );
6036 assert_eq!(item_range, expected_item_range);
6037 assert_eq!(level_range, expected_level_range);
6038 };
6039
6040 check(0..1, 1..3, 1..3);
6041 check(1..2, 3..3, 3..4);
6042 check(0..2, 1..3, 1..4);
6043
6044 let rep = Some(vec![0, 1, 1, 0]);
6049 let def = Some(vec![0, 1, 0, 0]);
6050 let max_visible_def = 0;
6051 let total_items = 4;
6052
6053 let check = |range, expected_item_range, expected_level_range| {
6054 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6055 range,
6056 rep.as_ref(),
6057 def.as_ref(),
6058 max_rep,
6059 max_visible_def,
6060 total_items,
6061 PreambleAction::Take,
6062 );
6063 assert_eq!(item_range, expected_item_range);
6064 assert_eq!(level_range, expected_level_range);
6065 };
6066
6067 check(0..1, 0..1, 0..2);
6069 check(0..2, 0..3, 0..4);
6070
6071 let check = |range, expected_item_range, expected_level_range| {
6072 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6073 range,
6074 rep.as_ref(),
6075 def.as_ref(),
6076 max_rep,
6077 max_visible_def,
6078 total_items,
6079 PreambleAction::Skip,
6080 );
6081 assert_eq!(item_range, expected_item_range);
6082 assert_eq!(level_range, expected_level_range);
6083 };
6084
6085 check(0..1, 1..1, 1..2);
6087 check(1..2, 1..3, 2..4);
6088 check(0..2, 1..3, 1..4);
6089
6090 let rep = Some(vec![0, 1, 0, 1]);
6093 let def: Option<Vec<u16>> = None;
6094 let max_visible_def = 0;
6095 let total_items = 4;
6096
6097 let check = |range, expected_item_range, expected_level_range| {
6098 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6099 range,
6100 rep.as_ref(),
6101 def.as_ref(),
6102 max_rep,
6103 max_visible_def,
6104 total_items,
6105 PreambleAction::Take,
6106 );
6107 assert_eq!(item_range, expected_item_range);
6108 assert_eq!(level_range, expected_level_range);
6109 };
6110
6111 check(0..1, 0..3, 0..3);
6113 check(0..2, 0..4, 0..4);
6114
6115 let check = |range, expected_item_range, expected_level_range| {
6116 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6117 range,
6118 rep.as_ref(),
6119 def.as_ref(),
6120 max_rep,
6121 max_visible_def,
6122 total_items,
6123 PreambleAction::Skip,
6124 );
6125 assert_eq!(item_range, expected_item_range);
6126 assert_eq!(level_range, expected_level_range);
6127 };
6128
6129 check(0..1, 1..3, 1..3);
6130 check(1..2, 3..4, 3..4);
6131 check(0..2, 1..4, 1..4);
6132
6133 let rep = Some(vec![2, 1, 2, 0, 1, 2]);
6137 let def = Some(vec![0, 1, 2, 0, 0, 0]);
6138 let max_rep = 2;
6139 let max_visible_def = 0;
6140 let total_items = 4;
6141
6142 let check = |range, expected_item_range, expected_level_range| {
6143 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6144 range,
6145 rep.as_ref(),
6146 def.as_ref(),
6147 max_rep,
6148 max_visible_def,
6149 total_items,
6150 PreambleAction::Absent,
6151 );
6152 assert_eq!(item_range, expected_item_range);
6153 assert_eq!(level_range, expected_level_range);
6154 };
6155
6156 check(0..3, 0..4, 0..6);
6157 check(0..1, 0..1, 0..2);
6158 check(1..2, 1..3, 2..5);
6159 check(2..3, 3..4, 5..6);
6160
6161 let rep = Some(vec![0, 0, 1, 0, 1, 1]);
6163 let def = Some(vec![0, 1, 0, 0, 0, 0]);
6164 let max_rep = 1;
6165 let max_visible_def = 0;
6166 let total_items = 5;
6167
6168 let check = |range, expected_item_range, expected_level_range| {
6169 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6170 range,
6171 rep.as_ref(),
6172 def.as_ref(),
6173 max_rep,
6174 max_visible_def,
6175 total_items,
6176 PreambleAction::Take,
6177 );
6178 assert_eq!(item_range, expected_item_range);
6179 assert_eq!(level_range, expected_level_range);
6180 };
6181
6182 check(0..0, 0..1, 0..2);
6183 check(0..1, 0..3, 0..4);
6184 check(0..2, 0..4, 0..5);
6185
6186 let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
6189 let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
6190 let max_rep = 1;
6191 let max_visible_def = 0;
6192 let total_items = 5;
6193
6194 let check = |range, expected_item_range, expected_level_range| {
6195 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
6196 range,
6197 rep.as_ref(),
6198 def.as_ref(),
6199 max_rep,
6200 max_visible_def,
6201 total_items,
6202 PreambleAction::Skip,
6203 );
6204 assert_eq!(item_range, expected_item_range);
6205 assert_eq!(level_range, expected_level_range);
6206 };
6207
6208 check(2..3, 2..4, 5..7);
6209 }
6210
/// 32-bit offsets: the data buffer is trimmed to the byte span covered by the
/// offsets and the offsets themselves are rebased so the first one is zero.
#[test]
fn test_slice_batch_data_and_rebase_offsets_u32() {
    let raw_offsets: Vec<u32> = vec![6, 8, 8, 12];
    let offsets = LanceBuffer::reinterpret_vec(raw_offsets);
    let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");

    let (sliced_data, normalized_offsets) =
        VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32).unwrap();

    // Only bytes 6..12 of the input are referenced by the offsets.
    assert_eq!(sliced_data.as_ref(), b"6789ab");

    let rebased = normalized_offsets.borrow_to_typed_slice::<u32>();
    assert_eq!(rebased.as_ref(), &[0, 2, 2, 6]);
}
6224
/// 64-bit offsets: same contract as the 32-bit variant, exercised with
/// `bits_per_offset == 64`.
#[test]
fn test_slice_batch_data_and_rebase_offsets_u64() {
    let raw_offsets: Vec<u64> = vec![10, 12, 16, 20];
    let offsets = LanceBuffer::reinterpret_vec(raw_offsets);
    let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");

    let (sliced_data, normalized_offsets) =
        VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64).unwrap();

    // Only bytes 10..20 of the input are referenced by the offsets.
    assert_eq!(sliced_data.as_ref(), b"klmnopqrst");

    let rebased = normalized_offsets.borrow_to_typed_slice::<u64>();
    assert_eq!(rebased.as_ref(), &[0, 2, 6, 10]);
}
6238
/// Offsets that decrease (`[3, 2]`) must be rejected with an error whose
/// message mentions "less than base".
#[test]
fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
    let decreasing: Vec<u32> = vec![3, 2];
    let offsets = LanceBuffer::reinterpret_vec(decreasing);
    let data = LanceBuffer::copy_slice(b"abcd");

    let outcome = VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32);
    let message = outcome
        .expect_err("offset end before start should error")
        .to_string();

    assert!(message.contains("less than base"));
}
6248
/// Exercises `ChunkInstructions::schedule_instructions` against a repetition
/// index decoded from eight `u64` words, covering a full scan, a row-at-a-time
/// scan and several partial reads.
#[test]
fn test_schedule_instructions() {
    let rep_words: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
    // Serialize the words as little-endian bytes, the form the decoder consumes.
    let mut rep_bytes: Vec<u8> = Vec::with_capacity(rep_words.len() * 8);
    for word in &rep_words {
        rep_bytes.extend_from_slice(&word.to_le_bytes());
    }
    let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);

    // Shorthand constructor so the expectation tables below stay compact.
    // Argument order: chunk index, preamble action, rows to skip, rows to take,
    // whether the trailer is taken.
    let instr = |chunk_idx, preamble, rows_to_skip, rows_to_take, take_trailer| ChunkInstructions {
        chunk_idx,
        preamble,
        rows_to_skip,
        rows_to_take,
        take_trailer,
    };

    let assert_schedule = |user_ranges, expected| {
        let actual = ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
        assert_eq!(actual, expected);
    };

    // Reading every row touches all four chunks.
    let take_everything = vec![
        instr(0, PreambleAction::Absent, 0, 6, true),
        instr(1, PreambleAction::Take, 0, 2, false),
        instr(2, PreambleAction::Absent, 0, 5, true),
        instr(3, PreambleAction::Take, 0, 1, false),
    ];

    assert_schedule(&[0..14], take_everything.clone());

    // Fourteen single-row ranges must produce the same schedule as one 0..14 range.
    assert_schedule(
        &[
            0..1,
            1..2,
            2..3,
            3..4,
            4..5,
            5..6,
            6..7,
            7..8,
            8..9,
            9..10,
            10..11,
            11..12,
            12..13,
            13..14,
        ],
        take_everything,
    );

    // Two disjoint rows inside the first chunk yield two separate instructions.
    assert_schedule(
        &[0..1, 3..4],
        vec![
            instr(0, PreambleAction::Absent, 0, 1, false),
            instr(0, PreambleAction::Absent, 3, 1, false),
        ],
    );

    // Row 5 needs the trailer of chunk 0 plus the preamble of chunk 1.
    assert_schedule(
        &[5..6],
        vec![
            instr(0, PreambleAction::Absent, 5, 1, true),
            instr(1, PreambleAction::Take, 0, 0, false),
        ],
    );

    // Rows 7..10 start inside chunk 1 (skipping its preamble) and end in chunk 2.
    assert_schedule(
        &[7..10],
        vec![
            instr(1, PreambleAction::Skip, 1, 1, false),
            instr(2, PreambleAction::Absent, 0, 2, false),
        ],
    );
}
6383
/// Drives `ChunkInstructions::drain_from_instruction` through several
/// consecutive batches and checks both the emitted `ChunkDrainInstructions`
/// and the carried-over state (`need_preamble`, `skip_in_chunk`).
#[test]
fn test_drain_instructions() {
    /// Repeatedly drains the instruction at the front of `instructions` until
    /// `rows_desired` reaches zero and no preamble is still required, popping
    /// each instruction once `drain_from_instruction` reports it as consumed.
    ///
    /// Panics (via `unwrap`) if the queue runs dry before the loop condition
    /// is satisfied.
    fn drain_from_instructions(
        instructions: &mut VecDeque<ChunkInstructions>,
        mut rows_desired: u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> Vec<ChunkDrainInstructions> {
        let mut drain_instructions = Vec::with_capacity(instructions.len());
        // Keep going while rows are still wanted OR a preamble is still owed.
        while rows_desired > 0 || *need_preamble {
            let (next_instructions, consumed_chunk) = instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
            if consumed_chunk {
                instructions.pop_front();
            }
            drain_instructions.push(next_instructions);
        }
        drain_instructions
    }

    // ---- Scenario 1: two disjoint user ranges ----
    // Same repetition-index words as `test_schedule_instructions`.
    let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
    let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
    let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
    let user_ranges = vec![1..7, 10..14];

    let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

    // Work on a copy so `scheduled` can still be indexed in the expectations.
    let mut to_drain = VecDeque::from(scheduled.clone());

    // State threaded through successive drains.
    let mut need_preamble = false;
    let mut skip_in_chunk = 0;

    // Batch 1: four rows, all served by the first scheduled instruction.
    let next_batch =
        drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 4);
    assert_eq!(
        next_batch,
        vec![ChunkDrainInstructions {
            chunk_instructions: scheduled[0].clone(),
            rows_to_take: 4,
            rows_to_skip: 0,
            preamble_action: PreambleAction::Absent,
        }]
    );

    // Batch 2: four rows spread over three scheduled instructions; the first
    // resumes after the four rows already taken from it.
    let next_batch =
        drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 2);

    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 1,
                rows_to_skip: 4,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 2,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }
        ]
    );

    // Batch 3: the final two rows; afterwards nothing is left to skip.
    let next_batch =
        drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);

    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 1,
                rows_to_skip: 2,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[3].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );

    // ---- Scenario 2: one range covering all 28 rows of a different index ----
    let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
    let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
    let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
    let user_ranges = vec![0..28];

    let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

    let mut to_drain = VecDeque::from(scheduled.clone());

    let mut need_preamble = false;
    let mut skip_in_chunk = 0;

    // Batch 1: seven rows -> six from the first instruction, one from the second.
    let next_batch =
        drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 6,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 1);

    // Batch 2: two rows. The second instruction is re-entered with its
    // preamble skipped, and a zero-row drain of the third instruction that
    // only takes its preamble is appended.
    // NOTE(review): presumably the last row taken spills into the next chunk,
    // which is why the loop continues on `need_preamble` — confirm against
    // `drain_from_instruction`.
    let next_batch =
        drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 2,
                rows_to_skip: 1,
                preamble_action: PreambleAction::Skip,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 0,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );

    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);
}
6558
6559 #[tokio::test]
6560 async fn test_fullzip_initialize_is_lazy() {
6561 use futures::{FutureExt, future::BoxFuture};
6562 use std::ops::Range;
6563 use std::sync::Mutex;
6564
6565 #[derive(Debug, Clone)]
6566 struct RecordingScheduler {
6567 data: bytes::Bytes,
6568 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6569 }
6570
6571 impl RecordingScheduler {
6572 fn new(data: bytes::Bytes) -> Self {
6573 Self {
6574 data,
6575 requests: Arc::new(Mutex::new(Vec::new())),
6576 }
6577 }
6578
6579 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6580 self.requests.lock().unwrap().clone()
6581 }
6582 }
6583
6584 impl crate::EncodingsIo for RecordingScheduler {
6585 fn submit_request(
6586 &self,
6587 ranges: Vec<Range<u64>>,
6588 _priority: u64,
6589 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6590 self.requests.lock().unwrap().push(ranges.clone());
6591 let data = ranges
6592 .into_iter()
6593 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6594 .collect::<Vec<_>>();
6595 std::future::ready(Ok(data)).boxed()
6596 }
6597 }
6598
6599 #[derive(Debug)]
6600 struct TestFixedDecompressor;
6601
6602 impl FixedPerValueDecompressor for TestFixedDecompressor {
6603 fn decompress(
6604 &self,
6605 _data: FixedWidthDataBlock,
6606 _num_rows: u64,
6607 ) -> crate::Result<DataBlock> {
6608 unimplemented!("Test decompressor")
6609 }
6610
6611 fn bits_per_value(&self) -> u64 {
6612 32
6613 }
6614 }
6615
6616 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
6617 0;
6618 16 * 1024
6619 ])));
6620 let mut scheduler = FullZipScheduler {
6621 data_buf_position: 0,
6622 data_buf_size: 4096,
6623 rep_index: Some(FullZipRepIndexDetails {
6624 buf_position: 1000,
6625 bytes_per_value: 4,
6626 }),
6627 priority: 0,
6628 rows_in_page: 100,
6629 bits_per_offset: 32,
6630 details: Arc::new(FullZipDecodeDetails {
6631 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6632 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6633 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6634 max_rep: 0,
6635 max_visible_def: 0,
6636 }),
6637 cached_state: None,
6638 enable_cache: false,
6639 };
6640
6641 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6642 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6643
6644 assert!(
6645 cached_data
6646 .as_arc_any()
6647 .downcast_ref::<super::NoCachedPageData>()
6648 .is_some(),
6649 "FullZip initialize should not eagerly load repetition index data"
6650 );
6651 assert!(scheduler.cached_state.is_none());
6652 assert!(
6653 io.requests().is_empty(),
6654 "FullZip initialize should not issue any I/O"
6655 );
6656 }
6657
6658 #[tokio::test]
6659 async fn test_fullzip_read_source_slices_prefetched_page() {
6660 let page_start = 200_u64;
6661 let page_data = LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]);
6662 let source = FullZipReadSource::PrefetchedPage {
6663 base_offset: page_start,
6664 data: page_data,
6665 };
6666 let ranges = vec![
6667 page_start..(page_start + 3),
6668 (page_start + 4)..(page_start + 8),
6669 ];
6670 let mut data = source.fetch(&ranges, 0).await.unwrap();
6671 assert_eq!(data.pop_front().unwrap().as_ref(), &[0, 1, 2]);
6672 assert_eq!(data.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
6673 }
6674
6675 #[tokio::test]
6676 async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
6677 use futures::{FutureExt, future::BoxFuture};
6678 use std::ops::Range;
6679 use std::sync::Mutex;
6680
6681 #[derive(Debug, Clone)]
6682 struct RecordingScheduler {
6683 data: bytes::Bytes,
6684 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6685 }
6686
6687 impl RecordingScheduler {
6688 fn new(data: bytes::Bytes) -> Self {
6689 Self {
6690 data,
6691 requests: Arc::new(Mutex::new(Vec::new())),
6692 }
6693 }
6694
6695 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6696 self.requests.lock().unwrap().clone()
6697 }
6698 }
6699
6700 impl crate::EncodingsIo for RecordingScheduler {
6701 fn submit_request(
6702 &self,
6703 ranges: Vec<Range<u64>>,
6704 _priority: u64,
6705 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6706 self.requests.lock().unwrap().push(ranges.clone());
6707 let data = ranges
6708 .into_iter()
6709 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6710 .collect::<Vec<_>>();
6711 std::future::ready(Ok(data)).boxed()
6712 }
6713 }
6714
6715 #[derive(Debug)]
6716 struct TestFixedDecompressor;
6717
6718 impl FixedPerValueDecompressor for TestFixedDecompressor {
6719 fn decompress(
6720 &self,
6721 _data: FixedWidthDataBlock,
6722 _num_rows: u64,
6723 ) -> crate::Result<DataBlock> {
6724 unimplemented!("Test decompressor")
6725 }
6726
6727 fn bits_per_value(&self) -> u64 {
6728 32
6729 }
6730 }
6731
6732 let rows_in_page = 100_u64;
6733 let bytes_per_value = 4_u64;
6734 let rep_start = 1000_u64;
6735 let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
6736 let mut data = vec![0_u8; 16 * 1024];
6737 data[rep_start as usize..rep_start as usize + rep_size].fill(7);
6738 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
6739
6740 let mut scheduler = FullZipScheduler {
6741 data_buf_position: 0,
6742 data_buf_size: 4096,
6743 rep_index: Some(FullZipRepIndexDetails {
6744 buf_position: rep_start,
6745 bytes_per_value,
6746 }),
6747 priority: 0,
6748 rows_in_page,
6749 bits_per_offset: 32,
6750 details: Arc::new(FullZipDecodeDetails {
6751 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6752 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6753 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6754 max_rep: 0,
6755 max_visible_def: 0,
6756 }),
6757 cached_state: None,
6758 enable_cache: true,
6759 };
6760
6761 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6762 let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
6763 assert!(
6764 cached_data
6765 .as_arc_any()
6766 .downcast_ref::<FullZipCacheableState>()
6767 .is_some()
6768 );
6769 assert!(scheduler.cached_state.is_some());
6770 assert_eq!(
6771 io.requests(),
6772 vec![vec![
6773 rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
6774 ]]
6775 );
6776 }
6777
6778 #[tokio::test]
6779 async fn test_fullzip_full_page_bypasses_rep_index_io() {
6780 use futures::{FutureExt, future::BoxFuture};
6781 use std::ops::Range;
6782 use std::sync::Mutex;
6783
6784 #[derive(Debug, Clone)]
6785 struct RecordingScheduler {
6786 data: bytes::Bytes,
6787 requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
6788 }
6789
6790 impl RecordingScheduler {
6791 fn new(data: bytes::Bytes) -> Self {
6792 Self {
6793 data,
6794 requests: Arc::new(Mutex::new(Vec::new())),
6795 }
6796 }
6797
6798 fn requests(&self) -> Vec<Vec<Range<u64>>> {
6799 self.requests.lock().unwrap().clone()
6800 }
6801 }
6802
6803 impl crate::EncodingsIo for RecordingScheduler {
6804 fn submit_request(
6805 &self,
6806 ranges: Vec<Range<u64>>,
6807 _priority: u64,
6808 ) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
6809 self.requests.lock().unwrap().push(ranges.clone());
6810 let data = ranges
6811 .into_iter()
6812 .map(|range| self.data.slice(range.start as usize..range.end as usize))
6813 .collect::<Vec<_>>();
6814 std::future::ready(Ok(data)).boxed()
6815 }
6816 }
6817
6818 #[derive(Debug)]
6819 struct TestFixedDecompressor;
6820
6821 impl FixedPerValueDecompressor for TestFixedDecompressor {
6822 fn decompress(
6823 &self,
6824 _data: FixedWidthDataBlock,
6825 _num_rows: u64,
6826 ) -> crate::Result<DataBlock> {
6827 unimplemented!("Test decompressor")
6828 }
6829
6830 fn bits_per_value(&self) -> u64 {
6831 32
6832 }
6833 }
6834
6835 let rows_in_page = 100_u64;
6836 let data_start = 256_u64;
6837 let data_size = 500_u64;
6838 let rep_start = 4096_u64;
6839 let bytes_per_value = 4_u64;
6840
6841 let mut bytes = vec![0_u8; 16 * 1024];
6842 for i in 0..=rows_in_page {
6843 let offset = (i * 5) as u32;
6844 let pos = rep_start as usize + (i * bytes_per_value) as usize;
6845 bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
6846 }
6847 let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
6848
6849 let scheduler = FullZipScheduler {
6850 data_buf_position: data_start,
6851 data_buf_size: data_size,
6852 rep_index: Some(FullZipRepIndexDetails {
6853 buf_position: rep_start,
6854 bytes_per_value,
6855 }),
6856 priority: 0,
6857 rows_in_page,
6858 bits_per_offset: 32,
6859 details: Arc::new(FullZipDecodeDetails {
6860 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
6861 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
6862 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
6863 max_rep: 0,
6864 max_visible_def: 0,
6865 }),
6866 cached_state: None,
6867 enable_cache: false,
6868 };
6869
6870 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
6871 let tasks = scheduler
6872 .schedule_ranges_rep(
6873 &[0..rows_in_page],
6874 &io_dyn,
6875 FullZipRepIndexDetails {
6876 buf_position: rep_start,
6877 bytes_per_value,
6878 },
6879 )
6880 .unwrap();
6881
6882 let requests = io.requests();
6883 assert_eq!(requests.len(), 1);
6884 assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
6885
6886 let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
6887 let requests_after_await = io.requests();
6888 assert_eq!(
6889 requests_after_await.len(),
6890 1,
6891 "full page path should not issue rep-index I/O"
6892 );
6893 }
6894
6895 #[tokio::test]
6897 async fn test_fuzz_issue_4492_empty_rep_values() {
6898 use lance_datagen::{RowCount, Seed, array, gen_batch};
6899
6900 let seed = 1823859942947654717u64;
6901 let num_rows = 2741usize;
6902
6903 let batch_gen = gen_batch().with_seed(Seed::from(seed));
6905 let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
6906 let list_generator = array::rand_list_any(base_generator, false);
6907
6908 let batch = batch_gen
6909 .anon_col(list_generator)
6910 .into_batch_rows(RowCount::from(num_rows as u64))
6911 .unwrap();
6912
6913 let list_array = batch.column(0).clone();
6914
6915 let mut metadata = HashMap::new();
6917 metadata.insert(
6918 STRUCTURAL_ENCODING_META_KEY.to_string(),
6919 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6920 );
6921
6922 let test_cases = TestCases::default()
6923 .with_min_file_version(LanceFileVersion::V2_1)
6924 .with_batch_size(100)
6925 .with_range(0..num_rows.min(500) as u64)
6926 .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
6927
6928 check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
6929 }
6930
6931 async fn test_minichunk_size_helper(
6932 string_data: Vec<Option<String>>,
6933 minichunk_size: u64,
6934 file_version: LanceFileVersion,
6935 ) {
6936 use crate::constants::MINICHUNK_SIZE_META_KEY;
6937 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
6938 use arrow_array::{ArrayRef, StringArray};
6939 use std::sync::Arc;
6940
6941 let string_array: ArrayRef = Arc::new(StringArray::from(string_data));
6942
6943 let mut metadata = HashMap::new();
6944 metadata.insert(
6945 MINICHUNK_SIZE_META_KEY.to_string(),
6946 minichunk_size.to_string(),
6947 );
6948 metadata.insert(
6949 STRUCTURAL_ENCODING_META_KEY.to_string(),
6950 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
6951 );
6952
6953 let test_cases = TestCases::default()
6954 .with_min_file_version(file_version)
6955 .with_batch_size(1000);
6956
6957 check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
6958 }
6959
6960 #[tokio::test]
6961 async fn test_minichunk_size_roundtrip() {
6962 let mut string_data = Vec::new();
6964 for i in 0..100 {
6965 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6966 }
6967 test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
6969 }
6970
6971 #[tokio::test]
6972 async fn test_minichunk_size_128kb_v2_2() {
6973 let mut string_data = Vec::new();
6975 for i in 0..10000 {
6977 string_data.push(Some(format!("test_string_{}", i).repeat(50)));
6978 }
6979 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6980 }
6981
6982 #[tokio::test]
6983 async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
6984 let mut string_data = Vec::new();
6985 for i in 0..10000 {
6987 string_data.push(Some(format!("t_{}", i)));
6988 }
6989 test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
6990 }
6991
/// Round-trips a highly repetitive string column (100 distinct ~500-byte
/// values cycled to 100,000 rows) and, when the first page is a mini-block
/// page carrying a dictionary, checks that the dictionary is wrapped in
/// General compression using LZ4 or Zstd.
#[tokio::test]
async fn test_large_dictionary_general_compression() {
    use arrow_array::{ArrayRef, StringArray};
    use std::collections::HashMap;
    use std::sync::Arc;

    // 100 distinct values, each padded with 500 'x' characters.
    let unique_values: Vec<String> = (0..100)
        .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
        .collect();

    // Cycle through the distinct values to build 100,000 rows.
    let repeated_strings: Vec<_> = unique_values
        .iter()
        .cycle()
        .take(100_000)
        .map(|s| Some(s.as_str()))
        .collect();

    let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];

            // NOTE(review): if the first page is not structural / mini-block /
            // dictionary-encoded, this verifier performs no checks and passes
            // vacuously — confirm that this is intended.
            if let Some(PageEncoding::Structural(page_layout)) =
                &col.final_pages.first().map(|p| &p.description)
                && let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
                    &page_layout.layout
                && let Some(dictionary_encoding) = &mini_block.dictionary
            {
                match dictionary_encoding.compression.as_ref() {
                    Some(Compression::General(general)) => {
                        let compression = general.compression.as_ref().unwrap();
                        // Either general-purpose algorithm is acceptable here.
                        assert!(
                            compression.scheme()
                                == pb21::CompressionScheme::CompressionAlgorithmLz4
                                || compression.scheme()
                                    == pb21::CompressionScheme::CompressionAlgorithmZstd,
                            "Expected LZ4 or Zstd compression for large dictionary"
                        );
                    }
                    _ => panic!("Expected General compression for large dictionary"),
                }
            }
        }));

    check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
}
7047
7048 fn dictionary_encoding_from_page(
7049 page: &crate::encoder::EncodedPage,
7050 ) -> &crate::format::pb21::CompressiveEncoding {
7051 let PageEncoding::Structural(layout) = &page.description else {
7052 panic!("Expected structural page encoding");
7053 };
7054 let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
7055 else {
7056 panic!("Expected mini-block layout");
7057 };
7058 layout
7059 .dictionary
7060 .as_ref()
7061 .unwrap_or_else(|| panic!("Expected dictionary encoding"))
7062 }
7063
7064 async fn encode_variable_dict_page(
7065 metadata: HashMap<String, String>,
7066 ) -> crate::encoder::EncodedPage {
7067 use arrow_array::types::Int32Type;
7068 use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
7069
7070 let values = Arc::new(StringArray::from(
7071 (0..128)
7072 .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
7073 .collect::<Vec<_>>(),
7074 )) as ArrayRef;
7075 let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
7076 let dict_array =
7077 Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
7078
7079 let field = arrow_schema::Field::new(
7080 "dict_col",
7081 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
7082 false,
7083 )
7084 .with_metadata(metadata);
7085
7086 encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
7087 }
7088
7089 async fn encode_auto_fixed_dict_page(
7090 metadata: HashMap<String, String>,
7091 ) -> crate::encoder::EncodedPage {
7092 use arrow_array::{ArrayRef, Decimal128Array};
7093
7094 let values = (0..20_000)
7096 .map(|i| match i % 3 {
7097 0 => 10_i128,
7098 1 => 20_i128,
7099 _ => 30_i128,
7100 })
7101 .collect::<Vec<_>>();
7102 let decimal = Decimal128Array::from_iter_values(values)
7103 .with_precision_and_scale(38, 0)
7104 .unwrap();
7105 let decimal = Arc::new(decimal) as ArrayRef;
7106
7107 let mut field_metadata = metadata;
7108 field_metadata.insert(
7110 "lance-encoding:dict-size-ratio".to_string(),
7111 "0.99".to_string(),
7112 );
7113 let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
7114 .with_metadata(field_metadata);
7115
7116 encode_first_page(field, decimal, LanceFileVersion::V2_2).await
7117 }
7118
7119 #[tokio::test]
7120 async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
7121 let page = encode_variable_dict_page(HashMap::new()).await;
7122 let dictionary_encoding = dictionary_encoding_from_page(&page);
7123 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
7124 panic!("Expected General compression for dictionary values");
7125 };
7126 let compression = general.compression.as_ref().unwrap();
7127 assert_eq!(
7128 compression.scheme(),
7129 pb21::CompressionScheme::CompressionAlgorithmLz4
7130 );
7131 }
7132
7133 #[tokio::test]
7134 async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
7135 let page = encode_auto_fixed_dict_page(HashMap::new()).await;
7136 let dictionary_encoding = dictionary_encoding_from_page(&page);
7137 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
7138 panic!("Expected General compression for dictionary values");
7139 };
7140 let compression = general.compression.as_ref().unwrap();
7141 assert_eq!(
7142 compression.scheme(),
7143 pb21::CompressionScheme::CompressionAlgorithmLz4
7144 );
7145 }
7146
7147 #[tokio::test]
7148 async fn test_dict_values_general_compression_zstd() {
7149 let mut metadata = HashMap::new();
7150 metadata.insert(
7151 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
7152 "zstd".to_string(),
7153 );
7154 let page = encode_variable_dict_page(metadata).await;
7155 let dictionary_encoding = dictionary_encoding_from_page(&page);
7156 let Some(Compression::General(general)) = dictionary_encoding.compression.as_ref() else {
7157 panic!("Expected General compression for dictionary values");
7158 };
7159 let compression = general.compression.as_ref().unwrap();
7160 assert_eq!(
7161 compression.scheme(),
7162 pb21::CompressionScheme::CompressionAlgorithmZstd
7163 );
7164 }
7165
7166 #[tokio::test]
7167 async fn test_dict_values_general_compression_none() {
7168 let mut metadata = HashMap::new();
7169 metadata.insert(
7170 DICT_VALUES_COMPRESSION_META_KEY.to_string(),
7171 "none".to_string(),
7172 );
7173 let page = encode_variable_dict_page(metadata).await;
7174 let dictionary_encoding = dictionary_encoding_from_page(&page);
7175 assert!(
7176 !matches!(
7177 dictionary_encoding.compression.as_ref(),
7178 Some(Compression::General(_))
7179 ),
7180 "Expected dictionary values to avoid General compression"
7181 );
7182 }
7183
/// With neither field metadata nor fallback values, the resolved metadata
/// selects `lz4` and carries no compression level.
#[test]
fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
    let no_field_metadata = HashMap::new();
    let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &no_field_metadata,
        None,
        None,
    );

    let expected_scheme = "lz4".to_string();
    assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert!(!metadata.contains_key(COMPRESSION_LEVEL_META_KEY));
}
7194
/// Values present in the field metadata take precedence over the fallback
/// scheme and level passed as the second and third arguments.
#[test]
fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
    let mut field_metadata = HashMap::new();
    field_metadata.insert(
        DICT_VALUES_COMPRESSION_META_KEY.to_string(),
        "none".to_string(),
    );
    field_metadata.insert(
        DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
        "7".to_string(),
    );

    let fallback_scheme = Some("zstd".to_string());
    let fallback_level = Some("3".to_string());
    let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &field_metadata,
        fallback_scheme,
        fallback_level,
    );

    let expected_scheme = "none".to_string();
    let expected_level = "7".to_string();
    assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert_eq!(
        metadata.get(COMPRESSION_LEVEL_META_KEY),
        Some(&expected_level)
    );
}
7221
/// When the field metadata is empty, the fallback scheme and level passed as
/// the second and third arguments are used.
#[test]
fn test_resolve_dict_values_compression_metadata_env_fallback() {
    let no_field_metadata = HashMap::new();
    let fallback_scheme = Some("zstd".to_string());
    let fallback_level = Some("9".to_string());
    let metadata = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &no_field_metadata,
        fallback_scheme,
        fallback_level,
    );

    let expected_scheme = "zstd".to_string();
    let expected_level = "9".to_string();
    assert_eq!(metadata.get(COMPRESSION_META_KEY), Some(&expected_scheme));
    assert_eq!(
        metadata.get(COMPRESSION_LEVEL_META_KEY),
        Some(&expected_level)
    );
}
7238
7239 #[tokio::test]
7240 async fn test_dictionary_encode_int64() {
7241 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
7242 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
7243 use crate::version::LanceFileVersion;
7244 use arrow_array::{ArrayRef, Int64Array};
7245 use std::collections::HashMap;
7246 use std::sync::Arc;
7247
7248 let values = (0..1000)
7250 .map(|i| match i % 3 {
7251 0 => 10i64,
7252 1 => 20i64,
7253 _ => 30i64,
7254 })
7255 .collect::<Vec<_>>();
7256 let array = Arc::new(Int64Array::from(values)) as ArrayRef;
7257
7258 let mut metadata = HashMap::new();
7259 metadata.insert(
7260 STRUCTURAL_ENCODING_META_KEY.to_string(),
7261 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
7262 );
7263 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
7264
7265 let test_cases = TestCases::default()
7266 .with_min_file_version(LanceFileVersion::V2_2)
7267 .with_batch_size(1000)
7268 .with_range(0..1000)
7269 .with_indices(vec![0, 1, 10, 999])
7270 .with_expected_encoding("dictionary");
7271
7272 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
7273 }
7274
7275 #[tokio::test]
7276 async fn test_dictionary_encode_float64() {
7277 use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
7278 use crate::testing::{TestCases, check_round_trip_encoding_of_data};
7279 use crate::version::LanceFileVersion;
7280 use arrow_array::{ArrayRef, Float64Array};
7281 use std::collections::HashMap;
7282 use std::sync::Arc;
7283
7284 let values = (0..1000)
7286 .map(|i| match i % 3 {
7287 0 => 0.1f64,
7288 1 => 0.2f64,
7289 _ => 0.3f64,
7290 })
7291 .collect::<Vec<_>>();
7292 let array = Arc::new(Float64Array::from(values)) as ArrayRef;
7293
7294 let mut metadata = HashMap::new();
7295 metadata.insert(
7296 STRUCTURAL_ENCODING_META_KEY.to_string(),
7297 STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
7298 );
7299 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());
7300
7301 let test_cases = TestCases::default()
7302 .with_min_file_version(LanceFileVersion::V2_2)
7303 .with_batch_size(1000)
7304 .with_range(0..1000)
7305 .with_indices(vec![0, 1, 10, 999])
7306 .with_expected_encoding("dictionary");
7307
7308 check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
7309 }
7310
/// A mini-block layout whose dictionary uses out-of-line bitpacking must be
/// accepted by `MiniBlockScheduler::try_new`, and the resulting dictionary
/// must report the configured item count and the minimum page buffer alignment.
#[test]
fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
    let num_rows = 10_000;
    let dict_items = 2_000;

    let layout = pb21::MiniBlockLayout {
        rep_compression: None,
        def_compression: None,
        dictionary: Some(ProtobufUtils21::out_of_line_bitpacking(
            64,
            ProtobufUtils21::flat(11, None),
        )),
        value_compression: Some(ProtobufUtils21::flat(64, None)),
        num_dictionary_items: dict_items,
        layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
        num_buffers: 1,
        repetition_index_depth: 0,
        num_items: num_rows,
        has_large_chunk: false,
    };

    // Three zero-sized buffer descriptors; only the scheduler setup is exercised.
    let buffers = vec![(0, 0); 3];
    let decompression = DefaultDecompressionStrategy::default();
    let scheduler =
        super::MiniBlockScheduler::try_new(&buffers, 0, num_rows, &layout, &decompression)
            .unwrap();

    let dict = scheduler.dictionary.unwrap();
    assert_eq!(dict.num_dictionary_items, dict_items);
    assert_eq!(
        dict.dictionary_data_alignment,
        crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
    );
}
7348
7349 fn create_test_fixed_data_block(
7351 num_values: u64,
7352 cardinality: u64,
7353 bits_per_value: u64,
7354 ) -> DataBlock {
7355 assert!(cardinality > 0);
7356 assert!(cardinality <= num_values);
7357 let block_info = BlockInfo::default();
7358
7359 assert_eq!(bits_per_value % 8, 0);
7360 let data = match bits_per_value {
7361 32 => {
7362 let values = (0..num_values)
7363 .map(|i| (i % cardinality) as u32)
7364 .collect::<Vec<_>>();
7365 crate::buffer::LanceBuffer::reinterpret_vec(values)
7366 }
7367 64 => {
7368 let values = (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>();
7369 crate::buffer::LanceBuffer::reinterpret_vec(values)
7370 }
7371 128 => {
7372 let values = (0..num_values)
7373 .map(|i| (i % cardinality) as u128)
7374 .collect::<Vec<_>>();
7375 crate::buffer::LanceBuffer::reinterpret_vec(values)
7376 }
7377 _ => unreachable!(),
7378 };
7379 DataBlock::FixedWidth(FixedWidthDataBlock {
7380 bits_per_value,
7381 data,
7382 num_values,
7383 block_info,
7384 })
7385 }
7386
7387 fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7389 use arrow_array::StringArray;
7390
7391 assert!(cardinality <= num_values && cardinality > 0);
7392
7393 let mut values = Vec::with_capacity(num_values as usize);
7394 for i in 0..num_values {
7395 values.push(format!("value_{:016}", i % cardinality));
7396 }
7397
7398 let array = StringArray::from(values);
7399 DataBlock::from_array(Arc::new(array) as ArrayRef)
7400 }
7401
7402 fn create_sorted_string_array(num_values: u64, cardinality: u64) -> ArrayRef {
7403 use arrow_array::StringArray;
7404
7405 assert!(cardinality <= num_values && cardinality > 0);
7406
7407 let mut values = Vec::with_capacity(num_values as usize);
7408 for i in 0..num_values {
7409 let value_idx = i * cardinality / num_values;
7410 values.push(format!("value_{:016}", value_idx));
7411 }
7412
7413 Arc::new(StringArray::from(values)) as ArrayRef
7414 }
7415
7416 fn create_sorted_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
7417 DataBlock::from_array(create_sorted_string_array(num_values, cardinality))
7418 }
7419
/// 1000 strings with only 10 distinct values and a size ratio of "0.8"
/// should be selected for dictionary encoding under V2_1.
#[test]
fn test_should_dictionary_encode() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    let field_metadata =
        HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(field_metadata);
    let lance_field = LanceField::try_from(&arrow_field).unwrap();

    let data = create_test_variable_width_block(1000, 10);

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &data,
        &lance_field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_some(),
        "Should use dictionary encode based on size"
    );
}
7445
/// Sorted data whose runs are shorter than the sampling stride must still be
/// recognised as repetitive: the sampled unique ratio has to stay below 0.98.
#[test]
fn test_block_sampling_detects_low_cardinality_in_short_sorted_runs() {
    let sample_count: usize = 4096;
    let num_values: u64 = 200_000;
    let cardinality: u64 = 8_000;

    // Guard the premise of the test: each run of equal values is shorter than
    // the distance between two evenly spaced samples.
    let run_length = (num_values / cardinality) as usize;
    let stride = num_values as usize / sample_count;
    assert!(
        stride > run_length,
        "test must construct the stride > run_length case"
    );

    let sorted_block = create_sorted_variable_width_block(num_values, cardinality);
    let ratio =
        PrimitiveStructuralEncoder::sample_unique_ratio(&sorted_block, sample_count).unwrap();

    let below_threshold = matches!(ratio, Some(r) if r < 0.98);
    assert!(
        below_threshold,
        "sorted low-cardinality data must not be classified as near-unique"
    );
}
7467
/// 200,000 sorted strings with 8,000 distinct values should be selected for
/// dictionary encoding under V2_2.
#[test]
fn test_should_dictionary_encode_sorted_low_cardinality() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    let field_metadata =
        HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(field_metadata);
    let lance_field = LanceField::try_from(&arrow_field).unwrap();

    let sorted_block = create_sorted_variable_width_block(200_000, 8_000);

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &sorted_block,
        &lance_field,
        LanceFileVersion::V2_2,
    );

    assert!(
        decision.is_some(),
        "sorted low-cardinality data should reach dictionary encoding"
    );
}
7492
/// Sorted data where roughly every second value is new (200,002 values,
/// 100,001 distinct) must be rejected for dictionary encoding under V2_2.
#[test]
fn test_should_not_dictionary_encode_sorted_high_cardinality_short_runs() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    let field_metadata =
        HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(field_metadata);
    let lance_field = LanceField::try_from(&arrow_field).unwrap();

    let total_values = 200_002;
    let distinct_values = 100_001;
    let sorted_block = create_sorted_variable_width_block(total_values, distinct_values);

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &sorted_block,
        &lance_field,
        LanceFileVersion::V2_2,
    );

    assert!(
        decision.is_none(),
        "sorted high-cardinality short runs should not trigger a full dictionary probe"
    );
}
7519
7520 #[tokio::test]
7521 async fn test_encode_sorted_low_cardinality_uses_dictionary_layout() {
7522 use crate::constants::DICT_SIZE_RATIO_META_KEY;
7523
7524 let mut metadata = HashMap::new();
7525 metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
7526 let field = arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
7527 let array = create_sorted_string_array(200_000, 8_000);
7528
7529 let page = encode_first_page(field, array, LanceFileVersion::V2_2).await;
7530 let _ = dictionary_encoding_from_page(&page);
7531 }
7532
/// A 32-bit fixed-width block must not be selected for dictionary encoding
/// under V2_1.
#[test]
fn test_should_not_dictionary_encode_unsupported_bits() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    let field_metadata =
        HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(field_metadata);
    let lance_field = LanceField::try_from(&arrow_field).unwrap();

    let int32_block = create_test_fixed_data_block(1000, 1000, 32);

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &int32_block,
        &lance_field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_none(),
        "Should not use dictionary encode for unsupported bit width"
    );
}
7557
/// A block in which every one of the 5000 strings is distinct must not be
/// selected for dictionary encoding, even with a size ratio of "1.0".
#[test]
fn test_should_not_dictionary_encode_near_unique_sample() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;

    let field_metadata =
        HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(field_metadata);
    let lance_field = LanceField::try_from(&arrow_field).unwrap();

    // Cardinality equals the value count, so no value repeats.
    let total_values = 5000;
    let unique_block = create_test_variable_width_block(total_values, total_values);

    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &unique_block,
        &lance_field,
        LanceFileVersion::V2_1,
    );

    assert!(
        decision.is_none(),
        "Should not probe dictionary encoding for near-unique data"
    );
}
7583
/// Serializes two mini-block chunks (32,769 = 2^15 + 1 values in total) and
/// checks that the low four bits of the first u16 metadata word hold 15.
#[test]
fn test_v2_1_miniblock_serializes_log_num_values_15() {
    let first_chunk = MiniBlockChunk {
        buffer_sizes: vec![8],
        log_num_values: 15,
    };
    let last_chunk = MiniBlockChunk {
        buffer_sizes: vec![8],
        log_num_values: 0,
    };
    let compressed = MiniBlockCompressed {
        data: vec![LanceBuffer::from(vec![1_u8; 16])],
        chunks: vec![first_chunk, last_chunk],
        num_values: 32_769,
    };

    let serialized =
        PrimitiveStructuralEncoder::serialize_miniblocks(compressed, None, None, false).unwrap();

    let metadata_words = serialized.metadata.borrow_to_typed_slice::<u16>();
    assert_eq!(metadata_words.len(), 2);

    let log_num_values_bits = metadata_words[0] & 0x0F;
    assert_eq!(
        log_num_values_bits, 15,
        "V2.1 metadata should use all 4 bits for log_num_values"
    );
}
7613
/// Encodes `array` as a single column described by `field` using the default
/// encoding strategy for `version`, and returns the first page produced.
///
/// Pages produced by `maybe_encode` are awaited first, then those produced by
/// `flush`; any further pages are dropped.
///
/// # Panics
///
/// Panics if the field cannot be converted, if the encoder cannot be created,
/// if any encoding task fails, or if no page is produced at all.
async fn encode_first_page(
    field: arrow_schema::Field,
    array: ArrayRef,
    version: LanceFileVersion,
) -> crate::encoder::EncodedPage {
    use crate::encoder::{
        ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
        default_encoding_strategy,
    };
    use crate::repdef::RepDefBuilder;

    let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
    let encoding_strategy = default_encoding_strategy(version);
    let mut column_index_seq = ColumnIndexSequence::default();
    let encoding_options = EncodingOptions {
        cache_bytes_per_column: 1,
        max_page_bytes: 32 * 1024 * 1024,
        keep_original_array: true,
        buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
        version,
    };

    let mut encoder = encoding_strategy
        .create_field_encoder(
            encoding_strategy.as_ref(),
            &lance_field,
            &mut column_index_seq,
            &encoding_options,
        )
        .unwrap();

    let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
    let repdef = RepDefBuilder::default();
    let num_rows = array.len() as u64;
    let mut pages = Vec::new();
    // Await the tasks returned while feeding the array, in order.
    for task in encoder
        .maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
        .unwrap()
    {
        pages.push(task.await.unwrap());
    }
    // Then await whatever the encoder still emits on flush.
    for task in encoder.flush(&mut external_buffers).unwrap() {
        pages.push(task.await.unwrap());
    }
    // Only the first page is of interest to the callers in this module.
    pages.into_iter().next().unwrap()
}
7660
7661 #[tokio::test]
7662 async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
7663 use crate::format::pb21::page_layout::Layout;
7664
7665 let val = vec![0xABu8; 33];
7666 let arr: ArrayRef = Arc::new(
7667 arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
7668 std::iter::repeat_n(Some(val.as_slice()), 256),
7669 33,
7670 )
7671 .unwrap(),
7672 );
7673 let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
7674 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7675
7676 let PageEncoding::Structural(layout) = &page.description else {
7677 panic!("Expected structural encoding");
7678 };
7679 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7680 panic!("Expected constant layout in slot 2");
7681 };
7682 assert!(layout.inline_value.is_none());
7683 assert_eq!(page.data.len(), 1);
7684
7685 let test_cases = TestCases::default()
7686 .with_min_file_version(LanceFileVersion::V2_2)
7687 .with_max_file_version(LanceFileVersion::V2_2)
7688 .with_page_sizes(vec![4096]);
7689 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7690 }
7691
7692 #[tokio::test]
7693 async fn test_constant_layout_out_of_line_utf8_v2_2() {
7694 use crate::format::pb21::page_layout::Layout;
7695
7696 let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
7697 std::iter::repeat_n("hello", 512),
7698 ));
7699 let field = arrow_schema::Field::new("c", DataType::Utf8, true);
7700 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7701
7702 let PageEncoding::Structural(layout) = &page.description else {
7703 panic!("Expected structural encoding");
7704 };
7705 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7706 panic!("Expected constant layout in slot 2");
7707 };
7708 assert!(layout.inline_value.is_none());
7709 assert_eq!(page.data.len(), 1);
7710
7711 let test_cases = TestCases::default()
7712 .with_min_file_version(LanceFileVersion::V2_2)
7713 .with_max_file_version(LanceFileVersion::V2_2)
7714 .with_page_sizes(vec![4096]);
7715 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7716 }
7717
7718 #[tokio::test]
7719 async fn test_constant_layout_nullable_item_v2_2() {
7720 use crate::format::pb21::page_layout::Layout;
7721
7722 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![
7723 Some(7),
7724 None,
7725 Some(7),
7726 None,
7727 Some(7),
7728 ]));
7729 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7730 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7731
7732 let PageEncoding::Structural(layout) = &page.description else {
7733 panic!("Expected structural encoding");
7734 };
7735 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7736 panic!("Expected constant layout in slot 2");
7737 };
7738 assert!(layout.inline_value.is_some());
7739 assert_eq!(page.data.len(), 2);
7740
7741 let test_cases = TestCases::default()
7742 .with_min_file_version(LanceFileVersion::V2_2)
7743 .with_max_file_version(LanceFileVersion::V2_2)
7744 .with_page_sizes(vec![4096]);
7745 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7746 }
7747
7748 #[tokio::test]
7749 async fn test_constant_layout_list_repdef_v2_2() {
7750 use crate::format::pb21::page_layout::Layout;
7751 use arrow_array::builder::{Int32Builder, ListBuilder};
7752
7753 let mut builder = ListBuilder::new(Int32Builder::new());
7754 builder.values().append_value(7);
7755 builder.values().append_null();
7756 builder.values().append_value(7);
7757 builder.append(true);
7758
7759 builder.append(true);
7760
7761 builder.values().append_value(7);
7762 builder.append(true);
7763
7764 builder.append_null();
7765
7766 let arr: ArrayRef = Arc::new(builder.finish());
7767 let field = arrow_schema::Field::new(
7768 "c",
7769 DataType::List(Arc::new(arrow_schema::Field::new(
7770 "item",
7771 DataType::Int32,
7772 true,
7773 ))),
7774 true,
7775 );
7776 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7777
7778 let PageEncoding::Structural(layout) = &page.description else {
7779 panic!("Expected structural encoding");
7780 };
7781 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7782 panic!("Expected constant layout in slot 2");
7783 };
7784 assert!(layout.inline_value.is_some());
7785 assert_eq!(page.data.len(), 2);
7786
7787 let test_cases = TestCases::default()
7788 .with_min_file_version(LanceFileVersion::V2_2)
7789 .with_max_file_version(LanceFileVersion::V2_2)
7790 .with_page_sizes(vec![4096]);
7791 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7792 }
7793
7794 #[tokio::test]
7795 async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
7796 use crate::format::pb21::page_layout::Layout;
7797 use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};
7798
7799 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
7800 for _ in 0..64 {
7801 builder.values().append_value(1);
7802 builder.values().append_null();
7803 builder.values().append_value(3);
7804 builder.append(true);
7805 }
7806 let arr: ArrayRef = Arc::new(builder.finish());
7807 let field = arrow_schema::Field::new(
7808 "c",
7809 DataType::FixedSizeList(
7810 Arc::new(arrow_schema::Field::new("item", DataType::Int32, true)),
7811 3,
7812 ),
7813 true,
7814 );
7815 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7816
7817 if let PageEncoding::Structural(layout) = &page.description {
7818 assert!(
7819 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7820 "FixedSizeList should not use constant layout yet"
7821 );
7822 }
7823
7824 let test_cases = TestCases::default()
7825 .with_min_file_version(LanceFileVersion::V2_2)
7826 .with_max_file_version(LanceFileVersion::V2_2)
7827 .with_page_sizes(vec![4096]);
7828 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7829 }
7830
7831 #[tokio::test]
7832 async fn test_constant_layout_not_written_before_v2_2() {
7833 use crate::format::pb21::page_layout::Layout;
7834
7835 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
7836 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7837 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;
7838
7839 let PageEncoding::Structural(layout) = &page.description else {
7840 return;
7841 };
7842 assert!(
7843 !matches!(layout.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
7844 "Should not emit constant layout before v2.2"
7845 );
7846
7847 let test_cases = TestCases::default()
7848 .with_min_file_version(LanceFileVersion::V2_1)
7849 .with_max_file_version(LanceFileVersion::V2_1)
7850 .with_page_sizes(vec![4096]);
7851 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7852 }
7853
7854 #[tokio::test]
7855 async fn test_all_null_constant_layout_still_works_v2_2() {
7856 use crate::format::pb21::page_layout::Layout;
7857
7858 let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
7859 let field = arrow_schema::Field::new("c", DataType::Int32, true);
7860 let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
7861
7862 let PageEncoding::Structural(layout) = &page.description else {
7863 panic!("Expected structural encoding");
7864 };
7865 let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
7866 panic!("Expected layout in slot 2");
7867 };
7868 assert!(layout.inline_value.is_none());
7869 assert_eq!(page.data.len(), 0);
7870
7871 let test_cases = TestCases::default()
7872 .with_min_file_version(LanceFileVersion::V2_2)
7873 .with_max_file_version(LanceFileVersion::V2_2)
7874 .with_page_sizes(vec![4096]);
7875 check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
7876 }
7877
/// Compresses 2048 u16 levels with `encode_complex_all_null_vals`, decodes
/// them with the block decompressor built from the returned encoding, and
/// checks that count, bit width and contents are unchanged.
#[test]
fn test_encode_decode_complex_all_null_vals_roundtrip() {
    use crate::compression::{
        DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
    };

    // 0, 1, 2, 3, 4, 0, 1, ... for 2048 entries.
    let values: Arc<[u16]> = (0..2048u16).map(|i| i % 5).collect();

    let compression_strategy = DefaultCompressionStrategy::default();
    let (compressed_buf, encoding) =
        PrimitiveStructuralEncoder::encode_complex_all_null_vals(&values, &compression_strategy)
            .unwrap();

    let decompression_strategy = DefaultDecompressionStrategy::default();
    let decompressor = decompression_strategy
        .create_block_decompressor(&encoding)
        .unwrap();
    let expected_len = values.len() as u64;
    let decompressed = decompressor
        .decompress(compressed_buf, expected_len)
        .unwrap();

    let fixed_width = decompressed.as_fixed_width().unwrap();
    assert_eq!(fixed_width.num_values, values.len() as u64);
    assert_eq!(fixed_width.bits_per_value, 16);

    let rep_result = fixed_width.data.borrow_to_typed_slice::<u16>();
    assert_eq!(rep_result.as_ref(), values.as_ref());
}
7907
7908 #[tokio::test]
7909 async fn test_complex_all_null_compression_gated_by_version() {
7910 use crate::format::pb21::page_layout::Layout;
7911 use arrow_array::ListArray;
7912
7913 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7914 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7915 );
7916 let arr: ArrayRef = Arc::new(list_array);
7917 let field = arrow_schema::Field::new(
7918 "c",
7919 DataType::List(Arc::new(arrow_schema::Field::new(
7920 "item",
7921 DataType::Int32,
7922 true,
7923 ))),
7924 true,
7925 );
7926
7927 let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
7928 let PageEncoding::Structural(layout_v21) = &page_v21.description else {
7929 panic!("Expected structural encoding");
7930 };
7931 let Layout::ConstantLayout(layout_v21) = layout_v21.layout.as_ref().unwrap() else {
7932 panic!("Expected constant layout");
7933 };
7934 assert!(layout_v21.rep_compression.is_none());
7935 assert!(layout_v21.def_compression.is_none());
7936 assert_eq!(layout_v21.num_rep_values, 0);
7937 assert_eq!(layout_v21.num_def_values, 0);
7938
7939 let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
7940 let PageEncoding::Structural(layout_v22) = &page_v22.description else {
7941 panic!("Expected structural encoding");
7942 };
7943 let Layout::ConstantLayout(layout_v22) = layout_v22.layout.as_ref().unwrap() else {
7944 panic!("Expected constant layout");
7945 };
7946 assert!(layout_v22.def_compression.is_some());
7947 assert!(layout_v22.num_def_values > 0);
7948 }
7949
7950 #[tokio::test]
7951 async fn test_complex_all_null_round_trip() {
7952 use arrow_array::ListArray;
7953
7954 let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
7955 (0..1000).map(|i| if i % 2 == 0 { None } else { Some(vec![]) }),
7956 );
7957
7958 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
7959 check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
7960 .await;
7961 }
7962
7963 #[tokio::test]
7965 async fn test_sparse_boolean_list_roundtrip() {
7966 use arrow_array::builder::{BooleanBuilder, ListBuilder};
7967
7968 let mut list_builder = ListBuilder::new(BooleanBuilder::new());
7969 for i in 0..1000i32 {
7970 if i % 64 == 0 {
7971 list_builder.values().append_value(i % 128 == 0);
7973 list_builder.append(true);
7974 } else {
7975 list_builder.append(false);
7976 }
7977 }
7978 let list_array = Arc::new(list_builder.finish());
7979
7980 let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
7981 check_round_trip_encoding_of_data(vec![list_array], &test_cases, HashMap::new()).await;
7982 }
7983}