use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use crate::{
    constants::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    data::DictionaryDataBlock,
    encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
    format::{
        pb21::{self, compressive_encoding::Compression, CompressiveEncoding, PageLayout},
        ProtobufUtils21,
    },
};
use arrow_array::{cast::AsArray, make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use bytes::Bytes;
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{CacheKey, Context, DeepSizeOf},
    error::{Error, LanceOptionExt},
    utils::bit::pad_bytes,
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::constants::DICT_SIZE_RATIO_META_KEY;
use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE;
use crate::version::LanceFileVersion;
use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod blob;
pub mod dict;
pub mod fullzip;
pub mod miniblock;

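// Byte used when filling padding regions. The specific value is arbitrary;
// presumably a recognizable non-zero pattern (0xFE) was chosen to make
// padding easy to spot when inspecting raw buffers.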
const FILL_BYTE: u8 = 0xFE;

struct PageLoadTask {
    decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
    num_rows: u64,
}

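/// A scheduler for a single page of structurally-encoded data.
///
/// The lifecycle is: `initialize` is called once to read and cache any
/// page-wide metadata (chunk metadata, repetition index, dictionary),
/// `load` restores that cached state on later reads, and `schedule_ranges`
/// issues the I/O needed to decode the requested row ranges.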
trait StructuralPageScheduler: std::fmt::Debug + Send {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>>;
}

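/// Metadata describing a single mini-block chunk: how many values it holds
/// and where its bytes live within the page's value buffer.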
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

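/// A fully decoded mini-block chunk: optional repetition and definition
/// levels plus the decompressed values.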
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

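/// A task that decodes a batch of mini-block chunks, following the drain
/// instructions, into a single `DecodedPage`.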
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
    has_large_chunk: bool,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

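    // Maps a range of rows (relative to this chunk) into a range of items and
    // a range of levels.
    //
    // A "row" starts wherever the repetition level equals `max_rep`. Items
    // whose definition level exceeds `max_visible_def` are invisible (e.g.
    // inside null or empty lists) and occupy level slots but no item slots,
    // which is why the two output ranges can differ. The preamble (a partial
    // row carried over from the previous chunk) is skipped or taken according
    // to `preamble_action`.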
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0_u64;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx as u64);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start =
                            rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
                        items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap();
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                debug_assert!(items_in_preamble <= total_items);
                return (0..items_in_preamble, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                        }
                        if *def > max_visible_def {
                            lead_invis_seen += 1;
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                    }
                    if *def > max_visible_def {
                        tail_invis_seen += 1;
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += items_in_preamble;
                    new_end += items_in_preamble;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += items_in_preamble;
                    new_levels_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                debug_assert!(new_end <= total_items);
                (new_start..new_end, new_start..new_end)
            }
        } else {
            (range.clone(), range)
        }
    }

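    // Reads `num_buffers` buffer sizes from the chunk header; each size is a
    // little-endian u16 normally, or a u32 when the chunk is "large".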
    fn read_buffer_sizes<const LARGE: bool>(
        buf: &[u8],
        offset: &mut usize,
        num_buffers: u64,
    ) -> Vec<u32> {
        let read_size = if LARGE { 4 } else { 2 };
        (0..num_buffers)
            .map(|_| {
                let bytes = &buf[*offset..*offset + read_size];
                let size = if LARGE {
                    u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
                } else {
                    u16::from_le_bytes([bytes[0], bytes[1]]) as u32
                };
                *offset += read_size;
                size
            })
            .collect()
    }

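    // Decodes a single mini-block chunk. The chunk layout, as read below, is:
    //
    //   [num_levels: u16]
    //   [rep_size: u16]   (only if rep levels are present)
    //   [def_size: u16]   (only if def levels are present)
    //   [buffer sizes: u16 or u32 per buffer]
    //   <padding to MINIBLOCK_ALIGNMENT>
    //   [rep levels] <padding> [def levels] <padding> [value buffers...]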
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };

        let buffer_sizes = if self.has_large_chunk {
            Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
        } else {
            Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
        };

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;

        let needs_caching: Vec<bool> = self
            .instructions
            .windows(2)
            .map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
            .chain(std::iter::once(false))
            .collect();

        let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;

        for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
            let should_cache_this_chunk = needs_caching[idx];

            let decoded_chunk = match &chunk_cache {
                Some((cached_chunk_idx, ref cached_chunk))
                    if *cached_chunk_idx == chunk.chunk_idx =>
                {
                    cached_chunk.clone()
                }
                _ => {
                    let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

                    if should_cache_this_chunk {
                        chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
                    }
                    decoded
                }
            };

            let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );
            if item_range.end - item_range.start > chunk.items_in_chunk {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Item range {:?} is larger than the number of items in the chunk ({:?})",
                        item_range, chunk.items_in_chunk
                    ),
                    location: location!(),
                });
            }

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let mut data = data_builder.finish();

        let unraveler =
            RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());

        if let Some(dictionary) = &self.dictionary_data {
            let DataBlock::FixedWidth(indices) = data else {
                return Err(lance_core::Error::Internal {
                    message: format!(
                        "Expected FixedWidth DataBlock for dictionary indices, got {:?}",
                        data
                    ),
                    location: location!(),
                });
            };
            data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
                indices,
                dictionary.as_ref().clone(),
            ));
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

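/// A chunk's bytes, loaded from disk, along with its position in the page.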
#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

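/// A page decoder for mini-block encoded pages.
///
/// Drains rows by walking the scheduled `ChunkInstructions` against the
/// chunks that were loaded for this scan.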
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
    has_large_chunk: bool,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
            has_large_chunk: self.has_large_chunk,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

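/// A scheduler for all-null pages that still carry repetition and/or
/// definition levels (e.g. nulls nested inside lists). The levels must be
/// loaded and unraveled even though there is no value data.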
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
    max_visible_level: u16,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        let max_visible_level = def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
            max_visible_level,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder = Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

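/// Slices the cached rep/def levels for the drained ranges and produces an
/// all-null block sized to the number of visible values.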
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    max_visible_level: u16,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);

        let num_values = if let Some(def) = &def {
            def.iter().filter(|&d| *d <= self.max_visible_level).count() as u64
        } else {
            num_values
        };

        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

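/// A scheduler for simple all-null data: a single nullable layer with no
/// repetition. No I/O is needed; the page is fully described by its row
/// count.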
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let decoder =
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
        let page_load_task = PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
            self.num_values,
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

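/// One entry of the decoded mini-block repetition index.
///
/// `first_row` is the absolute row at which this chunk begins.
/// `starts_including_trailer` counts the rows that start in this chunk
/// (including a trailing partial row, if any). `has_preamble` means the chunk
/// opens with the continuation of the previous chunk's final row, and
/// `has_trailer` means its last row spills into the next chunk.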
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
    first_row: u64,
    starts_including_trailer: u64,
    has_preamble: bool,
    has_trailer: bool,
}

impl DeepSizeOf for MiniBlockRepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct MiniBlockRepIndex {
    blocks: Vec<MiniBlockRepIndexBlock>,
}

impl DeepSizeOf for MiniBlockRepIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl MiniBlockRepIndex {
    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
        let mut blocks = Vec::with_capacity(chunks.len());
        let mut offset: u64 = 0;

        for c in chunks {
            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer: c.num_values,
                has_preamble: false,
                has_trailer: false,
            });

            offset += c.num_values;
        }

        Self { blocks }
    }

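    // Decodes the on-disk repetition index. Each chunk contributes `stride`
    // u64 words; the first is the number of rows that end in the chunk and
    // the second is the number of items in a trailing partial row (0 if the
    // chunk ends on a row boundary).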
    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
        let u64_slice = buffer.borrow_to_typed_slice::<u64>();
        let n = u64_slice.len() / stride;

        let mut blocks = Vec::with_capacity(n);
        let mut chunk_has_preamble = false;
        let mut offset: u64 = 0;

        for i in 0..n {
            let base_idx = i * stride;
            let ends = u64_slice[base_idx];
            let partial = u64_slice[base_idx + 1];

            let has_trailer = partial > 0;
            let starts_including_trailer =
                ends + (has_trailer as u64) - (chunk_has_preamble as u64);

            blocks.push(MiniBlockRepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer,
            });

            chunk_has_preamble = has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

#[derive(Debug)]
struct MiniBlockCacheableState {
    chunk_meta: Vec<ChunkMeta>,
    rep_index: MiniBlockRepIndex,
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

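/// A scheduler for pages encoded with the mini-block layout.
///
/// On `initialize` it reads the chunk-metadata buffer (and, if present, the
/// dictionary and repetition index) and caches the decoded state. On
/// `schedule_ranges` it maps the requested row ranges onto chunks via the
/// repetition index and issues one read per distinct chunk needed.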
#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    page_meta: Option<Arc<MiniBlockCacheableState>>,
    has_large_chunk: bool,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb21::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors.create_miniblock_decompressor(
            layout.value_compression.as_ref().unwrap(),
            decompressors,
        )?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.compression.as_ref().unwrap() {
                Compression::Variable(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 4,
                    num_dictionary_items,
                }),
                Compression::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                Compression::General(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 1,
                    num_dictionary_items,
                }),
                _ => unreachable!(
                    "Mini-block dictionary encoding must use Variable, Flat, or General compression"
                ),
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
            has_large_chunk: layout.has_large_chunk,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

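/// Instructions for reading rows out of a single chunk.
///
/// `rows_to_skip`/`rows_to_take` are relative to the rows that *start* in the
/// chunk. `preamble` says what to do with a partial row carried over from the
/// previous chunk, and `take_trailer` marks a final row that continues into
/// the next chunk (where it becomes that chunk's preamble).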
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    preamble: PreambleAction,
    rows_to_skip: u64,
    rows_to_take: u64,
    take_trailer: bool,
}

#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    fn schedule_instructions(
        rep_index: &MiniBlockRepIndex,
        user_ranges: &[Range<u64>],
    ) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                if block_index >= rep_index.blocks.len() {
                    log::warn!("schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early");
                    break;
                }

                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);

                if rows_avail == 0 && to_skip == 0 {
                    if chunk.has_preamble && need_preamble {
                        chunk_instructions.push(Self {
                            chunk_idx: block_index,
                            preamble: PreambleAction::Take,
                            rows_to_skip: 0,
                            rows_to_take: 0,
                            take_trailer: chunk.has_trailer,
                        });
                        if chunk.starts_including_trailer > 0
                            || block_index == rep_index.blocks.len() - 1
                        {
                            need_preamble = false;
                        }
                    }
                    block_index += 1;
                    continue;
                }

                if rows_avail == 0 && to_skip > 0 {
                    to_skip -= chunk.starts_including_trailer;
                    block_index += 1;
                    continue;
                }

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

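    // Converts a scheduled instruction into a drain instruction, taking up to
    // `rows_desired` rows from this chunk and reporting whether the chunk was
    // fully consumed. `need_preamble` and `skip_in_chunk` carry state between
    // successive drains.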
    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        let rows_taking = if *rows_desired >= rows_avail {
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

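// Chunk-metadata words are u16 normally and u32 when the page contains a
// "large" chunk; `Words` abstracts over the two widths.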
enum Words {
    U16(ScalarBuffer<u16>),
    U32(ScalarBuffer<u32>),
}

struct WordsIter<'a> {
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}

impl Words {
    pub fn len(&self) -> usize {
        match self {
            Self::U16(b) => b.len(),
            Self::U32(b) => b.len(),
        }
    }

    pub fn iter(&self) -> WordsIter<'_> {
        match self {
            Self::U16(buf) => WordsIter {
                iter: Box::new(buf.iter().map(|&x| x as u32)),
            },
            Self::U32(buf) => WordsIter {
                iter: Box::new(buf.iter().copied()),
            },
        }
    }

    pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
        let bytes_per_value = if has_large_chunk { 4 } else { 2 };
        assert_eq!(bytes.len() % bytes_per_value, 0);
        let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
        if has_large_chunk {
            Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
        } else {
            Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
        }
    }
}

impl<'a> Iterator for WordsIter<'a> {
    type Item = u32;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
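
            // Each metadata word packs log2(num values) into its low 4 bits
            // and the chunk size into the remaining bits, expressed as
            // multiples of MINIBLOCK_ALIGNMENT, minus one. For example
            // (illustrative values only): a word of 0x105 has
            // log_num_values = 5 (32 values) and describes a chunk of
            // (0x10 + 1) * MINIBLOCK_ALIGNMENT bytes. Only the final chunk
            // may hold a non-power-of-two count.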
            for (word_idx, word) in words.iter().enumerate() {
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let stride = self.repetition_index_depth as usize + 1;
                MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
            } else {
                MiniBlockRepIndex::default_from_chunks(&chunk_meta)
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index,
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| ci.rows_to_take)
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();

        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let has_large_chunk = self.has_large_chunk;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
                has_large_chunk,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: res,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64,
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}

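/// A scheduler for full-zip encoded pages.
///
/// In a full-zip page each value is stored inline with its control word
/// (repetition/definition levels). Without repetition, value positions can be
/// computed directly from the row number; with repetition, a repetition index
/// maps row numbers to byte offsets.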
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    cached_state: Option<Arc<FullZipCacheableState>>,
    enable_cache: bool,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb21::FullZipLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let bits_per_offset = match layout.details {
            Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
            Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
                bits_per_offset as u8
            }
            None => panic!("Full-zip layout must have a `details` field"),
        };

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
            cached_state: None,
            enable_cache: false,
        })
    }

    fn create_decoder(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        bits_per_offset: u8,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                if bits_per_value == 0 {
                    return Err(lance_core::Error::Internal {
                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
                        location: location!(),
                    });
                }
                if bits_per_value % 8 != 0 {
                    return Err(lance_core::Error::NotSupported {
                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
                        location: location!(),
                    });
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

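    // The repetition index stores one byte offset per row (plus a final end
    // offset). The helpers below turn row ranges into byte ranges, either
    // from freshly-read (start, end) offset pairs or from a fully cached
    // index.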
    fn extract_byte_ranges_from_pairs(
        buffer: LanceBuffer,
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ByteUnpacker::new(buffer, bytes_per_value as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buf_position;
                let end = c.next().unwrap() + data_buf_position;
                start..end
            })
            .collect::<Vec<_>>()
    }

    fn extract_byte_ranges_from_cached(
        buffer: &LanceBuffer,
        ranges: &[Range<u64>],
        bytes_per_value: u64,
        data_buf_position: u64,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .map(|r| {
                let start_offset = (r.start * bytes_per_value) as usize;
                let end_offset = (r.end * bytes_per_value) as usize;

                let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
                let start_val =
                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
                let end_val =
                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                (data_buf_position + start_val)..(data_buf_position + end_val)
            })
            .collect()
    }

    fn compute_rep_index_ranges(
        ranges: &[Range<u64>],
        rep_index: &FullZipRepIndexDetails,
    ) -> Vec<Range<u64>> {
        ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect()
    }

    async fn resolve_byte_ranges(
        data_buf_position: u64,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
        cached_state: Option<&Arc<FullZipCacheableState>>,
        priority: u64,
    ) -> Result<Vec<Range<u64>>> {
        if let Some(cached_state) = cached_state {
            Ok(Self::extract_byte_ranges_from_cached(
                &cached_state.rep_index_buffer,
                ranges,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        } else {
            let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
            let rep_data = io.submit_request(rep_ranges, priority).await?;
            let rep_buffer = LanceBuffer::concat(
                &rep_data
                    .into_iter()
                    .map(|d| LanceBuffer::from_bytes(d, 1))
                    .collect::<Vec<_>>(),
            );
            Ok(Self::extract_byte_ranges_from_pairs(
                rep_buffer,
                rep_index.bytes_per_value,
                data_buf_position,
            ))
        }
    }

    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: FullZipRepIndexDetails,
    ) -> Result<Vec<PageLoadTask>> {
        let data_buf_position = self.data_buf_position;
        let cached_state = self.cached_state.clone();
        let priority = self.priority;
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;
        let ranges = ranges.to_vec();
        let io_clone = io.clone();
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let load_task = async move {
            let byte_ranges = Self::resolve_byte_ranges(
                data_buf_position,
                &ranges,
                &io_clone,
                &rep_index,
                cached_state.as_ref(),
                priority,
            )
            .await?;

            let data = io_clone.submit_request(byte_ranges, priority).await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect::<VecDeque<_>>();

            let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();

            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }

    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        let load_task = async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        Ok(vec![page_load_task])
    }
}

#[derive(Debug)]
struct FullZipCacheableState {
    rep_index_buffer: LanceBuffer,
}

impl DeepSizeOf for FullZipCacheableState {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}

impl CachedPageData for FullZipCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

2289impl StructuralPageScheduler for FullZipScheduler {
2290 fn initialize<'a>(
2293 &'a mut self,
2294 io: &Arc<dyn EncodingsIo>,
2295 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2296 if self.enable_cache && self.rep_index.is_some() {
2298 let rep_index = self.rep_index.as_ref().unwrap();
2299 let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2301 let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2302
2303 let io_clone = io.clone();
2305 let future = async move {
2306 let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2307 let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2308
2309 Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2311 };
2312
2313 future.boxed()
2314 } else {
2315 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2317 }
2318 }
2319
2320 fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2324 if let Ok(cached_state) = cache
2326 .clone()
2327 .as_arc_any()
2328 .downcast::<FullZipCacheableState>()
2329 {
2330 self.cached_state = Some(cached_state);
2332 }
2333 }
2334
2335 fn schedule_ranges(
2336 &self,
2337 ranges: &[Range<u64>],
2338 io: &Arc<dyn EncodingsIo>,
2339 ) -> Result<Vec<PageLoadTask>> {
2340 if let Some(rep_index) = self.rep_index {
2341 self.schedule_ranges_rep(ranges, io, rep_index)
2342 } else {
2343 self.schedule_ranges_simple(ranges, io.as_ref())
2344 }
2345 }
2346}
2347
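/// A decoder for full-zip pages of fixed-width values
///
/// The page data arrives as a queue of buffers in which every value is zipped together
/// with its control word (if any).  Draining slices off enough zipped bytes to cover
/// the requested number of rows.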
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    offset_in_current: usize,
    bytes_per_value: usize,
    total_bytes_per_value: usize,
    num_rows: u64,
}

impl FixedFullZipDecoder {
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            let mut rows_started = 0;
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}

impl StructuralPageDecoder for FixedFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            let task_item = self.slice_next_task(remaining);
            remaining -= task_item.rows_in_buf;
            task_data.push(task_item);
        }
        Ok(Box::new(FixedFullZipDecodeTask {
            details: self.details.clone(),
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_rows: num_rows as usize,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

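/// A decoder for full-zip pages of variable-width values
///
/// Row boundaries cannot be found with arithmetic here, so the constructor eagerly
/// unzips the page into separate data, offsets, and rep/def buffers and records where
/// every row starts.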
#[derive(Debug)]
struct VariableFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    offsets: LanceBuffer,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
    repdef_starts: Vec<usize>,
    data_starts: Vec<usize>,
    offset_starts: Vec<usize>,
    visible_item_counts: Vec<u64>,
    bits_per_offset: u8,
    current_idx: usize,
    num_rows: u64,
}

impl VariableFullZipDecoder {
    fn new(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        in_bits_per_length: u8,
        out_bits_per_offset: u8,
    ) -> Self {
        let decompressor = match details.value_decompressor {
            PerValueDecompressor::Variable(ref d) => d.clone(),
            _ => unreachable!(),
        };

        assert_eq!(in_bits_per_length % 8, 0);
        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);

        let mut decoder = Self {
            details,
            decompressor,
            data: LanceBuffer::empty(),
            offsets: LanceBuffer::empty(),
            rep: LanceBuffer::empty().borrow_to_typed_slice(),
            def: LanceBuffer::empty().borrow_to_typed_slice(),
            bits_per_offset: out_bits_per_offset,
            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
            data_starts: Vec::with_capacity(num_rows as usize + 1),
            offset_starts: Vec::with_capacity(num_rows as usize + 1),
            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
            current_idx: 0,
            num_rows,
        };

        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);

        decoder
    }

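    /// Reads a little-endian length of the given bit width from the front of `data`
    ///
    /// # Safety
    ///
    /// The caller must ensure `data` holds at least `bits_per_offset / 8` bytes.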
    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
        match bits_per_offset {
            8 => *data.get_unchecked(0) as u64,
            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
            32 => u32::from_le_bytes([
                *data.get_unchecked(0),
                *data.get_unchecked(1),
                *data.get_unchecked(2),
                *data.get_unchecked(3),
            ]) as u64,
            64 => u64::from_le_bytes([
                *data.get_unchecked(0),
                *data.get_unchecked(1),
                *data.get_unchecked(2),
                *data.get_unchecked(3),
                *data.get_unchecked(4),
                *data.get_unchecked(5),
                *data.get_unchecked(6),
                *data.get_unchecked(7),
            ]),
            _ => unreachable!(),
        }
    }

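    /// Separates the zipped page into contiguous value bytes, offsets, and rep/def
    /// levels, recording the repdef/data/offset start of each row so `drain` can later
    /// slice by row index.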
    fn unzip(
        &mut self,
        data: VecDeque<LanceBuffer>,
        in_bits_per_length: u8,
        out_bits_per_offset: u8,
        num_rows: u64,
    ) {
        let mut rep = Vec::with_capacity(num_rows as usize);
        let mut def = Vec::with_capacity(num_rows as usize);
        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;

        let bytes_per_offset = out_bits_per_offset as usize / 8;
        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
        let mut offsets_data = Vec::with_capacity(bytes_offsets);

        let bytes_per_length = in_bits_per_length as usize / 8;
        let bytes_lengths = bytes_per_length * num_rows as usize;

        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
        let mut unzipped_data =
            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));

        let mut current_offset = 0_u64;
        let mut visible_item_count = 0_u64;
        for databuf in data.into_iter() {
            let mut databuf = databuf.as_ref();
            while !databuf.is_empty() {
                let data_start = unzipped_data.len();
                let offset_start = offsets_data.len();
                let repdef_start = rep.len().max(def.len());
                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
                    databuf,
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                self.details
                    .ctrl_word_parser
                    .parse(databuf, &mut rep, &mut def);
                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];

                if ctrl_desc.is_new_row {
                    self.repdef_starts.push(repdef_start);
                    self.data_starts.push(data_start);
                    self.offset_starts.push(offset_start);
                    self.visible_item_counts.push(visible_item_count);
                }
                if ctrl_desc.is_visible {
                    visible_item_count += 1;
                    if ctrl_desc.is_valid_item {
                        debug_assert!(databuf.len() >= bytes_per_length);
                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        };
                        // The zipped value is prefixed by its length, which is
                        // `bytes_per_length` wide; skip the prefix and copy the value
                        databuf = &databuf[bytes_per_length..];
                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
                        databuf = &databuf[length as usize..];
                        current_offset += length;
                    } else {
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        }
                    }
                }
            }
        }
        self.repdef_starts.push(rep.len().max(def.len()));
        self.data_starts.push(unzipped_data.len());
        self.offset_starts.push(offsets_data.len());
        self.visible_item_counts.push(visible_item_count);
        match out_bits_per_offset {
            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
            _ => unreachable!(),
        };
        self.rep = ScalarBuffer::from(rep);
        self.def = ScalarBuffer::from(def);
        self.data = LanceBuffer::from(unzipped_data);
        self.offsets = LanceBuffer::from(offsets_data);
    }
}

impl StructuralPageDecoder for VariableFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let start = self.current_idx;
        let end = start + num_rows as usize;

        let data = self.data.clone();

        let offset_start = self.offset_starts[start];
        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
        let offsets = self
            .offsets
            .slice_with_length(offset_start, offset_end - offset_start);

        let repdef_start = self.repdef_starts[start];
        let repdef_end = self.repdef_starts[end];
        let rep = if self.rep.is_empty() {
            self.rep.clone()
        } else {
            self.rep.slice(repdef_start, repdef_end - repdef_start)
        };
        let def = if self.def.is_empty() {
            self.def.clone()
        } else {
            self.def.slice(repdef_start, repdef_end - repdef_start)
        };

        let visible_item_counts_start = self.visible_item_counts[start];
        let visible_item_counts_end = self.visible_item_counts[end];
        let num_visible_items = visible_item_counts_end - visible_item_counts_start;

        self.current_idx += num_rows as usize;

        Ok(Box::new(VariableFullZipDecodeTask {
            details: self.details.clone(),
            decompressor: self.decompressor.clone(),
            data,
            offsets,
            bits_per_offset: self.bits_per_offset,
            num_visible_items,
            rep,
            def,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

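/// A task to decode one drained slice of a variable-width full-zip page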
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    offsets: LanceBuffer,
    bits_per_offset: u8,
    num_visible_items: u64,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
}

impl DecodePageTask for VariableFullZipDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let block = VariableWidthBlock {
            data: self.data,
            offsets: self.offsets,
            bits_per_offset: self.bits_per_offset,
            num_values: self.num_visible_items,
            block_info: BlockInfo::new(),
        };
        let decompressed = self.decompressor.decompress(block)?;
        let rep = if self.rep.is_empty() {
            None
        } else {
            Some(self.rep.to_vec())
        };
        let def = if self.def.is_empty() {
            None
        } else {
            Some(self.def.to_vec())
        };
        let unraveler = RepDefUnraveler::new(
            rep,
            def,
            self.details.def_meaning.clone(),
            self.num_visible_items,
        );
        Ok(DecodedPage {
            data: decompressed,
            repdef: unraveler,
        })
    }
}

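/// A slice of zipped fixed-width data and the number of rows it covers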
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    data: PerValueDataBlock,
    rows_in_buf: u64,
}

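/// A task to decode drained slices of a fixed-width full-zip page, stripping control
/// words and dropping invisible values along the way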
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    data: Vec<FullZipDecodeTaskItem>,
    num_rows: usize,
    bytes_per_value: usize,
}

impl DecodePageTask for FixedFullZipDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let estimated_size_bytes = self
            .data
            .iter()
            .map(|task_item| task_item.data.data_size() as usize)
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
            }

            let unraveler = RepDefUnraveler::new(
                None,
                None,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );

            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);

            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let mut buf_slice = fixed_data.data.as_ref();
                let num_values = fixed_data.num_values as usize;
                let mut values = Vec::with_capacity(
                    fixed_data.data.len()
                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
                );
                let mut visible_items = 0;
                for _ in 0..num_values {
                    self.details
                        .ctrl_word_parser
                        .parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];

                    let is_visible = def
                        .last()
                        .map(|d| *d <= self.details.max_visible_def)
                        .unwrap_or(true);
                    if is_visible {
                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
                        buf_slice = &buf_slice[self.bytes_per_value..];
                        visible_items += 1;
                    }
                }

                let values_buf = LanceBuffer::from(values);
                let fixed_data = FixedWidthDataBlock {
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    block_info: BlockInfo::new(),
                    data: values_buf,
                    num_values: visible_items,
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
                data_builder.append(&decompressed, 0..visible_items);
            }

            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };

            let unraveler = RepDefUnraveler::new(
                repetition,
                definition,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );
            let data = data_builder.finish();

            Ok(DecodedPage {
                data,
                repdef: unraveler,
            })
        }
    }
}

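/// A scheduling job that walks the requested ranges page by page, translating global
/// row offsets into page-relative ranges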
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    global_row_offset: u64,
}

impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            global_row_offset: 0,
        }
    }
}

impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        let mut range = self.ranges[self.range_idx].clone();
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        let cur_path = context.current_path();
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}

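/// A page scheduler paired with the page's index and row count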
#[derive(Debug)]
struct PageInfoAndScheduler {
    page_index: usize,
    num_rows: u64,
    scheduler: Box<dyn StructuralPageScheduler>,
}

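/// A field scheduler for primitive (leaf) columns using the structural encoding
///
/// Builds one page scheduler per page from the column's page layouts.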
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    page_schedulers: Vec<PageInfoAndScheduler>,
    column_index: u32,
}

impl StructuralPrimitiveFieldScheduler {
    pub fn try_new(
        column_info: &ColumnInfo,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Self> {
        let page_schedulers = column_info
            .page_infos
            .iter()
            .enumerate()
            .map(|(page_index, page_info)| {
                Self::page_info_to_scheduler(
                    page_info,
                    page_index,
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            page_schedulers,
            column_index: column_info.index,
        })
    }

    fn page_layout_to_scheduler(
        page_info: &PageInfo,
        page_layout: &PageLayout,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Box<dyn StructuralPageScheduler>> {
        use pb21::page_layout::Layout;
        Ok(match page_layout.layout.as_ref().expect_ok()? {
            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
                &page_info.buffer_offsets_and_sizes,
                page_info.priority,
                mini_block.num_items,
                mini_block,
                decompressors,
            )?),
            Layout::FullZipLayout(full_zip) => {
                let mut scheduler = FullZipScheduler::try_new(
                    &page_info.buffer_offsets_and_sizes,
                    page_info.priority,
                    page_info.num_rows,
                    full_zip,
                    decompressors,
                )?;
                scheduler.enable_cache = cache_repetition_index;
                Box::new(scheduler)
            }
            Layout::AllNullLayout(all_null) => {
                let def_meaning = all_null
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                if def_meaning.len() == 1
                    && def_meaning[0] == DefinitionInterpretation::NullableItem
                {
                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
                } else {
                    Box::new(ComplexAllNullScheduler::new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        def_meaning.into(),
                    )) as Box<dyn StructuralPageScheduler>
                }
            }
            Layout::BlobLayout(blob) => {
                let inner_scheduler = Self::page_layout_to_scheduler(
                    page_info,
                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )?;
                let def_meaning = blob
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                if matches!(target_field.data_type(), DataType::Struct(_)) {
                    Box::new(BlobDescriptionPageScheduler::new(
                        inner_scheduler,
                        def_meaning.into(),
                    ))
                } else {
                    Box::new(BlobPageScheduler::new(
                        inner_scheduler,
                        page_info.priority,
                        page_info.num_rows,
                        def_meaning.into(),
                    ))
                }
            }
        })
    }

    fn page_info_to_scheduler(
        page_info: &PageInfo,
        page_index: usize,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<PageInfoAndScheduler> {
        let page_layout = page_info.encoding.as_structural();
        let scheduler = Self::page_layout_to_scheduler(
            page_info,
            page_layout,
            decompressors,
            cache_repetition_index,
            target_field,
        )?;
        Ok(PageInfoAndScheduler {
            page_index,
            num_rows: page_info.num_rows,
            scheduler,
        })
    }
}

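/// Per-page state that can be cached across scans (e.g. a repetition index); the
/// `as_arc_any` method allows schedulers to downcast to their concrete state type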
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}

pub struct NoCachedPageData;

impl DeepSizeOf for NoCachedPageData {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        0
    }
}

impl CachedPageData for NoCachedPageData {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

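/// The cached page states for every page of a column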
pub struct CachedFieldData {
    pages: Vec<Arc<dyn CachedPageData>>,
}

impl DeepSizeOf for CachedFieldData {
    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
        self.pages.deep_size_of_children(ctx)
    }
}

#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    pub column_index: u32,
}

impl CacheKey for FieldDataCacheKey {
    type ValueType = CachedFieldData;

    fn key(&self) -> std::borrow::Cow<'_, str> {
        self.column_index.to_string().into()
    }
}

impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
    fn initialize<'a>(
        &'a mut self,
        _filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let cache_key = FieldDataCacheKey {
            column_index: self.column_index,
        };
        let cache = context.cache().clone();

        async move {
            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
                self.page_schedulers
                    .iter_mut()
                    .zip(cached_data.pages.iter())
                    .for_each(|(page_scheduler, cached_data)| {
                        page_scheduler.scheduler.load(cached_data);
                    });
                return Ok(());
            }

            let page_data = self
                .page_schedulers
                .iter_mut()
                .map(|s| s.scheduler.initialize(context.io()))
                .collect::<FuturesOrdered<_>>();

            let page_data = page_data.try_collect::<Vec<_>>().await?;
            let cached_data = Arc::new(CachedFieldData { pages: page_data });
            cache.insert_with_key(&cache_key, cached_data).await;
            Ok(())
        }
        .boxed()
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let ranges = ranges.to_vec();
        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
            self, ranges,
        )))
    }
}

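/// A decode task that runs a set of page decode tasks, concatenates the resulting
/// arrays, and unravels the rep/def levels to restore validity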
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    tasks: Vec<Box<dyn DecodePageTask>>,
    should_validate: bool,
    data_type: DataType,
}

impl StructuralCompositeDecodeArrayTask {
    fn restore_validity(
        array: Arc<dyn Array>,
        unraveler: &mut CompositeRepDefUnraveler,
    ) -> Arc<dyn Array> {
        let validity = unraveler.unravel_validity(array.len());
        let Some(validity) = validity else {
            return array;
        };
        if array.data_type() == &DataType::Null {
            return array;
        }
        assert_eq!(validity.len(), array.len());
        make_array(unsafe {
            array
                .to_data()
                .into_builder()
                .nulls(Some(validity))
                .build_unchecked()
        })
    }
}

impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let mut arrays = Vec::with_capacity(self.tasks.len());
        let mut unravelers = Vec::with_capacity(self.tasks.len());
        for task in self.tasks {
            let decoded = task.decode()?;
            unravelers.push(decoded.repdef);

            let array = make_array(
                decoded
                    .data
                    .into_arrow(self.data_type.clone(), self.should_validate)?,
            );

            arrays.push(array);
        }
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        let array = arrow_select::concat::concat(&array_refs)?;
        let mut repdef = CompositeRepDefUnraveler::new(unravelers);

        let array = Self::restore_validity(array, &mut repdef);

        Ok(DecodedArray { array, repdef })
    }
}

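/// A field decoder that drains rows from a queue of loaded page decoders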
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    field: Arc<ArrowField>,
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    should_validate: bool,
    rows_drained_in_current: u64,
}

impl StructuralPrimitiveFieldDecoder {
    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
        Self {
            field: field.clone(),
            page_decoders: VecDeque::new(),
            should_validate,
            rows_drained_in_current: 0,
        }
    }
}

impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
        assert!(child.path.is_empty());
        self.page_decoders.push_back(child.decoder);
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let mut remaining = num_rows;
        let mut tasks = Vec::new();
        while remaining > 0 {
            let cur_page = self.page_decoders.front_mut().unwrap();
            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
            let to_take = num_in_page.min(remaining);

            let task = cur_page.drain(to_take)?;
            tasks.push(task);

            if to_take == num_in_page {
                self.page_decoders.pop_front();
                self.rows_drained_in_current = 0;
            } else {
                self.rows_drained_in_current += to_take;
            }

            remaining -= to_take;
        }
        Ok(Box::new(StructuralCompositeDecodeArrayTask {
            tasks,
            should_validate: self.should_validate,
            data_type: self.field.data_type().clone(),
        }))
    }

    fn data_type(&self) -> &DataType {
        self.field.data_type()
    }
}

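/// A serialized full-zip page: the zipped values plus an optional repetition index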
struct SerializedFullZip {
    values: LanceBuffer,
    repetition_index: Option<LanceBuffer>,
}

const MINIBLOCK_ALIGNMENT: usize = 8;

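/// The structural encoder for primitive (leaf) fields
///
/// Arrays are accumulated (along with their repdef information) until enough data has
/// been collected for a page, then flushed using the all-null, mini-block, or full-zip
/// layout (optionally dictionary encoded).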
pub struct PrimitiveStructuralEncoder {
    accumulation_queue: AccumulationQueue,

    keep_original_array: bool,
    support_large_chunk: bool,
    accumulated_repdefs: Vec<RepDefBuilder>,
    compression_strategy: Arc<dyn CompressionStrategy>,
    column_index: u32,
    field: Field,
    encoding_metadata: Arc<HashMap<String, String>>,
    version: LanceFileVersion,
}

struct CompressedLevelsChunk {
    data: LanceBuffer,
    num_levels: u16,
}

struct CompressedLevels {
    data: Vec<CompressedLevelsChunk>,
    compression: CompressiveEncoding,
    rep_index: Option<LanceBuffer>,
}

struct SerializedMiniBlockPage {
    num_buffers: u64,
    data: LanceBuffer,
    metadata: LanceBuffer,
}

impl PrimitiveStructuralEncoder {
    pub fn try_new(
        options: &EncodingOptions,
        compression_strategy: Arc<dyn CompressionStrategy>,
        column_index: u32,
        field: Field,
        encoding_metadata: Arc<HashMap<String, String>>,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            support_large_chunk: options.support_large_chunk(),
            keep_original_array: options.keep_original_array,
            accumulated_repdefs: Vec::new(),
            column_index,
            compression_strategy,
            field,
            encoding_metadata,
            version: options.version,
        })
    }

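    /// A block is "narrow" (a good fit for mini-block) when its largest value is
    /// smaller than 256 bytes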
    fn is_narrow(data_block: &DataBlock) -> bool {
        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;

        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
            let max_len_array = max_len_array
                .as_any()
                .downcast_ref::<PrimitiveArray<UInt64Type>>()
                .unwrap();
            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
                return true;
            }
        }
        false
    }

    fn prefers_miniblock(
        data_block: &DataBlock,
        encoding_metadata: &HashMap<String, String>,
    ) -> bool {
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
        }
        Self::is_narrow(data_block)
    }

    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
        }
        true
    }

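    /// Serializes mini-block chunks into a single page buffer
    ///
    /// Each chunk records its level count, the rep/def byte lengths (when present), and
    /// the per-buffer sizes, followed by the rep, def, and value buffers, with every
    /// section padded to MINIBLOCK_ALIGNMENT.  A per-chunk metadata word stores the
    /// chunk's padded size and log2 value count.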
    fn serialize_miniblocks(
        miniblocks: MiniBlockCompressed,
        rep: Option<Vec<CompressedLevelsChunk>>,
        def: Option<Vec<CompressedLevelsChunk>>,
        support_large_chunk: bool,
    ) -> SerializedMiniBlockPage {
        let bytes_rep = rep
            .as_ref()
            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_def = def
            .as_ref()
            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
        let mut num_buffers = miniblocks.data.len();
        if rep.is_some() {
            num_buffers += 1;
        }
        if def.is_some() {
            num_buffers += 1;
        }
        let max_extra = 9 * num_buffers;
        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);

        let mut rep_iter = rep.map(|r| r.into_iter());
        let mut def_iter = def.map(|d| d.into_iter());

        let mut buffer_offsets = vec![0; miniblocks.data.len()];
        for chunk in miniblocks.chunks {
            let start_pos = data_buffer.len();
            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);

            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
            let def = def_iter.as_mut().map(|d| d.next().unwrap());

            let num_levels = rep
                .as_ref()
                .map(|r| r.num_levels)
                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
            data_buffer.extend_from_slice(&num_levels.to_le_bytes());

            if let Some(rep) = rep.as_ref() {
                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
            }
            if let Some(def) = def.as_ref() {
                let bytes_def = u16::try_from(def.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
            }

            if support_large_chunk {
                for &buffer_size in &chunk.buffer_sizes {
                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
                }
            } else {
                for &buffer_size in &chunk.buffer_sizes {
                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
                }
            }

            let add_padding = |data_buffer: &mut Vec<u8>| {
                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
            };
            add_padding(&mut data_buffer);

            if let Some(rep) = rep.as_ref() {
                data_buffer.extend_from_slice(&rep.data);
                add_padding(&mut data_buffer);
            }
            if let Some(def) = def.as_ref() {
                data_buffer.extend_from_slice(&def.data);
                add_padding(&mut data_buffer);
            }
            for (buffer_size, (buffer, buffer_offset)) in chunk
                .buffer_sizes
                .iter()
                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
            {
                let start = *buffer_offset;
                let end = start + *buffer_size as usize;
                *buffer_offset += *buffer_size as usize;
                data_buffer.extend_from_slice(&buffer[start..end]);
                add_padding(&mut data_buffer);
            }

            let chunk_bytes = data_buffer.len() - start_pos;
            let max_chunk_size = if support_large_chunk {
                4 * 1024 * 1024 * 1024
            } else {
                32 * 1024
            };
            assert!(chunk_bytes <= max_chunk_size);
            assert!(chunk_bytes > 0);
            assert_eq!(chunk_bytes % 8, 0);
            assert!(chunk.log_num_values <= 12);
            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
            let divided_bytes_minus_one = (divided_bytes - 1) as u64;

            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
            if support_large_chunk {
                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
            } else {
                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
            }
        }

        let data_buffer = LanceBuffer::from(data_buffer);
        let metadata_buffer = LanceBuffer::from(meta_buffer);

        SerializedMiniBlockPage {
            num_buffers: miniblocks.data.len() as u64,
            data: data_buffer,
            metadata: metadata_buffer,
        }
    }

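    /// Compresses rep or def levels chunk by chunk; when repetition is present it also
    /// builds the per-chunk repetition index (rows per chunk plus trailing leftover
    /// items).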
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let levels_buf = levels.all_levels().clone();

        let mut fixed_width_block = FixedWidthDataBlock {
            data: levels_buf,
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        };
        fixed_width_block.compute_stat();

        let levels_block = DataBlock::FixedWidth(fixed_width_block);
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            debug_assert!(chunk_num_values > 0);
            values_counter += chunk_num_values;
            let chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();

                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };

                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }

                if chunk_idx == chunks.len() - 1 {
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let mut chunk_fixed_width = FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            };
            chunk_fixed_width.compute_stat();
            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels: num_chunk_levels as u16,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }

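    /// Encodes a page where every value is null and the nulls are simple validity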
    fn encode_simple_all_null(
        column_idx: u32,
        num_rows: u64,
        row_number: u64,
    ) -> Result<EncodedPage> {
        let description = ProtobufUtils21::simple_all_null_layout();
        Ok(EncodedPage {
            column_idx,
            data: vec![],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

    fn encode_complex_all_null(
        column_idx: u32,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(rep.clone())
        } else {
            LanceBuffer::empty()
        };

        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(def.clone())
        } else {
            LanceBuffer::empty()
        };

        let description = ProtobufUtils21::all_null_layout(&repdef.def_meaning);
        Ok(EncodedPage {
            column_idx,
            data: vec![rep_bytes, def_bytes],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

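    /// Encodes a page using the mini-block layout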
    #[allow(clippy::too_many_arguments)]
    fn encode_miniblock(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        dictionary_data: Option<DataBlock>,
        num_rows: u64,
        support_large_chunk: bool,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        if let DataBlock::AllNull(_null_block) = data {
            unreachable!()
        }

        let num_items = data.num_values();

        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let mut compressed_rep = repdef
            .rep_slicer()
            .map(|rep_slicer| {
                Self::compress_levels(
                    rep_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    max_rep,
                )
            })
            .transpose()?;

        let (rep_index, rep_index_depth) =
            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
                Some(rep_index) => (Some(rep_index.clone()), 1),
                None => (None, 0),
            };

        let mut compressed_def = repdef
            .def_slicer()
            .map(|def_slicer| {
                Self::compress_levels(
                    def_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    0,
                )
            })
            .transpose()?;

        let rep_data = compressed_rep
            .as_mut()
            .map(|cr| std::mem::take(&mut cr.data));
        let def_data = compressed_def
            .as_mut()
            .map(|cd| std::mem::take(&mut cd.data));

        let serialized =
            Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk);

        let mut data = Vec::with_capacity(4);
        data.push(serialized.metadata);
        data.push(serialized.data);

        if let Some(dictionary_data) = dictionary_data {
            let num_dictionary_items = dictionary_data.num_values();
            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;

            let (compressor, dictionary_encoding) = compression_strategy
                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
            let dictionary_buffer = compressor.compress(dictionary_data)?;

            data.push(dictionary_buffer);
            if let Some(rep_index) = rep_index {
                data.push(rep_index);
            }

            let description = ProtobufUtils21::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                Some((dictionary_encoding, num_dictionary_items)),
                &repdef.def_meaning,
                num_items,
                support_large_chunk,
            );
            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        } else {
            let description = ProtobufUtils21::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                None,
                &repdef.def_meaning,
                num_items,
                support_large_chunk,
            );

            if let Some(rep_index) = rep_index {
                let view = rep_index.borrow_to_typed_slice::<u64>();
                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
                debug_assert_eq!(total, num_rows);

                data.push(rep_index);
            }

            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        }
    }

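    /// Zips fixed-width values together with their control words, building a
    /// byte-packed repetition index of row start offsets when repetition is present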
    fn serialize_full_zip_fixed(
        fixed: FixedWidthDataBlock,
        mut repdef: ControlWordIterator,
        num_values: u64,
    ) -> SerializedFullZip {
        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
        let mut zipped_data = Vec::with_capacity(len);

        let max_rep_index_val = if repdef.has_repetition() {
            len as u64
        } else {
            0
        };
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);

        assert_eq!(
            fixed.bits_per_value % 8,
            0,
            "Non-byte aligned full-zip compression not yet supported"
        );

        let bytes_per_value = fixed.bits_per_value as usize / 8;
        let mut offset = 0;

        if bytes_per_value == 0 {
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                offset = zipped_data.len();
            }
        } else {
            let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
            while let Some(control) = repdef.append_next(&mut zipped_data) {
                if control.is_new_row {
                    debug_assert!(offset <= len);
                    unsafe { rep_index_builder.append(offset as u64) };
                }
                if control.is_visible {
                    let value = data_iter.next().unwrap();
                    zipped_data.extend_from_slice(value);
                }
                offset = zipped_data.len();
            }
        }

        debug_assert_eq!(zipped_data.len(), len);
        unsafe {
            rep_index_builder.append(zipped_data.len() as u64);
        }

        let zipped_data = LanceBuffer::from(zipped_data);
        let rep_index = rep_index_builder.into_data();
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::from(rep_index))
        };
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

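    /// Zips variable-width values (stored as length-prefixed bytes) together with their
    /// control words, always building a repetition index of row start offsets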
    fn serialize_full_zip_variable(
        variable: VariableWidthBlock,
        mut repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        let bytes_per_offset = variable.bits_per_offset as usize / 8;
        assert_eq!(
            variable.bits_per_offset % 8,
            0,
            "Only byte-aligned offsets supported"
        );
        let len = variable.data.len()
            + repdef.bytes_per_word() * num_items as usize
            + bytes_per_offset * variable.num_values as usize;
        let mut buf = Vec::with_capacity(len);

        let max_rep_index_val = len as u64;
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

        match bytes_per_offset {
            4 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            8 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            _ => panic!("Unsupported offset size"),
        }

        debug_assert!(buf.len() <= len);
        unsafe {
            rep_index_builder.append(buf.len() as u64);
        }

        let zipped_data = LanceBuffer::from(buf);
        let rep_index = rep_index_builder.into_data();
        debug_assert!(!rep_index.is_empty());
        let rep_index = Some(LanceBuffer::from(rep_index));
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

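    /// Zips the compressed values and control words, dispatching on value width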
    fn serialize_full_zip(
        compressed_data: PerValueDataBlock,
        repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        match compressed_data {
            PerValueDataBlock::Fixed(fixed) => {
                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
            }
            PerValueDataBlock::Variable(var) => {
                Self::serialize_full_zip_variable(var, repdef, num_items)
            }
        }
    }

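    /// Encodes a page using the full-zip layout, where each value sits next to its
    /// control word so any row can be located via the repetition index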
    fn encode_full_zip(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_lists: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);
        let max_rep = repdef
            .repetition_levels
            .as_ref()
            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
        let max_def = repdef
            .definition_levels
            .as_ref()
            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));

        let (num_items, num_visible_items) =
            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
                (rep_levels.len() as u64, data.num_values())
            } else {
                (data.num_values(), data.num_values())
            };

        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);

        let repdef_iter = build_control_word_iterator(
            repdef.repetition_levels.as_deref(),
            max_rep,
            repdef.definition_levels.as_deref(),
            max_def,
            max_visible_def,
            num_items as usize,
        );
        let bits_rep = repdef_iter.bits_rep();
        let bits_def = repdef_iter.bits_def();

        let compressor = compression_strategy.create_per_value(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let description = match &compressed_data {
            PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
                bits_rep,
                bits_def,
                fixed.bits_per_value as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
            PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
                bits_rep,
                bits_def,
                variable.bits_per_offset as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
        };

        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);

        let data = if let Some(repindex) = zipped.repetition_index {
            vec![zipped.values, repindex]
        } else {
            vec![zipped.values]
        };

        Ok(EncodedPage {
            num_rows: num_lists,
            column_idx,
            data,
            description: PageEncoding::Structural(description),
            row_number,
        })
    }

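    /// Estimates the size, in bytes, of the block after dictionary encoding, or `None`
    /// if the block is not a dictionary-encoding candidate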
    fn estimate_dict_size(data_block: &DataBlock, version: LanceFileVersion) -> Option<u64> {
        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
            cardinality_array.as_primitive::<UInt64Type>().value(0)
        } else {
            return None;
        };

        let num_values = data_block.num_values();

        match data_block {
            DataBlock::FixedWidth(fixed) => {
                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
                    return None;
                }
                if cardinality > i32::MAX as u64 {
                    return None;
                }
                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
                    return None;
                }
                if fixed.bits_per_value % 8 != 0 {
                    return None;
                }
                let dict_size = cardinality * (fixed.bits_per_value / 8);
                let indices_size = num_values * (DICT_INDICES_BITS_PER_VALUE / 8);
                Some(dict_size + indices_size)
            }
            DataBlock::VariableWidth(var) => {
                if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
                    return None;
                }
                let bits_per_offset = var.bits_per_offset as u64;
                if (bits_per_offset == 32 && cardinality > i32::MAX as u64)
                    || (bits_per_offset == 64 && cardinality > i64::MAX as u64)
                {
                    return None;
                }

                let data_size = data_block.data_size();
                let avg_value_size = data_size / num_values;

                let dict_values_size = cardinality * avg_value_size;
                let dict_offsets_size = cardinality * (bits_per_offset / 8);
                let indices_size = num_values * (bits_per_offset / 8);

                Some(dict_values_size + dict_offsets_size + indices_size)
            }
            _ => None,
        }
    }

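    /// Decides whether to dictionary-encode a block by comparing its estimated encoded
    /// size against a configurable ratio of the raw size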
    fn should_dictionary_encode(
        data_block: &DataBlock,
        field: &Field,
        version: LanceFileVersion,
    ) -> bool {
        match data_block {
            DataBlock::FixedWidth(fixed) => {
                if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
                    return false;
                }
                if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
                    return false;
                }
            }
            DataBlock::VariableWidth(_) => {}
            _ => return false,
        }

        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
            .ok()
            .and_then(|val| val.parse().ok())
            .unwrap_or(100);
        if data_block.num_values() < too_small {
            return false;
        }

        let threshold_ratio = field
            .metadata
            .get(DICT_SIZE_RATIO_META_KEY)
            .and_then(|val| val.parse::<f64>().ok())
            .or_else(|| {
                env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
                    .ok()
                    .and_then(|val| val.parse().ok())
            })
            .unwrap_or(0.8);

        if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
            panic!(
                "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
                threshold_ratio
            );
        }

        let data_size = data_block.data_size();

        let Some(encoded_size) = Self::estimate_dict_size(data_block, version) else {
            return false;
        };

        let size_ratio_actual = if data_size > 0 {
            encoded_size as f64 / data_size as f64
        } else {
            return false;
        };
        size_ratio_actual < threshold_ratio
    }

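    /// Flushes the accumulated arrays into an encode task, choosing the page layout
    /// (all-null, dictionary + mini-block, mini-block, or full-zip)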
    fn do_flush(
        &mut self,
        arrays: Vec<ArrayRef>,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let column_idx = self.column_index;
        let compression_strategy = self.compression_strategy.clone();
        let field = self.field.clone();
        let encoding_metadata = self.encoding_metadata.clone();
        let support_large_chunk = self.support_large_chunk;
        let version = self.version;
        let task = spawn_cpu(move || {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();

            // No items at all (e.g. only empty lists); only the rep/def
            // information needs to be stored.
            if num_values == 0 {
                log::debug!(
                    "Encoding column {} with {} items ({} rows) using complex-null layout",
                    column_idx,
                    num_values,
                    num_rows
                );
                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
            }
            let num_nulls = arrays
                .iter()
                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
                .sum::<u64>();

            if num_values == num_nulls {
                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
                    log::debug!(
                        "Encoding column {} with {} items ({} rows) using simple-null layout",
                        column_idx,
                        num_values,
                        num_rows
                    );
                    Self::encode_simple_all_null(column_idx, num_values, row_number)
                } else {
                    log::debug!(
                        "Encoding column {} with {} items ({} rows) using complex-null layout",
                        column_idx,
                        num_values,
                        num_rows
                    );
                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
                };
            }

            if let DataType::Struct(fields) = &field.data_type() {
                if fields.is_empty() {
                    if repdefs.iter().any(|rd| !rd.is_empty()) {
                        return Err(Error::InvalidInput {
                            source: format!(
                                "Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.",
                                field.name
                            )
                            .into(),
                            location: location!(),
                        });
                    }
                    return Self::encode_simple_all_null(column_idx, num_values, row_number);
                }
            }

            let data_block = DataBlock::from_arrays(&arrays, num_values);

            // Packed structs with a variable-width child can only be encoded
            // with the full-zip layout.
            let requires_full_zip_packed_struct =
                if let DataBlock::Struct(ref struct_data_block) = data_block {
                    struct_data_block.has_variable_width_child()
                } else {
                    false
                };

            if requires_full_zip_packed_struct {
                log::debug!(
                    "Encoding column {} with {} items using full-zip packed struct layout",
                    column_idx,
                    num_values
                );
                return Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    num_rows,
                );
            }

            if let DataBlock::Dictionary(dict) = data_block {
                log::debug!(
                    "Encoding column {} with {} items using dictionary encoding (already dictionary encoded)",
                    column_idx,
                    num_values
                );
                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
                indices_data_block.compute_stat();
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdefs,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                    support_large_chunk,
                )
            } else if Self::should_dictionary_encode(&data_block, &field, version) {
                log::debug!(
                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
                    column_idx,
                    num_values
                );
                let (indices_data_block, dictionary_data_block) =
                    dict::dictionary_encode(data_block);
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdefs,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                    support_large_chunk,
                )
            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using mini-block layout",
                    column_idx,
                    num_values
                );
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    None,
                    num_rows,
                    support_large_chunk,
                )
            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using full-zip layout",
                    column_idx,
                    num_values
                );
                Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    num_rows,
                )
            } else {
                Err(Error::InvalidInput {
                    source: format!(
                        "Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}",
                        field.name, STRUCTURAL_ENCODING_META_KEY
                    )
                    .into(),
                    location: location!(),
                })
            }
        })
        .boxed();
        Ok(vec![task])
    }

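    // Layout selection above, in order of precedence (a summary of the logic,
    // not additional behavior): pages that are entirely null use the simple or
    // complex all-null layouts, packed structs with variable-width children
    // force full-zip, dictionary data (pre-encoded or elected via
    // should_dictionary_encode) goes to mini-block with an attached
    // dictionary, and everything else picks mini-block or full-zip based on
    // the field metadata.
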
    fn extract_validity_buf(
        array: Arc<dyn Array>,
        repdef: &mut RepDefBuilder,
        keep_original_array: bool,
    ) -> Result<Arc<dyn Array>> {
        if let Some(validity) = array.nulls() {
            if keep_original_array {
                repdef.add_validity_bitmap(validity.clone());
            } else {
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
            }
            let data_no_nulls = array.to_data().into_builder().nulls(None).build()?;
            Ok(make_array(data_no_nulls))
        } else {
            repdef.add_no_null(array.len());
            Ok(array)
        }
    }

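    // For example, a nullable Int32 array [1, null, 3] contributes the
    // validity bitmap 101 to the RepDefBuilder and is returned as a plain
    // [1, _, 3] values array with no null buffer attached (the value behind
    // the null slot is whatever the buffer happened to hold).
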
    fn extract_validity(
        mut array: Arc<dyn Array>,
        repdef: &mut RepDefBuilder,
        keep_original_array: bool,
    ) -> Result<Arc<dyn Array>> {
        match array.data_type() {
            DataType::Null => {
                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
                Ok(array)
            }
            DataType::Dictionary(_, _) => {
                array = dict::normalize_dict_nulls(array)?;
                Self::extract_validity_buf(array, repdef, keep_original_array)
            }
            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
        }
    }
}

impl FieldEncoder for PrimitiveStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
        self.accumulated_repdefs.push(repdef);

        if let Some((arrays, row_number, num_rows)) =
            self.accumulation_queue.insert(array, row_number, num_rows)
        {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

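// Lifecycle note (a summary, not new behavior): maybe_encode buffers incoming
// arrays and their rep/def builders in the accumulation queue and only emits
// EncodeTasks when the queue decides to spill; flush drains whatever remains;
// finish returns a single default EncodedColumn because this encoder always
// writes exactly one column.
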
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
        FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
        StructuralPageScheduler,
    };
    use crate::constants::{STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK};
    use crate::data::BlockInfo;
    use crate::decoder::PageEncoding;
    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };
    use crate::format::pb21;
    use crate::format::pb21::compressive_encoding::Compression;
    use crate::testing::{check_round_trip_encoding_of_data, TestCases};
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, Int8Array, StringArray, UInt64Array};
    use arrow_schema::DataType;
    use std::collections::HashMap;
    use std::{collections::VecDeque, sync::Arc};

    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        // Blocks with large average value sizes are not narrow.
        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
        // Four lists (rep == 1 starts a new row); the def value of 1 at
        // position 5 marks a null list that contributes no items.
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

        // A null list in the first position.
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

        // A null list in the last position.
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

        // Repetition only; no definition levels.
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

        // Definition only; every level is visible (max_visible_def = 1), so
        // rows, items, and levels line up one-to-one.
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

        // The chunk starts mid-row (rep == 0), so there is a preamble that is
        // either taken or skipped.
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

        // A preamble followed by a null list.
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

        // A preamble with repetition only.
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);

        // Two levels of repetition (max_rep = 2) with invisible inner levels.
        let rep = Some(vec![2, 1, 2, 0, 1, 2]);
        let def = Some(vec![0, 1, 2, 0, 0, 0]);
        let max_rep = 2;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..3, 0..4, 0..6);
        check(0..1, 0..1, 0..2);
        check(1..2, 1..3, 2..5);
        check(2..3, 3..4, 5..6);

        // A two-level preamble containing a null; taking zero rows still
        // consumes the preamble.
        let rep = Some(vec![0, 0, 1, 0, 1, 1]);
        let def = Some(vec![0, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..0, 0..1, 0..2);
        check(0..1, 0..3, 0..4);
        check(0..2, 0..4, 0..5);

        // Skip the preamble, then select a row further into the chunk.
        let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
        let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
        let max_rep = 1;
        let max_visible_def = 0;
        let total_items = 5;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(2..3, 2..4, 5..7);
    }

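    // Reading the expectations above: map_range translates a range of rows
    // into a range of items (values actually stored) and a range of rep/def
    // levels. Rows whose definition level exceeds max_visible_def occupy a
    // level slot but contribute no items, which is why an all-null row can
    // map to an empty item range such as 5..5.
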
    #[test]
    fn test_schedule_instructions() {
        // Four chunks, two u64 words each (the 2 passed to decode_from_bytes);
        // the user ranges below cover 14 rows in total.
        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        // Take everything.
        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 6,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        check(&[0..14], expected_take_all.clone());

        // Taking every row individually yields the same instructions.
        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

        // Two disjoint single-row ranges inside the first chunk.
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

        // A row that starts at the end of chunk 0 and spills into chunk 1:
        // take the trailer plus the next chunk's preamble (zero extra rows).
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 1,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

        // Skip chunk 1's preamble, then span into chunk 2.
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

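    // A row that crosses a chunk boundary is represented as a trailer in one
    // chunk (take_trailer: true) plus a preamble in the next
    // (PreambleAction::Take); scheduling therefore never emits the start of a
    // row without also emitting the instruction that completes it.
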
    #[test]
    fn test_drain_instructions() {
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            // Note: this panics (front().unwrap() on an empty queue) if
            // rows_desired exceeds what was scheduled.
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        // Drain the first four of the ten scheduled rows.
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

        // The next four rows span three scheduled chunks.
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        // The final two rows.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        // A second scenario where a large row spans several chunks.
        let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
        let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
        let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        // Draining two more rows finishes chunk 1 (skipping the preamble that
        // was already taken) and consumes the preamble of chunk 2.
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }

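    // Draining walks the scheduled instructions in order, carrying
    // need_preamble and skip_in_chunk across calls so that a batch boundary
    // can fall mid-chunk (or even mid-row) and the next drain resumes exactly
    // where the previous one stopped.
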
    #[tokio::test]
    async fn test_fullzip_repetition_index_caching() {
        use crate::testing::SimulatedScheduler;

        // A stub decompressor; decoding is never actually exercised here.
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 100u64;
        let bytes_per_value = 4u64;
        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;

        // Build a repetition index with offsets spaced 100 bytes apart.
        let mut rep_index_data = Vec::new();
        for i in 0..=rows_in_page {
            let offset = (i * 100) as u32;
            rep_index_data.extend_from_slice(&offset.to_le_bytes());
        }

        // Lay out the simulated file: 1000 bytes of value data, then the
        // repetition index, then padding.
        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&vec![0u8; 10000]);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));
        let _cache = Arc::new(lance_core::cache::LanceCache::with_capacity(1024 * 1024));

        let mut scheduler = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();

        // With caching enabled, initialize should return the cacheable state.
        let is_cached = cached_data1
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
            .is_ok();
        assert!(
            is_cached,
            "Expected FullZipCacheableState, got NoCachedPageData"
        );

        scheduler.load(&cached_data1);

        assert!(
            scheduler.cached_state.is_some(),
            "cached_state should be populated after load"
        );

        let cached_state = scheduler.cached_state.as_ref().unwrap();

        // Scheduling with a repetition index should succeed using the cache.
        let ranges = vec![0..10, 20..30];
        let result = scheduler.schedule_ranges_rep(
            &ranges,
            &io_dyn,
            FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            },
        );

        assert!(
            result.is_ok(),
            "schedule_ranges_rep should succeed with cached data"
        );

        // A second scheduler loading the same cached data should share it.
        let mut scheduler2 = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: scheduler.details.clone(),
            cached_state: None,
            enable_cache: true,
        };

        scheduler2.load(&cached_data1);
        assert!(
            scheduler2.cached_state.is_some(),
            "Second scheduler should have cached_state after load"
        );

        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
        assert!(
            Arc::ptr_eq(cached_state, cached_state2),
            "Both schedulers should share the same cached data"
        );
    }

    #[tokio::test]
    async fn test_fullzip_cache_config_controls_caching() {
        use crate::testing::SimulatedScheduler;

        // A stub decompressor; decoding is never actually exercised here.
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

        let rows_in_page = 1000_u64;
        let bytes_per_value = 4_u64;

        // Simulated file: 1000 bytes of value data, then the repetition index,
        // then more value data.
        let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
        let value_data = vec![0u8; 4000];
        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&value_data);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));

        let mut scheduler_no_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: false,
        };

        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data
                .as_arc_any()
                .downcast_ref::<super::NoCachedPageData>()
                .is_some(),
            "With enable_cache=false, should return NoCachedPageData"
        );

        let mut scheduler_with_cache = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
            enable_cache: true,
        };

        let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();

        assert!(
            cached_data2
                .as_arc_any()
                .downcast_ref::<super::FullZipCacheableState>()
                .is_some(),
            "With enable_cache=true, should return FullZipCacheableState"
        );
    }

    // Reproduces a fuzz-discovered failure (issue 4492) involving empty rep
    // values in list data.
    #[tokio::test]
    async fn test_fuzz_issue_4492_empty_rep_values() {
        use lance_datagen::{array, gen_batch, RowCount, Seed};

        let seed = 1823859942947654717u64;
        let num_rows = 2741usize;

        let batch_gen = gen_batch().with_seed(Seed::from(seed));
        let base_generator = array::rand_type(&DataType::FixedSizeBinary(32));
        let list_generator = array::rand_list_any(base_generator, false);

        let batch = batch_gen
            .anon_col(list_generator)
            .into_batch_rows(RowCount::from(num_rows as u64))
            .unwrap();

        let list_array = batch.column(0).clone();

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_batch_size(100)
            .with_range(0..num_rows.min(500) as u64)
            .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);

        check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await;
    }

    async fn test_minichunk_size_helper(
        string_data: Vec<Option<String>>,
        minichunk_size: u64,
        file_version: LanceFileVersion,
    ) {
        use crate::constants::MINICHUNK_SIZE_META_KEY;
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use arrow_array::{ArrayRef, StringArray};
        use std::sync::Arc;

        let string_array: ArrayRef = Arc::new(StringArray::from(string_data));

        let mut metadata = HashMap::new();
        metadata.insert(
            MINICHUNK_SIZE_META_KEY.to_string(),
            minichunk_size.to_string(),
        );
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );

        let test_cases = TestCases::default()
            .with_min_file_version(file_version)
            .with_batch_size(1000);

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, metadata).await;
    }

    #[tokio::test]
    async fn test_minichunk_size_roundtrip() {
        // 100 long strings with a small (64 byte) mini-chunk size.
        let mut string_data = Vec::new();
        for i in 0..100 {
            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
        }
        test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
    }

    #[tokio::test]
    async fn test_minichunk_size_128kb_v2_2() {
        // 10_000 long strings with a 128 KiB mini-chunk size.
        let mut string_data = Vec::new();
        for i in 0..10000 {
            string_data.push(Some(format!("test_string_{}", i).repeat(50)));
        }
        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
    }

    #[tokio::test]
    async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
        // Many tiny strings, so the value count rather than the byte size is
        // the limiting factor for each mini-block chunk.
        let mut string_data = Vec::new();
        for i in 0..10000 {
            string_data.push(Some(format!("t_{}", i)));
        }
        test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
    }

    #[tokio::test]
    async fn test_large_dictionary_general_compression() {
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        // 100 unique ~500 byte values repeated to 100_000 rows, so the
        // dictionary itself is large enough to warrant general compression.
        let unique_values: Vec<String> = (0..100)
            .map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
            .collect();

        let repeated_strings: Vec<_> = unique_values
            .iter()
            .cycle()
            .take(100_000)
            .map(|s| Some(s.as_str()))
            .collect();

        let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
                assert_eq!(cols.len(), 1);
                let col = &cols[0];

                if let Some(PageEncoding::Structural(page_layout)) =
                    &col.final_pages.first().map(|p| &p.description)
                {
                    if let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
                        &page_layout.layout
                    {
                        if let Some(dictionary_encoding) = &mini_block.dictionary {
                            match dictionary_encoding.compression.as_ref() {
                                Some(Compression::General(general)) => {
                                    let compression = general.compression.as_ref().unwrap();
                                    assert!(
                                        compression.scheme()
                                            == pb21::CompressionScheme::CompressionAlgorithmLz4
                                            || compression.scheme()
                                                == pb21::CompressionScheme::CompressionAlgorithmZstd,
                                        "Expected LZ4 or Zstd compression for large dictionary"
                                    );
                                }
                                _ => panic!("Expected General compression for large dictionary"),
                            }
                        }
                    }
                }
            }));

        check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
    }

    #[tokio::test]
    async fn test_dictionary_encode_int64() {
        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{ArrayRef, Int64Array};
        use std::collections::HashMap;
        use std::sync::Arc;

        // Three distinct values repeated across 1000 rows.
        let values = (0..1000)
            .map(|i| match i % 3 {
                0 => 10i64,
                1 => 20i64,
                _ => 30i64,
            })
            .collect::<Vec<_>>();
        let array = Arc::new(Int64Array::from(values)) as ArrayRef;

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_batch_size(1000)
            .with_range(0..1000)
            .with_indices(vec![0, 1, 10, 999])
            .with_expected_encoding("dictionary");

        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
    }

    #[tokio::test]
    async fn test_dictionary_encode_float64() {
        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{ArrayRef, Float64Array};
        use std::collections::HashMap;
        use std::sync::Arc;

        // Three distinct values repeated across 1000 rows.
        let values = (0..1000)
            .map(|i| match i % 3 {
                0 => 0.1f64,
                1 => 0.2f64,
                _ => 0.3f64,
            })
            .collect::<Vec<_>>();
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_batch_size(1000)
            .with_range(0..1000)
            .with_indices(vec![0, 1, 10, 999])
            .with_expected_encoding("dictionary");

        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
    }

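    // A sketch of the same round-trip check for strings, mirroring the two
    // tests above. This test is an addition, not part of the original suite:
    // the test name and data are illustrative, and it assumes the helper
    // accepts string columns the same way it accepts numeric ones.
    #[tokio::test]
    async fn test_dictionary_encode_string_sketch() {
        use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
        use crate::testing::{check_round_trip_encoding_of_data, TestCases};
        use crate::version::LanceFileVersion;
        use arrow_array::{ArrayRef, StringArray};
        use std::collections::HashMap;
        use std::sync::Arc;

        // Three distinct values repeated across 1000 rows: a textbook
        // dictionary case.
        let values = (0..1000)
            .map(|i| match i % 3 {
                0 => "alpha",
                1 => "beta",
                _ => "gamma",
            })
            .collect::<Vec<_>>();
        let array = Arc::new(StringArray::from(values)) as ArrayRef;

        let mut metadata = HashMap::new();
        metadata.insert(
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        );
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string());

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_2)
            .with_batch_size(1000)
            .with_range(0..1000)
            .with_indices(vec![0, 1, 10, 999])
            .with_expected_encoding("dictionary");

        check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
    }
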
    /// Builds a fixed-width test block of `num_values` zeroed values with a
    /// pre-computed Cardinality statistic attached.
    fn create_test_fixed_data_block(
        num_values: u64,
        cardinality: u64,
        bits_per_value: u64,
    ) -> DataBlock {
        use crate::statistics::Stat;

        let block_info = BlockInfo::default();

        let cardinality_array = Arc::new(UInt64Array::from(vec![cardinality]));
        block_info
            .0
            .write()
            .unwrap()
            .insert(Stat::Cardinality, cardinality_array);

        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value,
            data: crate::buffer::LanceBuffer::from(vec![
                0u8;
                (num_values * bytes_per_value) as usize
            ]),
            num_values,
            block_info,
        })
    }

    /// Builds a variable-width (string) test block whose values cycle through
    /// `cardinality` distinct strings, with the Cardinality statistic attached.
    fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
        use crate::statistics::Stat;
        use arrow_array::StringArray;

        assert!(cardinality <= num_values && cardinality > 0);

        let mut values = Vec::with_capacity(num_values as usize);
        for i in 0..num_values {
            values.push(format!("value_{:016}", i % cardinality));
        }

        let array = StringArray::from(values);
        let block = DataBlock::from_array(Arc::new(array) as ArrayRef);

        // Overwrite the computed statistic with the exact cardinality.
        if let DataBlock::VariableWidth(ref var_block) = block {
            let mut info = var_block.block_info.0.write().unwrap();
            info.insert(
                Stat::Cardinality,
                Arc::new(UInt64Array::from(vec![cardinality])),
            );
        }

        block
    }

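    // The estimate formulas exercised below, as inferred from the assertions
    // (not a separate specification):
    //   fixed width:    cardinality * bytes_per_value
    //                     + num_values * (DICT_INDICES_BITS_PER_VALUE / 8)
    //   variable width: cardinality * avg_value_size
    //                     + cardinality * 4 (offsets) + num_values * 4 (indices)
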
    #[test]
    fn test_estimate_dict_size_fixed_width() {
        use crate::encodings::logical::primitive::dict::DICT_INDICES_BITS_PER_VALUE;

        let bits_per_value = 128;
        let block = create_test_fixed_data_block(1000, 400, bits_per_value);
        let estimated_size =
            PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap();

        // 400 dictionary entries of 16 bytes each, plus 1000 indices.
        let expected_dict_size = 400 * (bits_per_value / 8);
        let expected_indices_size = 1000 * (DICT_INDICES_BITS_PER_VALUE / 8);
        let expected_total = expected_dict_size + expected_indices_size;

        assert_eq!(estimated_size, expected_total);
    }

    #[test]
    fn test_estimate_dict_size_variable_width() {
        let block = create_test_variable_width_block(1000, 400);
        let estimated_size =
            PrimitiveStructuralEncoder::estimate_dict_size(&block, LanceFileVersion::V2_1).unwrap();

        let data_size = block.data_size();
        let avg_value_size = data_size / 1000;

        // Dictionary values plus 4-byte offsets plus 4-byte indices.
        let expected = 400 * avg_value_size + 400 * 4 + 1000 * 4;

        assert_eq!(estimated_size, expected);
    }

    #[test]
    fn test_should_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // Low cardinality (10 of 1000) should compress well under a 0.8 ratio.
        let block = create_test_variable_width_block(1000, 10);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
            &block,
            &field,
            LanceFileVersion::V2_1,
        );

        assert!(result, "Should use dictionary encode based on size");
    }

    #[test]
    fn test_should_not_dictionary_encode() {
        use crate::constants::DICT_SIZE_RATIO_META_KEY;
        use lance_core::datatypes::Field as LanceField;

        // Every value is unique, so the dictionary form cannot be smaller.
        let block = create_test_fixed_data_block(1000, 1000, 128);

        let mut metadata = HashMap::new();
        metadata.insert(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string());
        let arrow_field =
            arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
        let field = LanceField::try_from(&arrow_field).unwrap();

        let result = PrimitiveStructuralEncoder::should_dictionary_encode(
            &block,
            &field,
            LanceFileVersion::V2_1,
        );

        assert!(!result, "Should not use dictionary encode based on size");
    }
}