1use std::{
5 any::Any,
6 collections::{HashMap, VecDeque},
7 env,
8 fmt::Debug,
9 iter,
10 ops::Range,
11 sync::Arc,
12 vec,
13};
14
15use arrow::array::AsArray;
16use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
17use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
18use arrow_schema::{DataType, Field as ArrowField};
19use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
20use itertools::Itertools;
21use lance_arrow::deepcopy::deep_copy_nulls;
22use lance_core::{
23 cache::{CacheKey, Context, DeepSizeOf},
24 datatypes::{
25 DICT_DIVISOR_META_KEY, STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY,
26 STRUCTURAL_ENCODING_MINIBLOCK,
27 },
28 error::Error,
29 utils::{bit::pad_bytes, hash::U8SliceKey},
30};
31use log::trace;
32use snafu::location;
33
34use crate::{
35 compression::{
36 BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
37 },
38 data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
39 utils::bytepack::BytepackedIntegerEncoder,
40};
41use crate::{
42 compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
43 encodings::logical::primitive::fullzip::PerValueDataBlock,
44};
45use crate::{
46 encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
47};
48use crate::{
49 encodings::logical::primitive::miniblock::MiniBlockCompressed,
50 statistics::{ComputeStat, GetStat, Stat},
51};
52use crate::{
53 repdef::{
54 build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
55 ControlWordParser, DefinitionInterpretation, RepDefSlicer,
56 },
57 utils::accumulation::AccumulationQueue,
58};
59use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
60
61use crate::{
62 buffer::LanceBuffer,
63 data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
64 decoder::{
65 ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPage,
66 MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
67 StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
68 StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
69 },
70 encoder::{
71 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
72 },
73 format::{pb, ProtobufUtils},
74 repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
75 EncodingsIo,
76};
77
78pub mod fullzip;
79pub mod miniblock;
80
81const FILL_BYTE: u8 = 0xFE;
82
83trait StructuralPageScheduler: std::fmt::Debug + Send {
86 fn initialize<'a>(
88 &'a mut self,
89 io: &Arc<dyn EncodingsIo>,
90 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
91 fn load(&mut self, data: &Arc<dyn CachedPageData>);
93 fn schedule_ranges(
95 &self,
96 ranges: &[Range<u64>],
97 io: &Arc<dyn EncodingsIo>,
98 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
99}
100
101#[derive(Debug)]
103struct ChunkMeta {
104 num_values: u64,
105 chunk_size_bytes: u64,
106 offset_bytes: u64,
107}
108
109#[derive(Debug)]
111struct DecodedMiniBlockChunk {
112 rep: Option<ScalarBuffer<u16>>,
113 def: Option<ScalarBuffer<u16>>,
114 values: DataBlock,
115}
116
117#[derive(Debug)]
125struct DecodeMiniBlockTask {
126 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
127 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
128 value_decompressor: Arc<dyn MiniBlockDecompressor>,
129 dictionary_data: Option<Arc<DataBlock>>,
130 def_meaning: Arc<[DefinitionInterpretation]>,
131 num_buffers: u64,
132 max_visible_level: u16,
133 instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
134}
135
136impl DecodeMiniBlockTask {
137 fn decode_levels(
138 rep_decompressor: &dyn BlockDecompressor,
139 levels: LanceBuffer,
140 num_levels: u16,
141 ) -> Result<ScalarBuffer<u16>> {
142 let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
143 let mut rep = rep.as_fixed_width().unwrap();
144 debug_assert_eq!(rep.num_values, num_levels as u64);
145 debug_assert_eq!(rep.bits_per_value, 16);
146 Ok(rep.data.borrow_to_typed_slice::<u16>())
147 }
148
149 fn extend_levels(
156 range: Range<u64>,
157 levels: &mut Option<LevelBuffer>,
158 level_buf: &Option<impl AsRef<[u16]>>,
159 dest_offset: usize,
160 ) {
161 if let Some(level_buf) = level_buf {
162 if levels.is_none() {
163 let mut new_levels_vec =
166 LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
167 new_levels_vec.extend(iter::repeat_n(0, dest_offset));
168 *levels = Some(new_levels_vec);
169 }
170 levels.as_mut().unwrap().extend(
171 level_buf.as_ref()[range.start as usize..range.end as usize]
172 .iter()
173 .copied(),
174 );
175 } else if let Some(levels) = levels {
176 let num_values = (range.end - range.start) as usize;
177 levels.extend(iter::repeat_n(0, num_values));
180 }
181 }
182
183 fn map_range(
220 range: Range<u64>,
221 rep: Option<&impl AsRef<[u16]>>,
222 def: Option<&impl AsRef<[u16]>>,
223 max_rep: u16,
224 max_visible_def: u16,
225 total_items: u64,
228 preamble_action: PreambleAction,
229 ) -> (Range<u64>, Range<u64>) {
230 if let Some(rep) = rep {
231 let mut rep = rep.as_ref();
232 let mut items_in_preamble = 0;
235 let first_row_start = match preamble_action {
236 PreambleAction::Skip | PreambleAction::Take => {
237 let first_row_start = if let Some(def) = def.as_ref() {
238 let mut first_row_start = None;
239 for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
240 if *rep == max_rep {
241 first_row_start = Some(idx);
242 break;
243 }
244 if *def <= max_visible_def {
245 items_in_preamble += 1;
246 }
247 }
248 first_row_start
249 } else {
250 let first_row_start = rep.iter().position(|&r| r == max_rep);
251 items_in_preamble = first_row_start.unwrap_or(rep.len());
252 first_row_start
253 };
254 if first_row_start.is_none() {
257 assert!(preamble_action == PreambleAction::Take);
258 return (0..total_items, 0..rep.len() as u64);
259 }
260 let first_row_start = first_row_start.unwrap() as u64;
261 rep = &rep[first_row_start as usize..];
262 first_row_start
263 }
264 PreambleAction::Absent => {
265 debug_assert!(rep[0] == max_rep);
266 0
267 }
268 };
269
270 if range.start == range.end {
272 debug_assert!(preamble_action == PreambleAction::Take);
273 return (0..items_in_preamble as u64, 0..first_row_start);
274 }
275 assert!(range.start < range.end);
276
277 let mut rows_seen = 0;
278 let mut new_start = 0;
279 let mut new_levels_start = 0;
280
281 if let Some(def) = def {
282 let def = &def.as_ref()[first_row_start as usize..];
283
284 let mut lead_invis_seen = 0;
286
287 if range.start > 0 {
288 if def[0] > max_visible_def {
289 lead_invis_seen += 1;
290 }
291 for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
292 if *rep == max_rep {
293 rows_seen += 1;
294 if rows_seen == range.start {
295 new_start = idx as u64 + 1 - lead_invis_seen;
296 new_levels_start = idx as u64 + 1;
297 break;
298 }
299 if *def > max_visible_def {
300 lead_invis_seen += 1;
301 }
302 }
303 }
304 }
305
306 rows_seen += 1;
307
308 let mut new_end = u64::MAX;
309 let mut new_levels_end = rep.len() as u64;
310 let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
311 let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
312 for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
313 .iter()
314 .zip(&def[(new_levels_start + 1) as usize..])
315 .enumerate()
316 {
317 if *rep == max_rep {
318 rows_seen += 1;
319 if rows_seen == range.end + 1 {
320 new_end = idx as u64 + new_start + 1 - tail_invis_seen;
321 new_levels_end = idx as u64 + new_levels_start + 1;
322 break;
323 }
324 if *def > max_visible_def {
325 tail_invis_seen += 1;
326 }
327 }
328 }
329
330 if new_end == u64::MAX {
331 new_levels_end = rep.len() as u64;
332 let total_invis_seen = lead_invis_seen + tail_invis_seen;
334 new_end = rep.len() as u64 - total_invis_seen;
335 }
336
337 assert_ne!(new_end, u64::MAX);
338
339 if preamble_action == PreambleAction::Skip {
341 new_start += first_row_start;
344 new_end += first_row_start;
345 new_levels_start += first_row_start;
346 new_levels_end += first_row_start;
347 } else if preamble_action == PreambleAction::Take {
348 debug_assert_eq!(new_start, 0);
349 debug_assert_eq!(new_levels_start, 0);
350 new_end += first_row_start;
351 new_levels_end += first_row_start;
352 }
353
354 (new_start..new_end, new_levels_start..new_levels_end)
355 } else {
356 if range.start > 0 {
362 for (idx, rep) in rep.iter().skip(1).enumerate() {
363 if *rep == max_rep {
364 rows_seen += 1;
365 if rows_seen == range.start {
366 new_start = idx as u64 + 1;
367 break;
368 }
369 }
370 }
371 }
372 let mut new_end = rep.len() as u64;
373 if range.end < total_items {
375 for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
376 if *rep == max_rep {
377 rows_seen += 1;
378 if rows_seen == range.end {
379 new_end = idx as u64 + new_start + 1;
380 break;
381 }
382 }
383 }
384 }
385
386 if preamble_action == PreambleAction::Skip {
388 new_start += first_row_start;
389 new_end += first_row_start;
390 } else if preamble_action == PreambleAction::Take {
391 debug_assert_eq!(new_start, 0);
392 new_end += first_row_start;
393 }
394
395 (new_start..new_end, new_start..new_end)
396 }
397 } else {
398 (range.clone(), range)
401 }
402 }
403
404 fn decode_miniblock_chunk(
406 &self,
407 buf: &LanceBuffer,
408 items_in_chunk: u64,
409 ) -> Result<DecodedMiniBlockChunk> {
410 let mut offset = 0;
411 let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
412 offset += 2;
413
414 let rep_size = if self.rep_decompressor.is_some() {
415 let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
416 offset += 2;
417 Some(rep_size)
418 } else {
419 None
420 };
421 let def_size = if self.def_decompressor.is_some() {
422 let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
423 offset += 2;
424 Some(def_size)
425 } else {
426 None
427 };
428 let buffer_sizes = (0..self.num_buffers)
429 .map(|_| {
430 let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
431 offset += 2;
432 size
433 })
434 .collect::<Vec<_>>();
435
436 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
437
438 let rep = rep_size.map(|rep_size| {
439 let rep = buf.slice_with_length(offset, rep_size as usize);
440 offset += rep_size as usize;
441 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
442 rep
443 });
444
445 let def = def_size.map(|def_size| {
446 let def = buf.slice_with_length(offset, def_size as usize);
447 offset += def_size as usize;
448 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
449 def
450 });
451
452 let buffers = buffer_sizes
453 .into_iter()
454 .map(|buf_size| {
455 let buf = buf.slice_with_length(offset, buf_size as usize);
456 offset += buf_size as usize;
457 offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
458 buf
459 })
460 .collect::<Vec<_>>();
461
462 let values = self
463 .value_decompressor
464 .decompress(buffers, items_in_chunk)?;
465
466 let rep = rep
467 .map(|rep| {
468 Self::decode_levels(
469 self.rep_decompressor.as_ref().unwrap().as_ref(),
470 rep,
471 num_levels,
472 )
473 })
474 .transpose()?;
475 let def = def
476 .map(|def| {
477 Self::decode_levels(
478 self.def_decompressor.as_ref().unwrap().as_ref(),
479 def,
480 num_levels,
481 )
482 })
483 .transpose()?;
484
485 Ok(DecodedMiniBlockChunk { rep, def, values })
486 }
487}
488
489impl DecodePageTask for DecodeMiniBlockTask {
490 fn decode(self: Box<Self>) -> Result<DecodedPage> {
491 let mut repbuf: Option<LevelBuffer> = None;
493 let mut defbuf: Option<LevelBuffer> = None;
494
495 let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
496
497 let estimated_size_bytes = self
499 .instructions
500 .iter()
501 .map(|(_, chunk)| chunk.data.len())
502 .sum::<usize>()
503 * 2;
504 let mut data_builder =
505 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
506
507 let mut level_offset = 0;
509 for (instructions, chunk) in self.instructions.iter() {
511 let DecodedMiniBlockChunk { rep, def, values } =
515 self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
516
517 let row_range_start =
519 instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
520 let row_range_end = row_range_start + instructions.rows_to_take;
521
522 let (item_range, level_range) = Self::map_range(
524 row_range_start..row_range_end,
525 rep.as_ref(),
526 def.as_ref(),
527 max_rep,
528 self.max_visible_level,
529 chunk.items_in_chunk,
530 instructions.preamble_action,
531 );
532
533 Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
535 Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
536 level_offset += (level_range.end - level_range.start) as usize;
537 data_builder.append(&values, item_range);
538 }
539
540 let data = data_builder.finish();
541
542 let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());
543
544 if let Some(dictionary) = &self.dictionary_data {
546 let estimated_size_bytes = dictionary.data_size()
548 * (data.num_values() + dictionary.num_values() - 1)
549 / dictionary.num_values();
550 let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
551
552 if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
554 match fixed_width_data_block.bits_per_value {
555 32 => {
556 let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i32>();
557 let indices = indices.as_ref();
558
559 indices.iter().for_each(|&idx| {
560 data_builder.append(dictionary, idx as u64..idx as u64 + 1);
561 });
562 }
563 64 => {
564 let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i64>();
565 let indices = indices.as_ref();
566
567 indices.iter().for_each(|&idx| {
568 data_builder.append(dictionary, idx as u64..idx as u64 + 1);
569 });
570 }
571 _ => {
572 return Err(lance_core::Error::Internal {
573 message: format!(
574 "Unsupported dictionary index bit width: {} bits",
575 fixed_width_data_block.bits_per_value
576 ),
577 location: location!(),
578 });
579 }
580 }
581
582 let data = data_builder.finish();
583 return Ok(DecodedPage {
584 data,
585 repdef: unraveler,
586 });
587 }
588 }
589
590 Ok(DecodedPage {
591 data,
592 repdef: unraveler,
593 })
594 }
595}
596
597#[derive(Debug)]
600struct LoadedChunk {
601 data: LanceBuffer,
602 items_in_chunk: u64,
603 byte_range: Range<u64>,
604 chunk_idx: usize,
605}
606
607impl Clone for LoadedChunk {
608 fn clone(&self) -> Self {
609 Self {
610 data: self.data.try_clone().unwrap(),
612 items_in_chunk: self.items_in_chunk,
613 byte_range: self.byte_range.clone(),
614 chunk_idx: self.chunk_idx,
615 }
616 }
617}
618
619#[derive(Debug)]
622struct MiniBlockDecoder {
623 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
624 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
625 value_decompressor: Arc<dyn MiniBlockDecompressor>,
626 def_meaning: Arc<[DefinitionInterpretation]>,
627 loaded_chunks: VecDeque<LoadedChunk>,
628 instructions: VecDeque<ChunkInstructions>,
629 offset_in_current_chunk: u64,
630 num_rows: u64,
631 num_buffers: u64,
632 dictionary: Option<Arc<DataBlock>>,
633}
634
635impl StructuralPageDecoder for MiniBlockDecoder {
638 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
639 let mut items_desired = num_rows;
640 let mut need_preamble = false;
641 let mut skip_in_chunk = self.offset_in_current_chunk;
642 let mut drain_instructions = Vec::new();
643 while items_desired > 0 || need_preamble {
644 let (instructions, consumed) = self
645 .instructions
646 .front()
647 .unwrap()
648 .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
649
650 while self.loaded_chunks.front().unwrap().chunk_idx
651 != instructions.chunk_instructions.chunk_idx
652 {
653 self.loaded_chunks.pop_front();
654 }
655 drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
656 if consumed {
657 self.instructions.pop_front();
658 }
659 }
660 self.offset_in_current_chunk = skip_in_chunk;
663
664 let max_visible_level = self
665 .def_meaning
666 .iter()
667 .take_while(|l| !l.is_list())
668 .map(|l| l.num_def_levels())
669 .sum::<u16>();
670
671 Ok(Box::new(DecodeMiniBlockTask {
672 instructions: drain_instructions,
673 def_decompressor: self.def_decompressor.clone(),
674 rep_decompressor: self.rep_decompressor.clone(),
675 value_decompressor: self.value_decompressor.clone(),
676 dictionary_data: self.dictionary.clone(),
677 def_meaning: self.def_meaning.clone(),
678 num_buffers: self.num_buffers,
679 max_visible_level,
680 }))
681 }
682
683 fn num_rows(&self) -> u64 {
684 self.num_rows
685 }
686}
687
688#[derive(Debug)]
689struct CachedComplexAllNullState {
690 rep: Option<ScalarBuffer<u16>>,
691 def: Option<ScalarBuffer<u16>>,
692}
693
694impl DeepSizeOf for CachedComplexAllNullState {
695 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
696 self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
697 + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
698 }
699}
700
701impl CachedPageData for CachedComplexAllNullState {
702 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
703 self
704 }
705}
706
707#[derive(Debug)]
716pub struct ComplexAllNullScheduler {
717 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
719 def_meaning: Arc<[DefinitionInterpretation]>,
720 repdef: Option<Arc<CachedComplexAllNullState>>,
721}
722
723impl ComplexAllNullScheduler {
724 pub fn new(
725 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
726 def_meaning: Arc<[DefinitionInterpretation]>,
727 ) -> Self {
728 Self {
729 buffer_offsets_and_sizes,
730 def_meaning,
731 repdef: None,
732 }
733 }
734}
735
736impl StructuralPageScheduler for ComplexAllNullScheduler {
737 fn initialize<'a>(
738 &'a mut self,
739 io: &Arc<dyn EncodingsIo>,
740 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
741 let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
743 let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
744 let has_rep = rep_size > 0;
745 let has_def = def_size > 0;
746
747 let mut reads = Vec::with_capacity(2);
748 if has_rep {
749 reads.push(rep_pos..rep_pos + rep_size);
750 }
751 if has_def {
752 reads.push(def_pos..def_pos + def_size);
753 }
754
755 let data = io.submit_request(reads, 0);
756
757 async move {
758 let data = data.await?;
759 let mut data_iter = data.into_iter();
760
761 let rep = if has_rep {
762 let rep = data_iter.next().unwrap();
763 let mut rep = LanceBuffer::from_bytes(rep, 2);
764 let rep = rep.borrow_to_typed_slice::<u16>();
765 Some(rep)
766 } else {
767 None
768 };
769
770 let def = if has_def {
771 let def = data_iter.next().unwrap();
772 let mut def = LanceBuffer::from_bytes(def, 2);
773 let def = def.borrow_to_typed_slice::<u16>();
774 Some(def)
775 } else {
776 None
777 };
778
779 let repdef = Arc::new(CachedComplexAllNullState { rep, def });
780
781 self.repdef = Some(repdef.clone());
782
783 Ok(repdef as Arc<dyn CachedPageData>)
784 }
785 .boxed()
786 }
787
788 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
789 self.repdef = Some(
790 data.clone()
791 .as_arc_any()
792 .downcast::<CachedComplexAllNullState>()
793 .unwrap(),
794 );
795 }
796
797 fn schedule_ranges(
798 &self,
799 ranges: &[Range<u64>],
800 _io: &Arc<dyn EncodingsIo>,
801 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
802 let ranges = VecDeque::from_iter(ranges.iter().cloned());
803 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
804 Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
805 ranges,
806 rep: self.repdef.as_ref().unwrap().rep.clone(),
807 def: self.repdef.as_ref().unwrap().def.clone(),
808 num_rows,
809 def_meaning: self.def_meaning.clone(),
810 }) as Box<dyn StructuralPageDecoder>))
811 .boxed())
812 }
813}
814
815#[derive(Debug)]
816pub struct ComplexAllNullPageDecoder {
817 ranges: VecDeque<Range<u64>>,
818 rep: Option<ScalarBuffer<u16>>,
819 def: Option<ScalarBuffer<u16>>,
820 num_rows: u64,
821 def_meaning: Arc<[DefinitionInterpretation]>,
822}
823
824impl ComplexAllNullPageDecoder {
825 fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
826 let mut rows_desired = num_rows;
827 let mut ranges = Vec::with_capacity(self.ranges.len());
828 while rows_desired > 0 {
829 let front = self.ranges.front_mut().unwrap();
830 let avail = front.end - front.start;
831 if avail > rows_desired {
832 ranges.push(front.start..front.start + rows_desired);
833 front.start += rows_desired;
834 rows_desired = 0;
835 } else {
836 ranges.push(self.ranges.pop_front().unwrap());
837 rows_desired -= avail;
838 }
839 }
840 ranges
841 }
842}
843
844impl StructuralPageDecoder for ComplexAllNullPageDecoder {
845 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
846 let drained_ranges = self.drain_ranges(num_rows);
847 Ok(Box::new(DecodeComplexAllNullTask {
848 ranges: drained_ranges,
849 rep: self.rep.clone(),
850 def: self.def.clone(),
851 def_meaning: self.def_meaning.clone(),
852 }))
853 }
854
855 fn num_rows(&self) -> u64 {
856 self.num_rows
857 }
858}
859
860#[derive(Debug)]
863pub struct DecodeComplexAllNullTask {
864 ranges: Vec<Range<u64>>,
865 rep: Option<ScalarBuffer<u16>>,
866 def: Option<ScalarBuffer<u16>>,
867 def_meaning: Arc<[DefinitionInterpretation]>,
868}
869
870impl DecodeComplexAllNullTask {
871 fn decode_level(
872 &self,
873 levels: &Option<ScalarBuffer<u16>>,
874 num_values: u64,
875 ) -> Option<Vec<u16>> {
876 levels.as_ref().map(|levels| {
877 let mut referenced_levels = Vec::with_capacity(num_values as usize);
878 for range in &self.ranges {
879 referenced_levels.extend(
880 levels[range.start as usize..range.end as usize]
881 .iter()
882 .copied(),
883 );
884 }
885 referenced_levels
886 })
887 }
888}
889
890impl DecodePageTask for DecodeComplexAllNullTask {
891 fn decode(self: Box<Self>) -> Result<DecodedPage> {
892 let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
893 let data = DataBlock::AllNull(AllNullDataBlock { num_values });
894 let rep = self.decode_level(&self.rep, num_values);
895 let def = self.decode_level(&self.def, num_values);
896 let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
897 Ok(DecodedPage {
898 data,
899 repdef: unraveler,
900 })
901 }
902}
903
904#[derive(Debug, Default)]
909pub struct SimpleAllNullScheduler {}
910
911impl StructuralPageScheduler for SimpleAllNullScheduler {
912 fn initialize<'a>(
913 &'a mut self,
914 _io: &Arc<dyn EncodingsIo>,
915 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
916 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
917 }
918
919 fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
920
921 fn schedule_ranges(
922 &self,
923 ranges: &[Range<u64>],
924 _io: &Arc<dyn EncodingsIo>,
925 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
926 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
927 Ok(std::future::ready(Ok(
928 Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
929 ))
930 .boxed())
931 }
932}
933
934#[derive(Debug)]
937struct SimpleAllNullDecodePageTask {
938 num_values: u64,
939}
940impl DecodePageTask for SimpleAllNullDecodePageTask {
941 fn decode(self: Box<Self>) -> Result<DecodedPage> {
942 let unraveler = RepDefUnraveler::new(
943 None,
944 Some(vec![1; self.num_values as usize]),
945 Arc::new([DefinitionInterpretation::NullableItem]),
946 );
947 Ok(DecodedPage {
948 data: DataBlock::AllNull(AllNullDataBlock {
949 num_values: self.num_values,
950 }),
951 repdef: unraveler,
952 })
953 }
954}
955
956#[derive(Debug)]
957pub struct SimpleAllNullPageDecoder {
958 num_rows: u64,
959}
960
961impl StructuralPageDecoder for SimpleAllNullPageDecoder {
962 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
963 Ok(Box::new(SimpleAllNullDecodePageTask {
964 num_values: num_rows,
965 }))
966 }
967
968 fn num_rows(&self) -> u64 {
969 self.num_rows
970 }
971}
972
973#[derive(Debug, Clone)]
974struct MiniBlockSchedulerDictionary {
975 dictionary_decompressor: Arc<dyn BlockDecompressor>,
977 dictionary_buf_position_and_size: (u64, u64),
978 dictionary_data_alignment: u64,
979 num_dictionary_items: u64,
980}
981
982#[derive(Debug)]
984struct MiniBlockRepIndexBlock {
985 first_row: u64,
989 starts_including_trailer: u64,
992 has_preamble: bool,
994 has_trailer: bool,
996}
997
998impl DeepSizeOf for MiniBlockRepIndexBlock {
999 fn deep_size_of_children(&self, _context: &mut Context) -> usize {
1000 0
1001 }
1002}
1003
1004#[derive(Debug)]
1009struct MiniBlockRepIndex {
1010 blocks: Vec<MiniBlockRepIndexBlock>,
1011}
1012
1013impl DeepSizeOf for MiniBlockRepIndex {
1014 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1015 self.blocks.deep_size_of_children(context)
1016 }
1017}
1018
1019impl MiniBlockRepIndex {
1020 fn decode(rep_index: &[Vec<u64>]) -> Self {
1021 let mut chunk_has_preamble = false;
1022 let mut offset = 0;
1023 let mut blocks = Vec::with_capacity(rep_index.len());
1024 for chunk_rep in rep_index {
1025 let ends_count = chunk_rep[0];
1026 let partial_count = chunk_rep[1];
1027
1028 let chunk_has_trailer = partial_count > 0;
1029 let mut starts_including_trailer = ends_count;
1030 if chunk_has_trailer {
1031 starts_including_trailer += 1;
1032 }
1033 if chunk_has_preamble {
1034 starts_including_trailer -= 1;
1035 }
1036
1037 blocks.push(MiniBlockRepIndexBlock {
1038 first_row: offset,
1039 starts_including_trailer,
1040 has_preamble: chunk_has_preamble,
1041 has_trailer: chunk_has_trailer,
1042 });
1043
1044 chunk_has_preamble = chunk_has_trailer;
1045 offset += starts_including_trailer;
1046 }
1047
1048 Self { blocks }
1049 }
1050}
1051
1052#[derive(Debug)]
1054struct MiniBlockCacheableState {
1055 chunk_meta: Vec<ChunkMeta>,
1057 rep_index: MiniBlockRepIndex,
1059 dictionary: Option<Arc<DataBlock>>,
1061}
1062
1063impl DeepSizeOf for MiniBlockCacheableState {
1064 fn deep_size_of_children(&self, context: &mut Context) -> usize {
1065 self.rep_index.deep_size_of_children(context)
1066 + self
1067 .dictionary
1068 .as_ref()
1069 .map(|dict| dict.data_size() as usize)
1070 .unwrap_or(0)
1071 }
1072}
1073
1074impl CachedPageData for MiniBlockCacheableState {
1075 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
1076 self
1077 }
1078}
1079
1080#[derive(Debug)]
1107pub struct MiniBlockScheduler {
1108 buffer_offsets_and_sizes: Vec<(u64, u64)>,
1110 priority: u64,
1111 items_in_page: u64,
1112 repetition_index_depth: u16,
1113 num_buffers: u64,
1114 rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
1115 def_decompressor: Option<Arc<dyn BlockDecompressor>>,
1116 value_decompressor: Arc<dyn MiniBlockDecompressor>,
1117 def_meaning: Arc<[DefinitionInterpretation]>,
1118 dictionary: Option<MiniBlockSchedulerDictionary>,
1119 page_meta: Option<Arc<MiniBlockCacheableState>>,
1121}
1122
1123impl MiniBlockScheduler {
1124 fn try_new(
1125 buffer_offsets_and_sizes: &[(u64, u64)],
1126 priority: u64,
1127 items_in_page: u64,
1128 layout: &pb::MiniBlockLayout,
1129 decompressors: &dyn DecompressionStrategy,
1130 ) -> Result<Self> {
1131 let rep_decompressor = layout
1132 .rep_compression
1133 .as_ref()
1134 .map(|rep_compression| {
1135 decompressors
1136 .create_block_decompressor(rep_compression)
1137 .map(Arc::from)
1138 })
1139 .transpose()?;
1140 let def_decompressor = layout
1141 .def_compression
1142 .as_ref()
1143 .map(|def_compression| {
1144 decompressors
1145 .create_block_decompressor(def_compression)
1146 .map(Arc::from)
1147 })
1148 .transpose()?;
1149 let def_meaning = layout
1150 .layers
1151 .iter()
1152 .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1153 .collect::<Vec<_>>();
1154 let value_decompressor = decompressors.create_miniblock_decompressor(
1155 layout.value_compression.as_ref().unwrap(),
1156 decompressors,
1157 )?;
1158
1159 let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
1160 let num_dictionary_items = layout.num_dictionary_items;
1161 match dictionary_encoding.array_encoding.as_ref().unwrap() {
1162 pb::array_encoding::ArrayEncoding::Variable(_) => {
1163 Some(MiniBlockSchedulerDictionary {
1164 dictionary_decompressor: decompressors
1165 .create_block_decompressor(dictionary_encoding)?
1166 .into(),
1167 dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1168 dictionary_data_alignment: 4,
1169 num_dictionary_items,
1170 })
1171 }
1172 pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
1173 dictionary_decompressor: decompressors
1174 .create_block_decompressor(dictionary_encoding)?
1175 .into(),
1176 dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1177 dictionary_data_alignment: 16,
1178 num_dictionary_items,
1179 }),
1180 _ => {
1181 unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
1182 }
1183 }
1184 } else {
1185 None
1186 };
1187
1188 Ok(Self {
1189 buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
1190 rep_decompressor,
1191 def_decompressor,
1192 value_decompressor: value_decompressor.into(),
1193 repetition_index_depth: layout.repetition_index_depth as u16,
1194 num_buffers: layout.num_buffers,
1195 priority,
1196 items_in_page,
1197 dictionary,
1198 def_meaning: def_meaning.into(),
1199 page_meta: None,
1200 })
1201 }
1202
1203 fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
1204 let page_meta = self.page_meta.as_ref().unwrap();
1205 chunk_indices
1206 .iter()
1207 .map(|&chunk_idx| {
1208 let chunk_meta = &page_meta.chunk_meta[chunk_idx];
1209 let bytes_start = chunk_meta.offset_bytes;
1210 let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
1211 LoadedChunk {
1212 byte_range: bytes_start..bytes_end,
1213 items_in_chunk: chunk_meta.num_values,
1214 chunk_idx,
1215 data: LanceBuffer::empty(),
1216 }
1217 })
1218 .collect()
1219 }
1220}
1221
1222#[derive(Debug, PartialEq, Eq, Clone, Copy)]
1223enum PreambleAction {
1224 Take,
1225 Skip,
1226 Absent,
1227}
1228
1229#[derive(Clone, Debug, PartialEq, Eq)]
1235struct ChunkInstructions {
1236 chunk_idx: usize,
1238 preamble: PreambleAction,
1244 rows_to_skip: u64,
1248 rows_to_take: u64,
1250 take_trailer: bool,
1258}
1259
1260#[derive(Debug, PartialEq, Eq)]
1278struct ChunkDrainInstructions {
1279 chunk_instructions: ChunkInstructions,
1280 rows_to_skip: u64,
1281 rows_to_take: u64,
1282 preamble_action: PreambleAction,
1283}
1284
1285impl ChunkInstructions {
1286 fn schedule_instructions(
1292 rep_index: &MiniBlockRepIndex,
1293 user_ranges: &[Range<u64>],
1294 ) -> Vec<Self> {
1295 let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
1299
1300 for user_range in user_ranges {
1301 let mut rows_needed = user_range.end - user_range.start;
1302 let mut need_preamble = false;
1303
1304 let mut block_index = match rep_index
1307 .blocks
1308 .binary_search_by_key(&user_range.start, |block| block.first_row)
1309 {
1310 Ok(idx) => {
1311 let mut idx = idx;
1314 while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
1315 idx -= 1;
1316 }
1317 idx
1318 }
1319 Err(idx) => idx - 1,
1321 };
1322
1323 let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
1324
1325 while rows_needed > 0 || need_preamble {
1326 let chunk = &rep_index.blocks[block_index];
1327 let rows_avail = chunk.starts_including_trailer - to_skip;
1328 debug_assert!(rows_avail > 0);
1329
1330 let rows_to_take = rows_avail.min(rows_needed);
1331 rows_needed -= rows_to_take;
1332
1333 let mut take_trailer = false;
1334 let preamble = if chunk.has_preamble {
1335 if need_preamble {
1336 PreambleAction::Take
1337 } else {
1338 PreambleAction::Skip
1339 }
1340 } else {
1341 PreambleAction::Absent
1342 };
1343 let mut rows_to_take_no_trailer = rows_to_take;
1344
1345 if rows_to_take == rows_avail && chunk.has_trailer {
1347 take_trailer = true;
1348 need_preamble = true;
1349 rows_to_take_no_trailer -= 1;
1350 } else {
1351 need_preamble = false;
1352 };
1353
1354 chunk_instructions.push(Self {
1355 preamble,
1356 chunk_idx: block_index,
1357 rows_to_skip: to_skip,
1358 rows_to_take: rows_to_take_no_trailer,
1359 take_trailer,
1360 });
1361
1362 to_skip = 0;
1363 block_index += 1;
1364 }
1365 }
1366
1367 if user_ranges.len() > 1 {
1371 let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
1373 let mut instructions_iter = chunk_instructions.into_iter();
1374 merged_instructions.push(instructions_iter.next().unwrap());
1375 for instruction in instructions_iter {
1376 let last = merged_instructions.last_mut().unwrap();
1377 if last.chunk_idx == instruction.chunk_idx
1378 && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
1379 {
1380 last.rows_to_take += instruction.rows_to_take;
1381 last.take_trailer |= instruction.take_trailer;
1382 } else {
1383 merged_instructions.push(instruction);
1384 }
1385 }
1386 merged_instructions
1387 } else {
1388 chunk_instructions
1389 }
1390 }
1391
1392 fn drain_from_instruction(
1393 &self,
1394 rows_desired: &mut u64,
1395 need_preamble: &mut bool,
1396 skip_in_chunk: &mut u64,
1397 ) -> (ChunkDrainInstructions, bool) {
1398 debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1400 let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1401 let has_preamble = self.preamble != PreambleAction::Absent;
1402 let preamble_action = match (*need_preamble, has_preamble) {
1403 (true, true) => PreambleAction::Take,
1404 (true, false) => panic!("Need preamble but there isn't one"),
1405 (false, true) => PreambleAction::Skip,
1406 (false, false) => PreambleAction::Absent,
1407 };
1408
1409 if self.take_trailer {
1411 rows_avail += 1;
1412 }
1413
1414 let rows_taking = if *rows_desired >= rows_avail {
1417 *need_preamble = self.take_trailer;
1420 rows_avail
1421 } else {
1422 *need_preamble = false;
1425 *rows_desired
1426 };
1427 let rows_skipped = *skip_in_chunk;
1428
1429 let consumed_chunk = if *rows_desired >= rows_avail {
1431 *rows_desired -= rows_avail;
1432 *skip_in_chunk = 0;
1433 true
1434 } else {
1435 *skip_in_chunk += *rows_desired;
1436 *rows_desired = 0;
1437 false
1438 };
1439
1440 (
1441 ChunkDrainInstructions {
1442 chunk_instructions: self.clone(),
1443 rows_to_skip: rows_skipped,
1444 rows_to_take: rows_taking,
1445 preamble_action,
1446 },
1447 consumed_chunk,
1448 )
1449 }
1450}
1451
1452impl StructuralPageScheduler for MiniBlockScheduler {
1453 fn initialize<'a>(
1454 &'a mut self,
1455 io: &Arc<dyn EncodingsIo>,
1456 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
1457 let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
1461 let value_buf_position = self.buffer_offsets_and_sizes[1].0;
1462 let mut bufs_needed = 1;
1463 if self.dictionary.is_some() {
1464 bufs_needed += 1;
1465 }
1466 if self.repetition_index_depth > 0 {
1467 bufs_needed += 1;
1468 }
1469 let mut required_ranges = Vec::with_capacity(bufs_needed);
1470 required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
1471 if let Some(ref dictionary) = self.dictionary {
1472 required_ranges.push(
1473 dictionary.dictionary_buf_position_and_size.0
1474 ..dictionary.dictionary_buf_position_and_size.0
1475 + dictionary.dictionary_buf_position_and_size.1,
1476 );
1477 }
1478 if self.repetition_index_depth > 0 {
1479 let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
1480 required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
1481 }
1482 let io_req = io.submit_request(required_ranges, 0);
1483
1484 async move {
1485 let mut buffers = io_req.await?.into_iter().fuse();
1486 let meta_bytes = buffers.next().unwrap();
1487 let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
1488 let rep_index_bytes = buffers.next();
1489
1490 assert!(meta_bytes.len() % 2 == 0);
1492 let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
1493 let words = bytes.borrow_to_typed_slice::<u16>();
1494 let words = words.as_ref();
1495
1496 let mut chunk_meta = Vec::with_capacity(words.len());
1497
1498 let mut rows_counter = 0;
1499 let mut offset_bytes = value_buf_position;
1500 for (word_idx, word) in words.iter().enumerate() {
1501 let log_num_values = word & 0x0F;
1502 let divided_bytes = word >> 4;
1503 let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
1504 debug_assert!(num_bytes > 0);
1505 let num_values = if word_idx < words.len() - 1 {
1506 debug_assert!(log_num_values > 0);
1507 1 << log_num_values
1508 } else {
1509 debug_assert!(
1510 log_num_values == 0
1511 || (1 << log_num_values) == (self.items_in_page - rows_counter)
1512 );
1513 self.items_in_page - rows_counter
1514 };
1515 rows_counter += num_values;
1516
1517 chunk_meta.push(ChunkMeta {
1518 num_values,
1519 chunk_size_bytes: num_bytes as u64,
1520 offset_bytes,
1521 });
1522 offset_bytes += num_bytes as u64;
1523 }
1524
1525 let rep_index = if let Some(rep_index_data) = rep_index_bytes {
1527 assert!(rep_index_data.len() % 8 == 0);
1530 let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
1531 let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
1532 repetition_index_vals
1534 .as_ref()
1535 .chunks_exact(self.repetition_index_depth as usize + 1)
1536 .map(|c| c.to_vec())
1537 .collect::<Vec<_>>()
1538 } else {
1539 chunk_meta
1542 .iter()
1543 .map(|c| vec![c.num_values, 0])
1544 .collect::<Vec<_>>()
1545 };
1546
1547 let mut page_meta = MiniBlockCacheableState {
1548 chunk_meta,
1549 rep_index: MiniBlockRepIndex::decode(&rep_index),
1550 dictionary: None,
1551 };
1552
1553 if let Some(ref mut dictionary) = self.dictionary {
1555 let dictionary_data = dictionary_bytes.unwrap();
1556 page_meta.dictionary =
1557 Some(Arc::new(dictionary.dictionary_decompressor.decompress(
1558 LanceBuffer::from_bytes(
1559 dictionary_data,
1560 dictionary.dictionary_data_alignment,
1561 ),
1562 dictionary.num_dictionary_items,
1563 )?));
1564 };
1565 let page_meta = Arc::new(page_meta);
1566 self.page_meta = Some(page_meta.clone());
1567 Ok(page_meta as Arc<dyn CachedPageData>)
1568 }
1569 .boxed()
1570 }
1571
1572 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
1573 self.page_meta = Some(
1574 data.clone()
1575 .as_arc_any()
1576 .downcast::<MiniBlockCacheableState>()
1577 .unwrap(),
1578 );
1579 }
1580
1581 fn schedule_ranges(
1582 &self,
1583 ranges: &[Range<u64>],
1584 io: &Arc<dyn EncodingsIo>,
1585 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1586 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1587
1588 let page_meta = self.page_meta.as_ref().unwrap();
1589
1590 let chunk_instructions =
1591 ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
1592
1593 debug_assert_eq!(
1594 num_rows,
1595 chunk_instructions
1596 .iter()
1597 .map(|ci| {
1598 let taken = ci.rows_to_take;
1599 if ci.take_trailer {
1600 taken + 1
1601 } else {
1602 taken
1603 }
1604 })
1605 .sum::<u64>()
1606 );
1607
1608 let chunks_needed = chunk_instructions
1609 .iter()
1610 .map(|ci| ci.chunk_idx)
1611 .unique()
1612 .collect::<Vec<_>>();
1613 let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
1614 let chunk_ranges = loaded_chunks
1615 .iter()
1616 .map(|c| c.byte_range.clone())
1617 .collect::<Vec<_>>();
1618 let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
1619
1620 let rep_decompressor = self.rep_decompressor.clone();
1621 let def_decompressor = self.def_decompressor.clone();
1622 let value_decompressor = self.value_decompressor.clone();
1623 let num_buffers = self.num_buffers;
1624 let dictionary = page_meta
1625 .dictionary
1626 .as_ref()
1627 .map(|dictionary| dictionary.clone());
1628 let def_meaning = self.def_meaning.clone();
1629
1630 let res = async move {
1631 let loaded_chunk_data = loaded_chunk_data.await?;
1632 for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
1633 loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
1634 }
1635
1636 Ok(Box::new(MiniBlockDecoder {
1637 rep_decompressor,
1638 def_decompressor,
1639 value_decompressor,
1640 def_meaning,
1641 loaded_chunks: VecDeque::from_iter(loaded_chunks),
1642 instructions: VecDeque::from(chunk_instructions),
1643 offset_in_current_chunk: 0,
1644 dictionary,
1645 num_rows,
1646 num_buffers,
1647 }) as Box<dyn StructuralPageDecoder>)
1648 }
1649 .boxed();
1650 Ok(res)
1651 }
1652}
1653
1654#[derive(Debug, Clone, Copy)]
1655struct FullZipRepIndexDetails {
1656 buf_position: u64,
1657 bytes_per_value: u64, }
1659
1660#[derive(Debug)]
1661enum PerValueDecompressor {
1662 Fixed(Arc<dyn FixedPerValueDecompressor>),
1663 Variable(Arc<dyn VariablePerValueDecompressor>),
1664}
1665
1666#[derive(Debug)]
1667struct FullZipDecodeDetails {
1668 value_decompressor: PerValueDecompressor,
1669 def_meaning: Arc<[DefinitionInterpretation]>,
1670 ctrl_word_parser: ControlWordParser,
1671 max_rep: u16,
1672 max_visible_def: u16,
1673}
1674
1675#[derive(Debug)]
1683pub struct FullZipScheduler {
1684 data_buf_position: u64,
1685 rep_index: Option<FullZipRepIndexDetails>,
1686 priority: u64,
1687 rows_in_page: u64,
1688 bits_per_offset: u8,
1689 details: Arc<FullZipDecodeDetails>,
1690 cached_state: Option<Arc<FullZipCacheableState>>,
1692 enable_cache: bool,
1694}
1695
1696impl FullZipScheduler {
1697 fn try_new(
1698 buffer_offsets_and_sizes: &[(u64, u64)],
1699 priority: u64,
1700 rows_in_page: u64,
1701 layout: &pb::FullZipLayout,
1702 decompressors: &dyn DecompressionStrategy,
1703 ) -> Result<Self> {
1704 let (data_buf_position, _) = buffer_offsets_and_sizes[0];
1708 let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
1709 let num_reps = rows_in_page + 1;
1710 let bytes_per_rep = len / num_reps;
1711 debug_assert_eq!(len % num_reps, 0);
1712 debug_assert!(
1713 bytes_per_rep == 1
1714 || bytes_per_rep == 2
1715 || bytes_per_rep == 4
1716 || bytes_per_rep == 8
1717 );
1718 FullZipRepIndexDetails {
1719 buf_position: *pos,
1720 bytes_per_value: bytes_per_rep,
1721 }
1722 });
1723
1724 let value_decompressor = match layout.details {
1725 Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
1726 let decompressor = decompressors.create_fixed_per_value_decompressor(
1727 layout.value_compression.as_ref().unwrap(),
1728 )?;
1729 PerValueDecompressor::Fixed(decompressor.into())
1730 }
1731 Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
1732 let decompressor = decompressors.create_variable_per_value_decompressor(
1733 layout.value_compression.as_ref().unwrap(),
1734 )?;
1735 PerValueDecompressor::Variable(decompressor.into())
1736 }
1737 None => {
1738 panic!("Full-zip layout must have a `details` field");
1739 }
1740 };
1741 let ctrl_word_parser = ControlWordParser::new(
1742 layout.bits_rep.try_into().unwrap(),
1743 layout.bits_def.try_into().unwrap(),
1744 );
1745 let def_meaning = layout
1746 .layers
1747 .iter()
1748 .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
1749 .collect::<Vec<_>>();
1750
1751 let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
1752 let max_visible_def = def_meaning
1753 .iter()
1754 .filter(|d| !d.is_list())
1755 .map(|d| d.num_def_levels())
1756 .sum();
1757
1758 let bits_per_offset = match layout.details {
1759 Some(pb::full_zip_layout::Details::BitsPerValue(_)) => 32,
1760 Some(pb::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
1761 bits_per_offset as u8
1762 }
1763 None => panic!("Full-zip layout must have a `details` field"),
1764 };
1765
1766 let details = Arc::new(FullZipDecodeDetails {
1767 value_decompressor,
1768 def_meaning: def_meaning.into(),
1769 ctrl_word_parser,
1770 max_rep,
1771 max_visible_def,
1772 });
1773 Ok(Self {
1774 data_buf_position,
1775 rep_index,
1776 details,
1777 priority,
1778 rows_in_page,
1779 bits_per_offset,
1780 cached_state: None,
1781 enable_cache: false, })
1783 }
1784
1785 fn create_decoder(
1787 details: Arc<FullZipDecodeDetails>,
1788 data: VecDeque<LanceBuffer>,
1789 num_rows: u64,
1790 bits_per_offset: u8,
1791 ) -> Result<Box<dyn StructuralPageDecoder>> {
1792 match &details.value_decompressor {
1793 PerValueDecompressor::Fixed(decompressor) => {
1794 let bits_per_value = decompressor.bits_per_value();
1795 if bits_per_value == 0 {
1796 return Err(lance_core::Error::Internal {
1797 message: "Invalid encoding: bits_per_value must be greater than 0".into(),
1798 location: location!(),
1799 });
1800 }
1801 if bits_per_value % 8 != 0 {
1802 return Err(lance_core::Error::NotSupported {
1803 source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
1804 location: location!(),
1805 });
1806 }
1807 let bytes_per_value = bits_per_value / 8;
1808 let total_bytes_per_value =
1809 bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
1810 Ok(Box::new(FixedFullZipDecoder {
1811 details,
1812 data,
1813 num_rows,
1814 offset_in_current: 0,
1815 bytes_per_value: bytes_per_value as usize,
1816 total_bytes_per_value,
1817 }) as Box<dyn StructuralPageDecoder>)
1818 }
1819 PerValueDecompressor::Variable(_decompressor) => {
1820 Ok(Box::new(VariableFullZipDecoder::new(
1821 details,
1822 data,
1823 num_rows,
1824 bits_per_offset,
1825 bits_per_offset,
1826 )))
1827 }
1828 }
1829 }
1830
1831 fn extract_byte_ranges_from_pairs(
1834 buffer: LanceBuffer,
1835 bytes_per_value: u64,
1836 data_buf_position: u64,
1837 ) -> Vec<Range<u64>> {
1838 ByteUnpacker::new(buffer, bytes_per_value as usize)
1839 .chunks(2)
1840 .into_iter()
1841 .map(|mut c| {
1842 let start = c.next().unwrap() + data_buf_position;
1843 let end = c.next().unwrap() + data_buf_position;
1844 start..end
1845 })
1846 .collect::<Vec<_>>()
1847 }
1848
1849 fn extract_byte_ranges_from_cached(
1852 buffer: &LanceBuffer,
1853 ranges: &[Range<u64>],
1854 bytes_per_value: u64,
1855 data_buf_position: u64,
1856 ) -> Vec<Range<u64>> {
1857 ranges
1858 .iter()
1859 .map(|r| {
1860 let start_offset = (r.start * bytes_per_value) as usize;
1861 let end_offset = (r.end * bytes_per_value) as usize;
1862
1863 let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
1864 let start_val =
1865 ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
1866 .next()
1867 .unwrap();
1868
1869 let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
1870 let end_val =
1871 ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
1872 .next()
1873 .unwrap();
1874
1875 (data_buf_position + start_val)..(data_buf_position + end_val)
1876 })
1877 .collect()
1878 }
1879
1880 fn compute_rep_index_ranges(
1882 ranges: &[Range<u64>],
1883 rep_index: &FullZipRepIndexDetails,
1884 ) -> Vec<Range<u64>> {
1885 ranges
1886 .iter()
1887 .flat_map(|r| {
1888 let first_val_start =
1889 rep_index.buf_position + (r.start * rep_index.bytes_per_value);
1890 let first_val_end = first_val_start + rep_index.bytes_per_value;
1891 let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
1892 let last_val_end = last_val_start + rep_index.bytes_per_value;
1893 [first_val_start..first_val_end, last_val_start..last_val_end]
1894 })
1895 .collect()
1896 }
1897
1898 async fn resolve_byte_ranges(
1900 data_buf_position: u64,
1901 ranges: &[Range<u64>],
1902 io: &Arc<dyn EncodingsIo>,
1903 rep_index: &FullZipRepIndexDetails,
1904 cached_state: Option<&Arc<FullZipCacheableState>>,
1905 priority: u64,
1906 ) -> Result<Vec<Range<u64>>> {
1907 if let Some(cached_state) = cached_state {
1908 Ok(Self::extract_byte_ranges_from_cached(
1910 &cached_state.rep_index_buffer,
1911 ranges,
1912 rep_index.bytes_per_value,
1913 data_buf_position,
1914 ))
1915 } else {
1916 let rep_ranges = Self::compute_rep_index_ranges(ranges, rep_index);
1918 let rep_data = io.submit_request(rep_ranges, priority).await?;
1919 let rep_buffer = LanceBuffer::concat(
1920 &rep_data
1921 .into_iter()
1922 .map(|d| LanceBuffer::from_bytes(d, 1))
1923 .collect::<Vec<_>>(),
1924 );
1925 Ok(Self::extract_byte_ranges_from_pairs(
1926 rep_buffer,
1927 rep_index.bytes_per_value,
1928 data_buf_position,
1929 ))
1930 }
1931 }
1932
1933 fn schedule_ranges_rep(
1935 &self,
1936 ranges: &[Range<u64>],
1937 io: &Arc<dyn EncodingsIo>,
1938 rep_index: FullZipRepIndexDetails,
1939 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1940 let data_buf_position = self.data_buf_position;
1942 let cached_state = self.cached_state.clone();
1943 let priority = self.priority;
1944 let details = self.details.clone();
1945 let bits_per_offset = self.bits_per_offset;
1946 let ranges = ranges.to_vec();
1947 let io_clone = io.clone();
1948
1949 Ok(async move {
1950 let byte_ranges = Self::resolve_byte_ranges(
1952 data_buf_position,
1953 &ranges,
1954 &io_clone,
1955 &rep_index,
1956 cached_state.as_ref(),
1957 priority,
1958 )
1959 .await?;
1960
1961 let data = io_clone.submit_request(byte_ranges, priority).await?;
1963 let data = data
1964 .into_iter()
1965 .map(|d| LanceBuffer::from_bytes(d, 1))
1966 .collect::<VecDeque<_>>();
1967
1968 let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
1970
1971 Self::create_decoder(details, data, num_rows, bits_per_offset)
1973 }
1974 .boxed())
1975 }
1976
1977 fn schedule_ranges_simple(
1981 &self,
1982 ranges: &[Range<u64>],
1983 io: &dyn EncodingsIo,
1984 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
1985 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
1987
1988 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
1989 unreachable!()
1990 };
1991
1992 let bits_per_value = decompressor.bits_per_value();
1994 assert_eq!(bits_per_value % 8, 0);
1995 let bytes_per_value = bits_per_value / 8;
1996 let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
1997 let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
1998 let byte_ranges = ranges.iter().map(|r| {
1999 debug_assert!(r.end <= self.rows_in_page);
2000 let start = self.data_buf_position + r.start * total_bytes_per_value;
2001 let end = self.data_buf_position + r.end * total_bytes_per_value;
2002 start..end
2003 });
2004
2005 let data = io.submit_request(byte_ranges.collect(), self.priority);
2007
2008 let details = self.details.clone();
2009
2010 Ok(async move {
2011 let data = data.await?;
2012 let data = data
2013 .into_iter()
2014 .map(|d| LanceBuffer::from_bytes(d, 1))
2015 .collect();
2016 Ok(Box::new(FixedFullZipDecoder {
2017 details,
2018 data,
2019 num_rows,
2020 offset_in_current: 0,
2021 bytes_per_value: bytes_per_value as usize,
2022 total_bytes_per_value: total_bytes_per_value as usize,
2023 }) as Box<dyn StructuralPageDecoder>)
2024 }
2025 .boxed())
2026 }
2027}
2028
2029#[derive(Debug)]
2031struct FullZipCacheableState {
2032 rep_index_buffer: LanceBuffer,
2034}
2035
2036impl DeepSizeOf for FullZipCacheableState {
2037 fn deep_size_of_children(&self, _context: &mut Context) -> usize {
2038 self.rep_index_buffer.len()
2039 }
2040}
2041
2042impl CachedPageData for FullZipCacheableState {
2043 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2044 self
2045 }
2046}
2047
2048impl StructuralPageScheduler for FullZipScheduler {
2049 fn initialize<'a>(
2052 &'a mut self,
2053 io: &Arc<dyn EncodingsIo>,
2054 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
2055 if self.enable_cache && self.rep_index.is_some() {
2057 let rep_index = self.rep_index.as_ref().unwrap();
2058 let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
2060 let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
2061
2062 let io_clone = io.clone();
2064 let future = async move {
2065 let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
2066 let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);
2067
2068 Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
2070 };
2071
2072 future.boxed()
2073 } else {
2074 std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
2076 }
2077 }
2078
2079 fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
2083 if let Ok(cached_state) = cache
2085 .clone()
2086 .as_arc_any()
2087 .downcast::<FullZipCacheableState>()
2088 {
2089 self.cached_state = Some(cached_state);
2091 }
2092 }
2093
2094 fn schedule_ranges(
2095 &self,
2096 ranges: &[Range<u64>],
2097 io: &Arc<dyn EncodingsIo>,
2098 ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
2099 if let Some(rep_index) = self.rep_index {
2100 self.schedule_ranges_rep(ranges, io, rep_index)
2101 } else {
2102 self.schedule_ranges_simple(ranges, io.as_ref())
2103 }
2104 }
2105}
2106
2107#[derive(Debug)]
2115struct FixedFullZipDecoder {
2116 details: Arc<FullZipDecodeDetails>,
2117 data: VecDeque<LanceBuffer>,
2118 offset_in_current: usize,
2119 bytes_per_value: usize,
2120 total_bytes_per_value: usize,
2121 num_rows: u64,
2122}
2123
2124impl FixedFullZipDecoder {
2125 fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2126 debug_assert!(num_rows > 0);
2127 let cur_buf = self.data.front_mut().unwrap();
2128 let start = self.offset_in_current;
2129 if self.details.ctrl_word_parser.has_rep() {
2130 let mut rows_started = 0;
2133 let mut num_items = 0;
2136 while self.offset_in_current < cur_buf.len() {
2137 let control = self.details.ctrl_word_parser.parse_desc(
2138 &cur_buf[self.offset_in_current..],
2139 self.details.max_rep,
2140 self.details.max_visible_def,
2141 );
2142 if control.is_new_row {
2143 if rows_started == num_rows {
2144 break;
2145 }
2146 rows_started += 1;
2147 }
2148 num_items += 1;
2149 if control.is_visible {
2150 self.offset_in_current += self.total_bytes_per_value;
2151 } else {
2152 self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2153 }
2154 }
2155
2156 let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2157 if self.offset_in_current == cur_buf.len() {
2158 self.data.pop_front();
2159 self.offset_in_current = 0;
2160 }
2161
2162 FullZipDecodeTaskItem {
2163 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2164 data: task_slice,
2165 bits_per_value: self.bytes_per_value as u64 * 8,
2166 num_values: num_items,
2167 block_info: BlockInfo::new(),
2168 }),
2169 rows_in_buf: rows_started,
2170 }
2171 } else {
2172 let cur_buf = self.data.front_mut().unwrap();
2175 let bytes_avail = cur_buf.len() - self.offset_in_current;
2176 let offset_in_cur = self.offset_in_current;
2177
2178 let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2179 let mut rows_taken = num_rows;
2180 let task_slice = if bytes_needed >= bytes_avail {
2181 self.offset_in_current = 0;
2182 rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2183 self.data
2184 .pop_front()
2185 .unwrap()
2186 .slice_with_length(offset_in_cur, bytes_avail)
2187 } else {
2188 self.offset_in_current += bytes_needed;
2189 cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2190 };
2191 FullZipDecodeTaskItem {
2192 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2193 data: task_slice,
2194 bits_per_value: self.bytes_per_value as u64 * 8,
2195 num_values: rows_taken,
2196 block_info: BlockInfo::new(),
2197 }),
2198 rows_in_buf: rows_taken,
2199 }
2200 }
2201 }
2202}
2203
2204impl StructuralPageDecoder for FixedFullZipDecoder {
2205 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2206 let mut task_data = Vec::with_capacity(self.data.len());
2207 let mut remaining = num_rows;
2208 while remaining > 0 {
2209 let task_item = self.slice_next_task(remaining);
2210 remaining -= task_item.rows_in_buf;
2211 task_data.push(task_item);
2212 }
2213 Ok(Box::new(FixedFullZipDecodeTask {
2214 details: self.details.clone(),
2215 data: task_data,
2216 bytes_per_value: self.bytes_per_value,
2217 num_rows: num_rows as usize,
2218 }))
2219 }
2220
2221 fn num_rows(&self) -> u64 {
2222 self.num_rows
2223 }
2224}
2225
2226#[derive(Debug)]
2231struct VariableFullZipDecoder {
2232 details: Arc<FullZipDecodeDetails>,
2233 decompressor: Arc<dyn VariablePerValueDecompressor>,
2234 data: LanceBuffer,
2235 offsets: LanceBuffer,
2236 rep: ScalarBuffer<u16>,
2237 def: ScalarBuffer<u16>,
2238 repdef_starts: Vec<usize>,
2239 data_starts: Vec<usize>,
2240 offset_starts: Vec<usize>,
2241 visible_item_counts: Vec<u64>,
2242 bits_per_offset: u8,
2243 current_idx: usize,
2244 num_rows: u64,
2245}
2246
2247impl VariableFullZipDecoder {
2248 fn new(
2249 details: Arc<FullZipDecodeDetails>,
2250 data: VecDeque<LanceBuffer>,
2251 num_rows: u64,
2252 in_bits_per_length: u8,
2253 out_bits_per_offset: u8,
2254 ) -> Self {
2255 let decompressor = match details.value_decompressor {
2256 PerValueDecompressor::Variable(ref d) => d.clone(),
2257 _ => unreachable!(),
2258 };
2259
2260 assert_eq!(in_bits_per_length % 8, 0);
2261 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2262
2263 let mut decoder = Self {
2264 details,
2265 decompressor,
2266 data: LanceBuffer::empty(),
2267 offsets: LanceBuffer::empty(),
2268 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2269 def: LanceBuffer::empty().borrow_to_typed_slice(),
2270 bits_per_offset: out_bits_per_offset,
2271 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2272 data_starts: Vec::with_capacity(num_rows as usize + 1),
2273 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2274 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2275 current_idx: 0,
2276 num_rows,
2277 };
2278
2279 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2300
2301 decoder
2302 }
2303
2304 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2305 match bits_per_offset {
2306 8 => *data.get_unchecked(0) as u64,
2307 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2308 32 => u32::from_le_bytes([
2309 *data.get_unchecked(0),
2310 *data.get_unchecked(1),
2311 *data.get_unchecked(2),
2312 *data.get_unchecked(3),
2313 ]) as u64,
2314 64 => u64::from_le_bytes([
2315 *data.get_unchecked(0),
2316 *data.get_unchecked(1),
2317 *data.get_unchecked(2),
2318 *data.get_unchecked(3),
2319 *data.get_unchecked(4),
2320 *data.get_unchecked(5),
2321 *data.get_unchecked(6),
2322 *data.get_unchecked(7),
2323 ]),
2324 _ => unreachable!(),
2325 }
2326 }
2327
2328 fn unzip(
2329 &mut self,
2330 data: VecDeque<LanceBuffer>,
2331 in_bits_per_length: u8,
2332 out_bits_per_offset: u8,
2333 num_rows: u64,
2334 ) {
2335 let mut rep = Vec::with_capacity(num_rows as usize);
2337 let mut def = Vec::with_capacity(num_rows as usize);
2338 let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
2339
2340 let bytes_per_offset = out_bits_per_offset as usize / 8;
2343 let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
2344 let mut offsets_data = Vec::with_capacity(bytes_offsets);
2345
2346 let bytes_per_length = in_bits_per_length as usize / 8;
2347 let bytes_lengths = bytes_per_length * num_rows as usize;
2348
2349 let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
2350 let mut unzipped_data =
2353 Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
2354
2355 let mut current_offset = 0_u64;
2356 let mut visible_item_count = 0_u64;
2357 for databuf in data.into_iter() {
2358 let mut databuf = databuf.as_ref();
2359 while !databuf.is_empty() {
2360 let data_start = unzipped_data.len();
2361 let offset_start = offsets_data.len();
2362 let repdef_start = rep.len().max(def.len());
2365 let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
2367 databuf,
2368 self.details.max_rep,
2369 self.details.max_visible_def,
2370 );
2371 self.details
2372 .ctrl_word_parser
2373 .parse(databuf, &mut rep, &mut def);
2374 databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
2375
2376 if ctrl_desc.is_new_row {
2377 self.repdef_starts.push(repdef_start);
2378 self.data_starts.push(data_start);
2379 self.offset_starts.push(offset_start);
2380 self.visible_item_counts.push(visible_item_count);
2381 }
2382 if ctrl_desc.is_visible {
2383 visible_item_count += 1;
2384 if ctrl_desc.is_valid_item {
2385 debug_assert!(databuf.len() >= bytes_per_length);
2387 let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
2388 match out_bits_per_offset {
2389 32 => offsets_data
2390 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2391 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2392 _ => unreachable!(),
2393 };
2394 databuf = &databuf[bytes_per_offset..];
2395 unzipped_data.extend_from_slice(&databuf[..length as usize]);
2396 databuf = &databuf[length as usize..];
2397 current_offset += length;
2398 } else {
2399 match out_bits_per_offset {
2401 32 => offsets_data
2402 .extend_from_slice(&(current_offset as u32).to_le_bytes()),
2403 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2404 _ => unreachable!(),
2405 }
2406 }
2407 }
2408 }
2409 }
2410 self.repdef_starts.push(rep.len().max(def.len()));
2411 self.data_starts.push(unzipped_data.len());
2412 self.offset_starts.push(offsets_data.len());
2413 self.visible_item_counts.push(visible_item_count);
2414 match out_bits_per_offset {
2415 32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
2416 64 => offsets_data.extend_from_slice(¤t_offset.to_le_bytes()),
2417 _ => unreachable!(),
2418 };
2419 self.rep = ScalarBuffer::from(rep);
2420 self.def = ScalarBuffer::from(def);
2421 self.data = LanceBuffer::Owned(unzipped_data);
2422 self.offsets = LanceBuffer::Owned(offsets_data);
2423 }
2424}
2425
2426impl StructuralPageDecoder for VariableFullZipDecoder {
2427 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2428 let start = self.current_idx;
2429 let end = start + num_rows as usize;
2430
2431 let data = self.data.borrow_and_clone();
2439
2440 let offset_start = self.offset_starts[start];
2441 let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
2442 let offsets = self
2443 .offsets
2444 .slice_with_length(offset_start, offset_end - offset_start);
2445
2446 let repdef_start = self.repdef_starts[start];
2447 let repdef_end = self.repdef_starts[end];
2448 let rep = if self.rep.is_empty() {
2449 self.rep.clone()
2450 } else {
2451 self.rep.slice(repdef_start, repdef_end - repdef_start)
2452 };
2453 let def = if self.def.is_empty() {
2454 self.def.clone()
2455 } else {
2456 self.def.slice(repdef_start, repdef_end - repdef_start)
2457 };
2458
2459 let visible_item_counts_start = self.visible_item_counts[start];
2460 let visible_item_counts_end = self.visible_item_counts[end];
2461 let num_visible_items = visible_item_counts_end - visible_item_counts_start;
2462
2463 self.current_idx += num_rows as usize;
2464
2465 Ok(Box::new(VariableFullZipDecodeTask {
2466 details: self.details.clone(),
2467 decompressor: self.decompressor.clone(),
2468 data,
2469 offsets,
2470 bits_per_offset: self.bits_per_offset,
2471 num_visible_items,
2472 rep,
2473 def,
2474 }))
2475 }
2476
2477 fn num_rows(&self) -> u64 {
2478 self.num_rows
2479 }
2480}
2481
2482#[derive(Debug)]
2483struct VariableFullZipDecodeTask {
2484 details: Arc<FullZipDecodeDetails>,
2485 decompressor: Arc<dyn VariablePerValueDecompressor>,
2486 data: LanceBuffer,
2487 offsets: LanceBuffer,
2488 bits_per_offset: u8,
2489 num_visible_items: u64,
2490 rep: ScalarBuffer<u16>,
2491 def: ScalarBuffer<u16>,
2492}
2493
2494impl DecodePageTask for VariableFullZipDecodeTask {
2495 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2496 let block = VariableWidthBlock {
2497 data: self.data,
2498 offsets: self.offsets,
2499 bits_per_offset: self.bits_per_offset,
2500 num_values: self.num_visible_items,
2501 block_info: BlockInfo::new(),
2502 };
2503 let decomopressed = self.decompressor.decompress(block)?;
2504 let rep = self.rep.to_vec();
2505 let def = self.def.to_vec();
2506 let unraveler =
2507 RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2508 Ok(DecodedPage {
2509 data: decomopressed,
2510 repdef: unraveler,
2511 })
2512 }
2513}
2514
2515#[derive(Debug)]
2516struct FullZipDecodeTaskItem {
2517 data: PerValueDataBlock,
2518 rows_in_buf: u64,
2519}
2520
2521#[derive(Debug)]
2524struct FixedFullZipDecodeTask {
2525 details: Arc<FullZipDecodeDetails>,
2526 data: Vec<FullZipDecodeTaskItem>,
2527 num_rows: usize,
2528 bytes_per_value: usize,
2529}
2530
2531impl DecodePageTask for FixedFullZipDecodeTask {
2532 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2533 let estimated_size_bytes = self
2535 .data
2536 .iter()
2537 .map(|task_item| task_item.data.data_size() as usize)
2538 .sum::<usize>()
2539 * 2;
2540 let mut data_builder =
2541 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2542
2543 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2544 for task_item in self.data.into_iter() {
2548 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2549 unreachable!()
2550 };
2551 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2552 else {
2553 unreachable!()
2554 };
2555 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2556 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2557 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2558 }
2559
2560 let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2561
2562 Ok(DecodedPage {
2563 data: data_builder.finish(),
2564 repdef: unraveler,
2565 })
2566 } else {
2567 let mut rep = Vec::with_capacity(self.num_rows);
2569 let mut def = Vec::with_capacity(self.num_rows);
2570
2571 for task_item in self.data.into_iter() {
2572 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2573 unreachable!()
2574 };
2575 let mut buf_slice = fixed_data.data.as_ref();
2576 let num_values = fixed_data.num_values as usize;
2577 let mut values = Vec::with_capacity(
2580 fixed_data.data.len()
2581 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2582 );
2583 let mut visible_items = 0;
2584 for _ in 0..num_values {
2585 self.details
2587 .ctrl_word_parser
2588 .parse(buf_slice, &mut rep, &mut def);
2589 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2590
2591 let is_visible = def
2592 .last()
2593 .map(|d| *d <= self.details.max_visible_def)
2594 .unwrap_or(true);
2595 if is_visible {
2596 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2598 buf_slice = &buf_slice[self.bytes_per_value..];
2599 visible_items += 1;
2600 }
2601 }
2602
2603 let values_buf = LanceBuffer::Owned(values);
2605 let fixed_data = FixedWidthDataBlock {
2606 bits_per_value: self.bytes_per_value as u64 * 8,
2607 block_info: BlockInfo::new(),
2608 data: values_buf,
2609 num_values: visible_items,
2610 };
2611 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2612 else {
2613 unreachable!()
2614 };
2615 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2616 data_builder.append(&decompressed, 0..visible_items);
2617 }
2618
2619 let repetition = if rep.is_empty() { None } else { Some(rep) };
2620 let definition = if def.is_empty() { None } else { Some(def) };
2621
2622 let unraveler =
2623 RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2624 let data = data_builder.finish();
2625
2626 Ok(DecodedPage {
2627 data,
2628 repdef: unraveler,
2629 })
2630 }
2631 }
2632}
2633
2634#[derive(Debug)]
2635struct StructuralPrimitiveFieldSchedulingJob<'a> {
2636 scheduler: &'a StructuralPrimitiveFieldScheduler,
2637 ranges: Vec<Range<u64>>,
2638 page_idx: usize,
2639 range_idx: usize,
2640 global_row_offset: u64,
2641}
2642
2643impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2644 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2645 Self {
2646 scheduler,
2647 ranges,
2648 page_idx: 0,
2649 range_idx: 0,
2650 global_row_offset: 0,
2651 }
2652 }
2653}
2654
2655impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2656 fn schedule_next(
2657 &mut self,
2658 context: &mut SchedulerContext,
2659 ) -> Result<Option<ScheduledScanLine>> {
2660 if self.range_idx >= self.ranges.len() {
2661 return Ok(None);
2662 }
2663 let mut range = self.ranges[self.range_idx].clone();
2665 let priority = range.start;
2666
2667 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2668 trace!(
2669 "Current range is {:?} and current page has {} rows",
2670 range,
2671 cur_page.num_rows
2672 );
2673 while cur_page.num_rows + self.global_row_offset <= range.start {
2675 self.global_row_offset += cur_page.num_rows;
2676 self.page_idx += 1;
2677 trace!("Skipping entire page of {} rows", cur_page.num_rows);
2678 cur_page = &self.scheduler.page_schedulers[self.page_idx];
2679 }
2680
2681 let mut ranges_in_page = Vec::new();
2685 while cur_page.num_rows + self.global_row_offset > range.start {
2686 range.start = range.start.max(self.global_row_offset);
2687 let start_in_page = range.start - self.global_row_offset;
2688 let end_in_page = start_in_page + (range.end - range.start);
2689 let end_in_page = end_in_page.min(cur_page.num_rows);
2690 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2691
2692 ranges_in_page.push(start_in_page..end_in_page);
2693 if last_in_range {
2694 self.range_idx += 1;
2695 if self.range_idx == self.ranges.len() {
2696 break;
2697 }
2698 range = self.ranges[self.range_idx].clone();
2699 } else {
2700 break;
2701 }
2702 }
2703
2704 let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2705 trace!(
2706 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2707 num_rows_in_next,
2708 ranges_in_page.len(),
2709 cur_page.num_rows,
2710 priority,
2711 self.scheduler.column_index,
2712 cur_page.page_index,
2713 );
2714
2715 self.global_row_offset += cur_page.num_rows;
2716 self.page_idx += 1;
2717
2718 let page_decoder = cur_page
2719 .scheduler
2720 .schedule_ranges(&ranges_in_page, context.io())?;
2721
2722 let cur_path = context.current_path();
2723 let page_index = cur_page.page_index;
2724 let unloaded_page = async move {
2725 let page_decoder = page_decoder.await?;
2726 Ok(LoadedPage {
2727 decoder: page_decoder,
2728 path: cur_path,
2729 page_index,
2730 })
2731 }
2732 .boxed();
2733
2734 Ok(Some(ScheduledScanLine {
2735 decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2736 rows_scheduled: num_rows_in_next,
2737 }))
2738 }
2739}
2740
2741#[derive(Debug)]
2742struct PageInfoAndScheduler {
2743 page_index: usize,
2744 num_rows: u64,
2745 scheduler: Box<dyn StructuralPageScheduler>,
2746}
2747
2748#[derive(Debug)]
2753pub struct StructuralPrimitiveFieldScheduler {
2754 page_schedulers: Vec<PageInfoAndScheduler>,
2755 column_index: u32,
2756}
2757
2758impl StructuralPrimitiveFieldScheduler {
2759 pub fn try_new(
2760 column_info: &ColumnInfo,
2761 decompressors: &dyn DecompressionStrategy,
2762 cache_repetition_index: bool,
2763 ) -> Result<Self> {
2764 let page_schedulers = column_info
2765 .page_infos
2766 .iter()
2767 .enumerate()
2768 .map(|(page_index, page_info)| {
2769 Self::page_info_to_scheduler(
2770 page_info,
2771 page_index,
2772 column_info.index as usize,
2773 decompressors,
2774 cache_repetition_index,
2775 )
2776 })
2777 .collect::<Result<Vec<_>>>()?;
2778 Ok(Self {
2779 page_schedulers,
2780 column_index: column_info.index,
2781 })
2782 }
2783
2784 fn page_info_to_scheduler(
2785 page_info: &PageInfo,
2786 page_index: usize,
2787 _column_index: usize,
2788 decompressors: &dyn DecompressionStrategy,
2789 cache_repetition_index: bool,
2790 ) -> Result<PageInfoAndScheduler> {
2791 let scheduler: Box<dyn StructuralPageScheduler> =
2792 match page_info.encoding.as_structural().layout.as_ref() {
2793 Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2794 Box::new(MiniBlockScheduler::try_new(
2795 &page_info.buffer_offsets_and_sizes,
2796 page_info.priority,
2797 mini_block.num_items,
2798 mini_block,
2799 decompressors,
2800 )?)
2801 }
2802 Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2803 let mut scheduler = FullZipScheduler::try_new(
2804 &page_info.buffer_offsets_and_sizes,
2805 page_info.priority,
2806 page_info.num_rows,
2807 full_zip,
2808 decompressors,
2809 )?;
2810 scheduler.enable_cache = cache_repetition_index;
2811 Box::new(scheduler)
2812 }
2813 Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2814 let def_meaning = all_null
2815 .layers
2816 .iter()
2817 .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2818 .collect::<Vec<_>>();
2819 if def_meaning.len() == 1
2820 && def_meaning[0] == DefinitionInterpretation::NullableItem
2821 {
2822 Box::new(SimpleAllNullScheduler::default())
2823 as Box<dyn StructuralPageScheduler>
2824 } else {
2825 Box::new(ComplexAllNullScheduler::new(
2826 page_info.buffer_offsets_and_sizes.clone(),
2827 def_meaning.into(),
2828 )) as Box<dyn StructuralPageScheduler>
2829 }
2830 }
2831 _ => todo!(),
2832 };
2833 Ok(PageInfoAndScheduler {
2834 page_index,
2835 num_rows: page_info.num_rows,
2836 scheduler,
2837 })
2838 }
2839}
2840
2841pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2842 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2843}
2844
2845pub struct NoCachedPageData;
2846
2847impl DeepSizeOf for NoCachedPageData {
2848 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2849 0
2850 }
2851}
2852impl CachedPageData for NoCachedPageData {
2853 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2854 self
2855 }
2856}
2857
2858pub struct CachedFieldData {
2859 pages: Vec<Arc<dyn CachedPageData>>,
2860}
2861
2862impl DeepSizeOf for CachedFieldData {
2863 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2864 self.pages.deep_size_of_children(ctx)
2865 }
2866}
2867
2868#[derive(Debug, Clone)]
2870pub struct FieldDataCacheKey {
2871 pub column_index: u32,
2872}
2873
2874impl CacheKey for FieldDataCacheKey {
2875 type ValueType = CachedFieldData;
2876
2877 fn key(&self) -> std::borrow::Cow<'_, str> {
2878 self.column_index.to_string().into()
2879 }
2880}
2881
2882impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2883 fn initialize<'a>(
2884 &'a mut self,
2885 _filter: &'a FilterExpression,
2886 context: &'a SchedulerContext,
2887 ) -> BoxFuture<'a, Result<()>> {
2888 let cache_key = FieldDataCacheKey {
2889 column_index: self.column_index,
2890 };
2891 let cache = context.cache().clone();
2892
2893 async move {
2894 if let Some(cached_data) = cache.get_with_key(&cache_key).await {
2895 self.page_schedulers
2896 .iter_mut()
2897 .zip(cached_data.pages.iter())
2898 .for_each(|(page_scheduler, cached_data)| {
2899 page_scheduler.scheduler.load(cached_data);
2900 });
2901 return Ok(());
2902 }
2903
2904 let page_data = self
2905 .page_schedulers
2906 .iter_mut()
2907 .map(|s| s.scheduler.initialize(context.io()))
2908 .collect::<FuturesOrdered<_>>();
2909
2910 let page_data = page_data.try_collect::<Vec<_>>().await?;
2911 let cached_data = Arc::new(CachedFieldData { pages: page_data });
2912 cache.insert_with_key(&cache_key, cached_data).await;
2913 Ok(())
2914 }
2915 .boxed()
2916 }
2917
2918 fn schedule_ranges<'a>(
2919 &'a self,
2920 ranges: &[Range<u64>],
2921 _filter: &FilterExpression,
2922 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2923 let ranges = ranges.to_vec();
2924 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2925 self, ranges,
2926 )))
2927 }
2928}
2929
2930#[derive(Debug)]
2933pub struct StructuralCompositeDecodeArrayTask {
2934 tasks: Vec<Box<dyn DecodePageTask>>,
2935 should_validate: bool,
2936 data_type: DataType,
2937}
2938
2939impl StructuralCompositeDecodeArrayTask {
2940 fn restore_validity(
2941 array: Arc<dyn Array>,
2942 unraveler: &mut CompositeRepDefUnraveler,
2943 ) -> Arc<dyn Array> {
2944 let validity = unraveler.unravel_validity(array.len());
2945 let Some(validity) = validity else {
2946 return array;
2947 };
2948 if array.data_type() == &DataType::Null {
2949 return array;
2951 }
2952 assert_eq!(validity.len(), array.len());
2953 make_array(unsafe {
2956 array
2957 .to_data()
2958 .into_builder()
2959 .nulls(Some(validity))
2960 .build_unchecked()
2961 })
2962 }
2963}
2964
2965impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
2966 fn decode(self: Box<Self>) -> Result<DecodedArray> {
2967 let mut arrays = Vec::with_capacity(self.tasks.len());
2968 let mut unravelers = Vec::with_capacity(self.tasks.len());
2969 for task in self.tasks {
2970 let decoded = task.decode()?;
2971 unravelers.push(decoded.repdef);
2972
2973 let array = make_array(
2974 decoded
2975 .data
2976 .into_arrow(self.data_type.clone(), self.should_validate)?,
2977 );
2978
2979 arrays.push(array);
2980 }
2981 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
2982 let array = arrow_select::concat::concat(&array_refs)?;
2983 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
2984
2985 let array = Self::restore_validity(array, &mut repdef);
2986
2987 Ok(DecodedArray { array, repdef })
2988 }
2989}
2990
2991#[derive(Debug)]
2992pub struct StructuralPrimitiveFieldDecoder {
2993 field: Arc<ArrowField>,
2994 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
2995 should_validate: bool,
2996 rows_drained_in_current: u64,
2997}
2998
2999impl StructuralPrimitiveFieldDecoder {
3000 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3001 Self {
3002 field: field.clone(),
3003 page_decoders: VecDeque::new(),
3004 should_validate,
3005 rows_drained_in_current: 0,
3006 }
3007 }
3008}
3009
3010impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3011 fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3012 assert!(child.path.is_empty());
3013 self.page_decoders.push_back(child.decoder);
3014 Ok(())
3015 }
3016
3017 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3018 let mut remaining = num_rows;
3019 let mut tasks = Vec::new();
3020 while remaining > 0 {
3021 let cur_page = self.page_decoders.front_mut().unwrap();
3022 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3023 let to_take = num_in_page.min(remaining);
3024
3025 let task = cur_page.drain(to_take)?;
3026 tasks.push(task);
3027
3028 if to_take == num_in_page {
3029 self.page_decoders.pop_front();
3030 self.rows_drained_in_current = 0;
3031 } else {
3032 self.rows_drained_in_current += to_take;
3033 }
3034
3035 remaining -= to_take;
3036 }
3037 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3038 tasks,
3039 should_validate: self.should_validate,
3040 data_type: self.field.data_type().clone(),
3041 }))
3042 }
3043
3044 fn data_type(&self) -> &DataType {
3045 self.field.data_type()
3046 }
3047}
3048
3049struct SerializedFullZip {
3051 values: LanceBuffer,
3053 repetition_index: Option<LanceBuffer>,
3055}
3056
3057const MINIBLOCK_ALIGNMENT: usize = 8;
3077
3078pub struct PrimitiveStructuralEncoder {
3105 accumulation_queue: AccumulationQueue,
3107
3108 keep_original_array: bool,
3109 accumulated_repdefs: Vec<RepDefBuilder>,
3110 compression_strategy: Arc<dyn CompressionStrategy>,
3112 column_index: u32,
3113 field: Field,
3114 encoding_metadata: Arc<HashMap<String, String>>,
3115}
3116
3117struct CompressedLevelsChunk {
3118 data: LanceBuffer,
3119 num_levels: u16,
3120}
3121
3122struct CompressedLevels {
3123 data: Vec<CompressedLevelsChunk>,
3124 compression: pb::ArrayEncoding,
3125 rep_index: Option<LanceBuffer>,
3126}
3127
3128struct SerializedMiniBlockPage {
3129 num_buffers: u64,
3130 data: LanceBuffer,
3131 metadata: LanceBuffer,
3132}
3133
3134impl PrimitiveStructuralEncoder {
3135 pub fn try_new(
3136 options: &EncodingOptions,
3137 compression_strategy: Arc<dyn CompressionStrategy>,
3138 column_index: u32,
3139 field: Field,
3140 encoding_metadata: Arc<HashMap<String, String>>,
3141 ) -> Result<Self> {
3142 Ok(Self {
3143 accumulation_queue: AccumulationQueue::new(
3144 options.cache_bytes_per_column,
3145 column_index,
3146 options.keep_original_array,
3147 ),
3148 keep_original_array: options.keep_original_array,
3149 accumulated_repdefs: Vec::new(),
3150 column_index,
3151 compression_strategy,
3152 field,
3153 encoding_metadata,
3154 })
3155 }
3156
3157 fn is_narrow(data_block: &DataBlock) -> bool {
3165 const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
3166
3167 if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
3168 let max_len_array = max_len_array
3169 .as_any()
3170 .downcast_ref::<PrimitiveArray<UInt64Type>>()
3171 .unwrap();
3172 if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
3173 return true;
3174 }
3175 }
3176 false
3177 }
3178
3179 fn prefers_miniblock(
3180 data_block: &DataBlock,
3181 encoding_metadata: &HashMap<String, String>,
3182 ) -> bool {
3183 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3185 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
3186 }
3187 Self::is_narrow(data_block)
3189 }
3190
3191 fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
3192 if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
3196 return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
3197 }
3198 true
3199 }
3200
3201 fn serialize_miniblocks(
3248 miniblocks: MiniBlockCompressed,
3249 rep: Option<Vec<CompressedLevelsChunk>>,
3250 def: Option<Vec<CompressedLevelsChunk>>,
3251 ) -> SerializedMiniBlockPage {
3252 let bytes_rep = rep
3253 .as_ref()
3254 .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
3255 .unwrap_or(0);
3256 let bytes_def = def
3257 .as_ref()
3258 .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
3259 .unwrap_or(0);
3260 let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
3261 let mut num_buffers = miniblocks.data.len();
3262 if rep.is_some() {
3263 num_buffers += 1;
3264 }
3265 if def.is_some() {
3266 num_buffers += 1;
3267 }
3268 let max_extra = 9 * num_buffers;
3270 let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
3271 let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);
3272
3273 let mut rep_iter = rep.map(|r| r.into_iter());
3274 let mut def_iter = def.map(|d| d.into_iter());
3275
3276 let mut buffer_offsets = vec![0; miniblocks.data.len()];
3277 for chunk in miniblocks.chunks {
3278 let start_pos = data_buffer.len();
3279 debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
3281
3282 let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
3283 let def = def_iter.as_mut().map(|d| d.next().unwrap());
3284
3285 let num_levels = rep
3287 .as_ref()
3288 .map(|r| r.num_levels)
3289 .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
3290 data_buffer.extend_from_slice(&num_levels.to_le_bytes());
3291
3292 if let Some(rep) = rep.as_ref() {
3294 let bytes_rep = u16::try_from(rep.data.len()).unwrap();
3295 data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
3296 }
3297 if let Some(def) = def.as_ref() {
3298 let bytes_def = u16::try_from(def.data.len()).unwrap();
3299 data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
3300 }
3301
3302 for buffer_size in &chunk.buffer_sizes {
3303 let bytes = *buffer_size;
3304 data_buffer.extend_from_slice(&bytes.to_le_bytes());
3305 }
3306
3307 let add_padding = |data_buffer: &mut Vec<u8>| {
3309 let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
3310 data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
3311 };
3312 add_padding(&mut data_buffer);
3313
3314 if let Some(rep) = rep.as_ref() {
3316 data_buffer.extend_from_slice(&rep.data);
3317 add_padding(&mut data_buffer);
3318 }
3319 if let Some(def) = def.as_ref() {
3320 data_buffer.extend_from_slice(&def.data);
3321 add_padding(&mut data_buffer);
3322 }
3323 for (buffer_size, (buffer, buffer_offset)) in chunk
3324 .buffer_sizes
3325 .iter()
3326 .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
3327 {
3328 let start = *buffer_offset;
3329 let end = start + *buffer_size as usize;
3330 *buffer_offset += *buffer_size as usize;
3331 data_buffer.extend_from_slice(&buffer[start..end]);
3332 add_padding(&mut data_buffer);
3333 }
3334
3335 let chunk_bytes = data_buffer.len() - start_pos;
3336 assert!(chunk_bytes <= 16 * 1024);
3337 assert!(chunk_bytes > 0);
3338 assert_eq!(chunk_bytes % 8, 0);
3339 let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
3343 let divided_bytes_minus_one = (divided_bytes - 1) as u64;
3344
3345 let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
3346 meta_buffer.extend_from_slice(&metadata.to_le_bytes());
3347 }
3348
3349 let data_buffer = LanceBuffer::Owned(data_buffer);
3350 let metadata_buffer = LanceBuffer::Owned(meta_buffer);
3351
3352 SerializedMiniBlockPage {
3353 num_buffers: miniblocks.data.len() as u64,
3354 data: data_buffer,
3355 metadata: metadata_buffer,
3356 }
3357 }
3358
3359 fn compress_levels(
3364 mut levels: RepDefSlicer<'_>,
3365 num_elements: u64,
3366 compression_strategy: &dyn CompressionStrategy,
3367 chunks: &[MiniBlockChunk],
3368 max_rep: u16,
3370 ) -> Result<CompressedLevels> {
3371 let mut rep_index = if max_rep > 0 {
3372 Vec::with_capacity(chunks.len())
3373 } else {
3374 vec![]
3375 };
3376 let num_levels = levels.num_levels() as u64;
3378 let mut levels_buf = levels.all_levels().try_clone().unwrap();
3379 let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3380 data: levels_buf.borrow_and_clone(),
3381 bits_per_value: 16,
3382 num_values: num_levels,
3383 block_info: BlockInfo::new(),
3384 });
3385 let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
3386 let (compressor, compressor_desc) =
3388 compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
3389 let mut level_chunks = Vec::with_capacity(chunks.len());
3391 let mut values_counter = 0;
3392 for (chunk_idx, chunk) in chunks.iter().enumerate() {
3393 let chunk_num_values = chunk.num_values(values_counter, num_elements);
3394 values_counter += chunk_num_values;
3395 let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
3396 levels.slice_next(chunk_num_values as usize)
3397 } else {
3398 levels.slice_rest()
3399 };
3400 let num_chunk_levels = (chunk_levels.len() / 2) as u64;
3401 if max_rep > 0 {
3402 let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
3412 let rep_values = rep_values.as_ref();
3413
3414 let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
3417 let num_leftovers = if chunk_idx < chunks.len() - 1 {
3418 rep_values
3419 .iter()
3420 .rev()
3421 .position(|v| *v == max_rep)
3422 .map(|pos| pos + 1)
3424 .unwrap_or(rep_values.len())
3425 } else {
3426 0
3428 };
3429
3430 if chunk_idx != 0 && rep_values[0] == max_rep {
3431 let rep_len = rep_index.len();
3435 if rep_index[rep_len - 1] != 0 {
3436 rep_index[rep_len - 2] += 1;
3438 rep_index[rep_len - 1] = 0;
3439 }
3440 }
3441
3442 if chunk_idx == chunks.len() - 1 {
3443 num_rows += 1;
3445 }
3446 rep_index.push(num_rows as u64);
3447 rep_index.push(num_leftovers as u64);
3448 }
3449 let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3450 data: chunk_levels,
3451 bits_per_value: 16,
3452 num_values: num_chunk_levels,
3453 block_info: BlockInfo::new(),
3454 });
3455 let compressed_levels = compressor.compress(chunk_levels_block)?;
3456 level_chunks.push(CompressedLevelsChunk {
3457 data: compressed_levels,
3458 num_levels: num_chunk_levels as u16,
3459 });
3460 }
3461 debug_assert_eq!(levels.num_levels_remaining(), 0);
3462 let rep_index = if rep_index.is_empty() {
3463 None
3464 } else {
3465 Some(LanceBuffer::reinterpret_vec(rep_index))
3466 };
3467 Ok(CompressedLevels {
3468 data: level_chunks,
3469 compression: compressor_desc,
3470 rep_index,
3471 })
3472 }
3473
3474 fn encode_simple_all_null(
3475 column_idx: u32,
3476 num_rows: u64,
3477 row_number: u64,
3478 ) -> Result<EncodedPage> {
3479 let description = ProtobufUtils::simple_all_null_layout();
3480 Ok(EncodedPage {
3481 column_idx,
3482 data: vec![],
3483 description: PageEncoding::Structural(description),
3484 num_rows,
3485 row_number,
3486 })
3487 }
3488
3489 fn encode_complex_all_null(
3493 column_idx: u32,
3494 repdefs: Vec<RepDefBuilder>,
3495 row_number: u64,
3496 num_rows: u64,
3497 ) -> Result<EncodedPage> {
3498 let repdef = RepDefBuilder::serialize(repdefs);
3499
3500 let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
3502 LanceBuffer::reinterpret_slice(rep.clone())
3503 } else {
3504 LanceBuffer::empty()
3505 };
3506
3507 let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
3508 LanceBuffer::reinterpret_slice(def.clone())
3509 } else {
3510 LanceBuffer::empty()
3511 };
3512
3513 let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
3514 Ok(EncodedPage {
3515 column_idx,
3516 data: vec![rep_bytes, def_bytes],
3517 description: PageEncoding::Structural(description),
3518 num_rows,
3519 row_number,
3520 })
3521 }
3522
3523 #[allow(clippy::too_many_arguments)]
3524 fn encode_miniblock(
3525 column_idx: u32,
3526 field: &Field,
3527 compression_strategy: &dyn CompressionStrategy,
3528 data: DataBlock,
3529 repdefs: Vec<RepDefBuilder>,
3530 row_number: u64,
3531 dictionary_data: Option<DataBlock>,
3532 num_rows: u64,
3533 ) -> Result<EncodedPage> {
3534 let repdef = RepDefBuilder::serialize(repdefs);
3535
3536 if let DataBlock::AllNull(_null_block) = data {
3537 todo!()
3540 }
3541
3542 let num_items = data.num_values();
3543
3544 let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
3545 let (compressed_data, value_encoding) = compressor.compress(data)?;
3546
3547 let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
3548
3549 let mut compressed_rep = repdef
3550 .rep_slicer()
3551 .map(|rep_slicer| {
3552 Self::compress_levels(
3553 rep_slicer,
3554 num_items,
3555 compression_strategy,
3556 &compressed_data.chunks,
3557 max_rep,
3558 )
3559 })
3560 .transpose()?;
3561
3562 let (rep_index, rep_index_depth) =
3563 match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
3564 Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
3565 None => (None, 0),
3566 };
3567
3568 let mut compressed_def = repdef
3569 .def_slicer()
3570 .map(|def_slicer| {
3571 Self::compress_levels(
3572 def_slicer,
3573 num_items,
3574 compression_strategy,
3575 &compressed_data.chunks,
3576 0,
3577 )
3578 })
3579 .transpose()?;
3580
3581 let rep_data = compressed_rep
3587 .as_mut()
3588 .map(|cr| std::mem::take(&mut cr.data));
3589 let def_data = compressed_def
3590 .as_mut()
3591 .map(|cd| std::mem::take(&mut cd.data));
3592
3593 let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);
3594
3595 let mut data = Vec::with_capacity(4);
3597 data.push(serialized.metadata);
3598 data.push(serialized.data);
3599
3600 if let Some(dictionary_data) = dictionary_data {
3601 let num_dictionary_items = dictionary_data.num_values();
3602 let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
3604
3605 let (compressor, dictionary_encoding) = compression_strategy
3606 .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
3607 let dictionary_buffer = compressor.compress(dictionary_data)?;
3608
3609 data.push(dictionary_buffer);
3610 if let Some(rep_index) = rep_index {
3611 data.push(rep_index);
3612 }
3613
3614 let description = ProtobufUtils::miniblock_layout(
3615 compressed_rep.map(|cr| cr.compression),
3616 compressed_def.map(|cd| cd.compression),
3617 value_encoding,
3618 rep_index_depth,
3619 serialized.num_buffers,
3620 Some((dictionary_encoding, num_dictionary_items)),
3621 &repdef.def_meaning,
3622 num_items,
3623 );
3624 Ok(EncodedPage {
3625 num_rows,
3626 column_idx,
3627 data,
3628 description: PageEncoding::Structural(description),
3629 row_number,
3630 })
3631 } else {
3632 let description = ProtobufUtils::miniblock_layout(
3633 compressed_rep.map(|cr| cr.compression),
3634 compressed_def.map(|cd| cd.compression),
3635 value_encoding,
3636 rep_index_depth,
3637 serialized.num_buffers,
3638 None,
3639 &repdef.def_meaning,
3640 num_items,
3641 );
3642
3643 if let Some(mut rep_index) = rep_index {
3644 let view = rep_index.borrow_to_typed_slice::<u64>();
3645 let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
3646 debug_assert_eq!(total, num_rows);
3647
3648 data.push(rep_index);
3649 }
3650
3651 Ok(EncodedPage {
3652 num_rows,
3653 column_idx,
3654 data,
3655 description: PageEncoding::Structural(description),
3656 row_number,
3657 })
3658 }
3659 }
3660
3661 fn serialize_full_zip_fixed(
3663 fixed: FixedWidthDataBlock,
3664 mut repdef: ControlWordIterator,
3665 num_values: u64,
3666 ) -> SerializedFullZip {
3667 let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
3668 let mut zipped_data = Vec::with_capacity(len);
3669
3670 let max_rep_index_val = if repdef.has_repetition() {
3671 len as u64
3672 } else {
3673 0
3675 };
3676 let mut rep_index_builder =
3677 BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
3678
3679 assert_eq!(
3682 fixed.bits_per_value % 8,
3683 0,
3684 "Non-byte aligned full-zip compression not yet supported"
3685 );
3686
3687 let bytes_per_value = fixed.bits_per_value as usize / 8;
3688
3689 let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
3690 let mut offset = 0;
3691 while let Some(control) = repdef.append_next(&mut zipped_data) {
3692 if control.is_new_row {
3693 debug_assert!(offset <= len);
3695 unsafe { rep_index_builder.append(offset as u64) };
3697 }
3698 if control.is_visible {
3699 let value = data_iter.next().unwrap();
3700 zipped_data.extend_from_slice(value);
3701 }
3702 offset = zipped_data.len();
3703 }
3704
3705 debug_assert_eq!(zipped_data.len(), len);
3706 unsafe {
3709 rep_index_builder.append(zipped_data.len() as u64);
3710 }
3711
3712 let zipped_data = LanceBuffer::Owned(zipped_data);
3713 let rep_index = rep_index_builder.into_data();
3714 let rep_index = if rep_index.is_empty() {
3715 None
3716 } else {
3717 Some(LanceBuffer::Owned(rep_index))
3718 };
3719 SerializedFullZip {
3720 values: zipped_data,
3721 repetition_index: rep_index,
3722 }
3723 }
3724
3725 fn serialize_full_zip_variable(
3729 mut variable: VariableWidthBlock,
3730 mut repdef: ControlWordIterator,
3731 num_items: u64,
3732 ) -> SerializedFullZip {
3733 let bytes_per_offset = variable.bits_per_offset as usize / 8;
3734 assert_eq!(
3735 variable.bits_per_offset % 8,
3736 0,
3737 "Only byte-aligned offsets supported"
3738 );
3739 let len = variable.data.len()
3740 + repdef.bytes_per_word() * num_items as usize
3741 + bytes_per_offset * variable.num_values as usize;
3742 let mut buf = Vec::with_capacity(len);
3743
3744 let max_rep_index_val = len as u64;
3745 let mut rep_index_builder =
3746 BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
3747
3748 match bytes_per_offset {
3750 4 => {
3751 let offs = variable.offsets.borrow_to_typed_slice::<u32>();
3752 let mut rep_offset = 0;
3753 let mut windows_iter = offs.as_ref().windows(2);
3754 while let Some(control) = repdef.append_next(&mut buf) {
3755 if control.is_new_row {
3756 debug_assert!(rep_offset <= len);
3758 unsafe { rep_index_builder.append(rep_offset as u64) };
3760 }
3761 if control.is_visible {
3762 let window = windows_iter.next().unwrap();
3763 if control.is_valid_item {
3764 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3765 buf.extend_from_slice(
3766 &variable.data[window[0] as usize..window[1] as usize],
3767 );
3768 }
3769 }
3770 rep_offset = buf.len();
3771 }
3772 }
3773 8 => {
3774 let offs = variable.offsets.borrow_to_typed_slice::<u64>();
3775 let mut rep_offset = 0;
3776 let mut windows_iter = offs.as_ref().windows(2);
3777 while let Some(control) = repdef.append_next(&mut buf) {
3778 if control.is_new_row {
3779 debug_assert!(rep_offset <= len);
3781 unsafe { rep_index_builder.append(rep_offset as u64) };
3783 }
3784 if control.is_visible {
3785 let window = windows_iter.next().unwrap();
3786 if control.is_valid_item {
3787 buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
3788 buf.extend_from_slice(
3789 &variable.data[window[0] as usize..window[1] as usize],
3790 );
3791 }
3792 }
3793 rep_offset = buf.len();
3794 }
3795 }
3796 _ => panic!("Unsupported offset size"),
3797 }
3798
3799 debug_assert!(buf.len() <= len);
3802 unsafe {
3805 rep_index_builder.append(buf.len() as u64);
3806 }
3807
3808 let zipped_data = LanceBuffer::Owned(buf);
3809 let rep_index = rep_index_builder.into_data();
3810 debug_assert!(!rep_index.is_empty());
3811 let rep_index = Some(LanceBuffer::Owned(rep_index));
3812 SerializedFullZip {
3813 values: zipped_data,
3814 repetition_index: rep_index,
3815 }
3816 }
3817
3818 fn serialize_full_zip(
3821 compressed_data: PerValueDataBlock,
3822 repdef: ControlWordIterator,
3823 num_items: u64,
3824 ) -> SerializedFullZip {
3825 match compressed_data {
3826 PerValueDataBlock::Fixed(fixed) => {
3827 Self::serialize_full_zip_fixed(fixed, repdef, num_items)
3828 }
3829 PerValueDataBlock::Variable(var) => {
3830 Self::serialize_full_zip_variable(var, repdef, num_items)
3831 }
3832 }
3833 }
3834
3835 fn encode_full_zip(
3836 column_idx: u32,
3837 field: &Field,
3838 compression_strategy: &dyn CompressionStrategy,
3839 data: DataBlock,
3840 repdefs: Vec<RepDefBuilder>,
3841 row_number: u64,
3842 num_lists: u64,
3843 ) -> Result<EncodedPage> {
3844 let repdef = RepDefBuilder::serialize(repdefs);
3845 let max_rep = repdef
3846 .repetition_levels
3847 .as_ref()
3848 .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
3849 let max_def = repdef
3850 .definition_levels
3851 .as_ref()
3852 .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
3853
3854 let (num_items, num_visible_items) =
3858 if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
3859 (rep_levels.len() as u64, data.num_values())
3862 } else {
3863 (data.num_values(), data.num_values())
3865 };
3866
3867 let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
3868
3869 let repdef_iter = build_control_word_iterator(
3870 repdef.repetition_levels.as_deref(),
3871 max_rep,
3872 repdef.definition_levels.as_deref(),
3873 max_def,
3874 max_visible_def,
3875 num_items as usize,
3876 );
3877 let bits_rep = repdef_iter.bits_rep();
3878 let bits_def = repdef_iter.bits_def();
3879
3880 let compressor = compression_strategy.create_per_value(field, &data)?;
3881 let (compressed_data, value_encoding) = compressor.compress(data)?;
3882
3883 let description = match &compressed_data {
3884 PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
3885 bits_rep,
3886 bits_def,
3887 fixed.bits_per_value as u32,
3888 value_encoding,
3889 &repdef.def_meaning,
3890 num_items as u32,
3891 num_visible_items as u32,
3892 ),
3893 PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
3894 bits_rep,
3895 bits_def,
3896 variable.bits_per_offset as u32,
3897 value_encoding,
3898 &repdef.def_meaning,
3899 num_items as u32,
3900 num_visible_items as u32,
3901 ),
3902 };
3903
3904 let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
3905
3906 let data = if let Some(repindex) = zipped.repetition_index {
3907 vec![zipped.values, repindex]
3908 } else {
3909 vec![zipped.values]
3910 };
3911
3912 Ok(EncodedPage {
3913 num_rows: num_lists,
3914 column_idx,
3915 data,
3916 description: PageEncoding::Structural(description),
3917 row_number,
3918 })
3919 }
3920
3921 fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) {
3922 let cardinality = data_block
3923 .get_stat(Stat::Cardinality)
3924 .unwrap()
3925 .as_primitive::<UInt64Type>()
3926 .value(0);
3927 match data_block {
3928 DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
3929 let mut map = HashMap::new();
3932 let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
3933 let u128_slice = u128_slice.as_ref();
3934 let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
3935 let mut indices_buffer =
3936 Vec::with_capacity(fixed_width_data_block.num_values as usize);
3937 let mut curr_idx: i32 = 0;
3938 u128_slice.iter().for_each(|&value| {
3939 let idx = *map.entry(value).or_insert_with(|| {
3940 dictionary_buffer.push(value);
3941 curr_idx += 1;
3942 curr_idx - 1
3943 });
3944 indices_buffer.push(idx);
3945 });
3946 let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3947 data: LanceBuffer::reinterpret_vec(dictionary_buffer),
3948 bits_per_value: 128,
3949 num_values: curr_idx as u64,
3950 block_info: BlockInfo::default(),
3951 });
3952 let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
3953 data: LanceBuffer::reinterpret_vec(indices_buffer),
3954 bits_per_value: 32,
3955 num_values: fixed_width_data_block.num_values,
3956 block_info: BlockInfo::default(),
3957 });
3958 indices_data_block.compute_stat();
3961
3962 (indices_data_block, dictionary_data_block)
3963 }
3964 DataBlock::VariableWidth(ref mut variable_width_data_block) => {
3965 match variable_width_data_block.bits_per_offset {
3966 32 => {
3967 let mut map = HashMap::new();
3968 let offsets = variable_width_data_block
3969 .offsets
3970 .borrow_to_typed_slice::<u32>();
3971 let offsets = offsets.as_ref();
3972
3973 let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
3974 "VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
3975 );
3976 let max_len = max_len.as_primitive::<UInt64Type>().value(0);
3977
3978 let mut dictionary_buffer: Vec<u8> =
3979 Vec::with_capacity((max_len * cardinality) as usize);
3980 let mut dictionary_offsets_buffer = vec![0];
3981 let mut curr_idx = 0;
3982 let mut indices_buffer =
3983 Vec::with_capacity(variable_width_data_block.num_values as usize);
3984
3985 offsets
3986 .iter()
3987 .zip(offsets.iter().skip(1))
3988 .for_each(|(&start, &end)| {
3989 let key =
3990 &variable_width_data_block.data[start as usize..end as usize];
3991 let idx: i32 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
3992 dictionary_buffer.extend_from_slice(key);
3993 dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
3994 curr_idx += 1;
3995 curr_idx - 1
3996 });
3997 indices_buffer.push(idx);
3998 });
3999
4000 let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4001 data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4002 offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4003 bits_per_offset: 32,
4004 num_values: curr_idx as u64,
4005 block_info: BlockInfo::default(),
4006 });
4007
4008 let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4009 data: LanceBuffer::reinterpret_vec(indices_buffer),
4010 bits_per_value: 32,
4011 num_values: variable_width_data_block.num_values,
4012 block_info: BlockInfo::default(),
4013 });
4014 indices_data_block.compute_stat();
4017
4018 (indices_data_block, dictionary_data_block)
4019 }
4020 64 => {
4021 let mut map = HashMap::new();
4022 let offsets = variable_width_data_block
4023 .offsets
4024 .borrow_to_typed_slice::<u64>();
4025 let offsets = offsets.as_ref();
4026
4027 let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
4028 "VariableWidth DataBlock should have valid `Stat::DataSize` statistics",
4029 );
4030 let max_len = max_len.as_primitive::<UInt64Type>().value(0);
4031
4032 let mut dictionary_buffer: Vec<u8> =
4033 Vec::with_capacity((max_len * cardinality) as usize);
4034 let mut dictionary_offsets_buffer = vec![0];
4035 let mut curr_idx = 0;
4036 let mut indices_buffer =
4037 Vec::with_capacity(variable_width_data_block.num_values as usize);
4038
4039 offsets
4040 .iter()
4041 .zip(offsets.iter().skip(1))
4042 .for_each(|(&start, &end)| {
4043 let key =
4044 &variable_width_data_block.data[start as usize..end as usize];
4045 let idx: i64 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
4046 dictionary_buffer.extend_from_slice(key);
4047 dictionary_offsets_buffer.push(dictionary_buffer.len() as u64);
4048 curr_idx += 1;
4049 curr_idx - 1
4050 });
4051 indices_buffer.push(idx);
4052 });
4053
4054 let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
4055 data: LanceBuffer::reinterpret_vec(dictionary_buffer),
4056 offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
4057 bits_per_offset: 64,
4058 num_values: curr_idx as u64,
4059 block_info: BlockInfo::default(),
4060 });
4061
4062 let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
4063 data: LanceBuffer::reinterpret_vec(indices_buffer),
4064 bits_per_value: 64,
4065 num_values: variable_width_data_block.num_values,
4066 block_info: BlockInfo::default(),
4067 });
4068 indices_data_block.compute_stat();
4071
4072 (indices_data_block, dictionary_data_block)
4073 }
4074 _ => {
4075 unreachable!()
4076 }
4077 }
4078 }
4079 _ => {
4080 unreachable!("dictionary encode called with data block {:?}", data_block)
4081 }
4082 }
4083 }
4084
4085 fn should_dictionary_encode(data_block: &DataBlock, field: &Field) -> bool {
4086 let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
4088 .ok()
4089 .and_then(|val| val.parse().ok())
4090 .unwrap_or(100);
4091 if data_block.num_values() < too_small {
4092 return false;
4093 }
4094
4095 let divisor: u64 = field
4098 .metadata
4099 .get(DICT_DIVISOR_META_KEY)
4100 .map(|val| val.parse().ok())
4101 .unwrap_or_else(|| {
4102 env::var("LANCE_ENCODING_DICT_DIVISOR")
4103 .ok()
4104 .and_then(|val| val.parse().ok())
4105 })
4106 .unwrap_or(2);
4107
4108 let max_cardinality = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
4111 .ok()
4112 .and_then(|val| val.parse().ok())
4113 .unwrap_or(100000);
4114
4115 let threshold = (data_block.num_values() / divisor).min(max_cardinality);
4116
4117 let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
4118 cardinality_array.as_primitive::<UInt64Type>().value(0)
4119 } else {
4120 u64::MAX
4121 };
4122
4123 cardinality < threshold
4124 }
4125
4126 fn do_flush(
4128 &mut self,
4129 arrays: Vec<ArrayRef>,
4130 repdefs: Vec<RepDefBuilder>,
4131 row_number: u64,
4132 num_rows: u64,
4133 ) -> Result<Vec<EncodeTask>> {
4134 let column_idx = self.column_index;
4135 let compression_strategy = self.compression_strategy.clone();
4136 let field = self.field.clone();
4137 let encoding_metadata = self.encoding_metadata.clone();
4138 let task = spawn_cpu(move || {
4139 let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
4140
4141 if num_values == 0 {
4142 return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
4146 }
4147 let num_nulls = arrays
4148 .iter()
4149 .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
4150 .sum::<u64>();
4151
4152 if num_values == num_nulls {
4153 return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
4154 log::debug!(
4155 "Encoding column {} with {} items using simple-null layout",
4156 column_idx,
4157 num_values
4158 );
4159 Self::encode_simple_all_null(column_idx, num_values, row_number)
4161 } else {
4162 Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
4164 };
4165 }
4166
4167 let data_block = DataBlock::from_arrays(&arrays, num_values);
4168
4169 if let DataBlock::Struct(ref struct_data_block) = data_block {
4171 if struct_data_block
4172 .children
4173 .iter()
4174 .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4175 {
4176 panic!("packed struct encoding currently only supports fixed-width fields.")
4177 }
4178 }
4179
4180 let data_block = data_block.remove_outer_validity();
4182
4183
4184 if Self::should_dictionary_encode(&data_block, &field) {
4185 log::debug!(
4186 "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
4187 column_idx,
4188 num_values
4189 );
4190 let (indices_data_block, dictionary_data_block) =
4191 Self::dictionary_encode(data_block);
4192 Self::encode_miniblock(
4193 column_idx,
4194 &field,
4195 compression_strategy.as_ref(),
4196 indices_data_block,
4197 repdefs,
4198 row_number,
4199 Some(dictionary_data_block),
4200 num_rows,
4201 )
4202 } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
4203 log::debug!(
4204 "Encoding column {} with {} items using mini-block layout",
4205 column_idx,
4206 num_values
4207 );
4208 Self::encode_miniblock(
4209 column_idx,
4210 &field,
4211 compression_strategy.as_ref(),
4212 data_block,
4213 repdefs,
4214 row_number,
4215 None,
4216 num_rows,
4217 )
4218 } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
4219 log::debug!(
4220 "Encoding column {} with {} items using full-zip layout",
4221 column_idx,
4222 num_values
4223 );
4224 Self::encode_full_zip(
4225 column_idx,
4226 &field,
4227 compression_strategy.as_ref(),
4228 data_block,
4229 repdefs,
4230 row_number,
4231 num_rows,
4232 )
4233 } else {
4234 Err(Error::InvalidInput { source: format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into(), location: location!() })
4235 }
4236 })
4237 .boxed();
4238 Ok(vec![task])
4239 }
4240
4241 fn extract_validity_buf(
4242 array: &dyn Array,
4243 repdef: &mut RepDefBuilder,
4244 keep_original_array: bool,
4245 ) {
4246 if let Some(validity) = array.nulls() {
4247 if keep_original_array {
4248 repdef.add_validity_bitmap(validity.clone());
4249 } else {
4250 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
4251 }
4252 } else {
4253 repdef.add_no_null(array.len());
4254 }
4255 }
4256
4257 fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder, keep_original_array: bool) {
4258 match array.data_type() {
4259 DataType::Null => {
4260 repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
4261 }
4262 DataType::Dictionary(_, _) => {
4263 unreachable!()
4264 }
4265 _ => Self::extract_validity_buf(array, repdef, keep_original_array),
4274 }
4275 }
4276}
4277
4278impl FieldEncoder for PrimitiveStructuralEncoder {
4279 fn maybe_encode(
4281 &mut self,
4282 array: ArrayRef,
4283 _external_buffers: &mut OutOfLineBuffers,
4284 mut repdef: RepDefBuilder,
4285 row_number: u64,
4286 num_rows: u64,
4287 ) -> Result<Vec<EncodeTask>> {
4288 Self::extract_validity(array.as_ref(), &mut repdef, self.keep_original_array);
4289 self.accumulated_repdefs.push(repdef);
4290
4291 if let Some((arrays, row_number, num_rows)) =
4292 self.accumulation_queue.insert(array, row_number, num_rows)
4293 {
4294 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4295 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4296 } else {
4297 Ok(vec![])
4298 }
4299 }
4300
4301 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
4303 if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
4304 let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
4305 Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
4306 } else {
4307 Ok(vec![])
4308 }
4309 }
4310
4311 fn num_columns(&self) -> u32 {
4312 1
4313 }
4314
4315 fn finish(
4316 &mut self,
4317 _external_buffers: &mut OutOfLineBuffers,
4318 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
4319 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
4320 }
4321}
4322
4323#[cfg(test)]
4324#[allow(clippy::single_range_in_vec_init)]
4325mod tests {
4326 use std::{collections::VecDeque, sync::Arc};
4327
4328 use crate::encodings::logical::primitive::{
4329 ChunkDrainInstructions, PrimitiveStructuralEncoder,
4330 };
4331 use arrow_array::{ArrayRef, Int8Array, StringArray};
4332
4333 use super::{
4334 ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
4335 FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
4336 FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor, PreambleAction,
4337 StructuralPageScheduler,
4338 };
4339
4340 #[test]
4341 fn test_is_narrow() {
4342 let int8_array = Int8Array::from(vec![1, 2, 3]);
4343 let array_ref: ArrayRef = Arc::new(int8_array);
4344 let block = DataBlock::from_array(array_ref);
4345
4346 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4347
4348 let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
4349 let block = DataBlock::from_array(string_array);
4350 assert!(PrimitiveStructuralEncoder::is_narrow(&block));
4351
4352 let string_array = StringArray::from(vec![
4353 Some("hello world".repeat(100)),
4354 Some("world".to_string()),
4355 ]);
4356 let block = DataBlock::from_array(string_array);
4357 assert!((!PrimitiveStructuralEncoder::is_narrow(&block)));
4358 }
4359
4360 #[test]
4361 fn test_map_range() {
4362 let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4365 let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4366 let max_visible_def = 0;
4367 let total_items = 8;
4368 let max_rep = 1;
4369
4370 let check = |range, expected_item_range, expected_level_range| {
4371 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4372 range,
4373 rep.as_ref(),
4374 def.as_ref(),
4375 max_rep,
4376 max_visible_def,
4377 total_items,
4378 PreambleAction::Absent,
4379 );
4380 assert_eq!(item_range, expected_item_range);
4381 assert_eq!(level_range, expected_level_range);
4382 };
4383
4384 check(0..1, 0..3, 0..3);
4385 check(1..2, 3..5, 3..5);
4386 check(2..3, 5..5, 5..6);
4387 check(3..4, 5..8, 6..9);
4388 check(0..2, 0..5, 0..5);
4389 check(1..3, 3..5, 3..6);
4390 check(2..4, 5..8, 5..9);
4391 check(0..3, 0..5, 0..6);
4392 check(1..4, 3..8, 3..9);
4393 check(0..4, 0..8, 0..9);
4394
4395 let rep = Some(vec![1, 1, 0, 1]);
4398 let def = Some(vec![1, 0, 0, 0]);
4399 let max_visible_def = 0;
4400 let total_items = 3;
4401
4402 let check = |range, expected_item_range, expected_level_range| {
4403 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4404 range,
4405 rep.as_ref(),
4406 def.as_ref(),
4407 max_rep,
4408 max_visible_def,
4409 total_items,
4410 PreambleAction::Absent,
4411 );
4412 assert_eq!(item_range, expected_item_range);
4413 assert_eq!(level_range, expected_level_range);
4414 };
4415
4416 check(0..1, 0..0, 0..1);
4417 check(1..2, 0..2, 1..3);
4418 check(2..3, 2..3, 3..4);
4419 check(0..2, 0..2, 0..3);
4420 check(1..3, 0..3, 1..4);
4421 check(0..3, 0..3, 0..4);
4422
4423 let rep = Some(vec![1, 1, 0, 1]);
4426 let def = Some(vec![0, 0, 0, 1]);
4427 let max_visible_def = 0;
4428 let total_items = 3;
4429
4430 let check = |range, expected_item_range, expected_level_range| {
4431 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4432 range,
4433 rep.as_ref(),
4434 def.as_ref(),
4435 max_rep,
4436 max_visible_def,
4437 total_items,
4438 PreambleAction::Absent,
4439 );
4440 assert_eq!(item_range, expected_item_range);
4441 assert_eq!(level_range, expected_level_range);
4442 };
4443
4444 check(0..1, 0..1, 0..1);
4445 check(1..2, 1..3, 1..3);
4446 check(2..3, 3..3, 3..4);
4447 check(0..2, 0..3, 0..3);
4448 check(1..3, 1..3, 1..4);
4449 check(0..3, 0..3, 0..4);
4450
4451 let rep = Some(vec![1, 0, 1, 0, 1, 0]);
4454 let def: Option<&[u16]> = None;
4455 let max_visible_def = 0;
4456 let total_items = 6;
4457
4458 let check = |range, expected_item_range, expected_level_range| {
4459 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4460 range,
4461 rep.as_ref(),
4462 def.as_ref(),
4463 max_rep,
4464 max_visible_def,
4465 total_items,
4466 PreambleAction::Absent,
4467 );
4468 assert_eq!(item_range, expected_item_range);
4469 assert_eq!(level_range, expected_level_range);
4470 };
4471
4472 check(0..1, 0..2, 0..2);
4473 check(1..2, 2..4, 2..4);
4474 check(2..3, 4..6, 4..6);
4475 check(0..2, 0..4, 0..4);
4476 check(1..3, 2..6, 2..6);
4477 check(0..3, 0..6, 0..6);
4478
4479 let rep: Option<&[u16]> = None;
4482 let def = Some(vec![0, 0, 1, 0]);
4483 let max_visible_def = 1;
4484 let total_items = 4;
4485
4486 let check = |range, expected_item_range, expected_level_range| {
4487 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4488 range,
4489 rep.as_ref(),
4490 def.as_ref(),
4491 max_rep,
4492 max_visible_def,
4493 total_items,
4494 PreambleAction::Absent,
4495 );
4496 assert_eq!(item_range, expected_item_range);
4497 assert_eq!(level_range, expected_level_range);
4498 };
4499
4500 check(0..1, 0..1, 0..1);
4501 check(1..2, 1..2, 1..2);
4502 check(2..3, 2..3, 2..3);
4503 check(0..2, 0..2, 0..2);
4504 check(1..3, 1..3, 1..3);
4505 check(0..3, 0..3, 0..3);
4506
4507 let rep = Some(vec![0, 1, 0, 1]);
4512 let def = Some(vec![0, 0, 0, 1]);
4513 let max_visible_def = 0;
4514 let total_items = 3;
4515
4516 let check = |range, expected_item_range, expected_level_range| {
4517 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4518 range,
4519 rep.as_ref(),
4520 def.as_ref(),
4521 max_rep,
4522 max_visible_def,
4523 total_items,
4524 PreambleAction::Take,
4525 );
4526 assert_eq!(item_range, expected_item_range);
4527 assert_eq!(level_range, expected_level_range);
4528 };
4529
4530 check(0..1, 0..3, 0..3);
4532 check(0..2, 0..3, 0..4);
4533
4534 let check = |range, expected_item_range, expected_level_range| {
4535 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4536 range,
4537 rep.as_ref(),
4538 def.as_ref(),
4539 max_rep,
4540 max_visible_def,
4541 total_items,
4542 PreambleAction::Skip,
4543 );
4544 assert_eq!(item_range, expected_item_range);
4545 assert_eq!(level_range, expected_level_range);
4546 };
4547
4548 check(0..1, 1..3, 1..3);
4549 check(1..2, 3..3, 3..4);
4550 check(0..2, 1..3, 1..4);
4551
4552 let rep = Some(vec![0, 1, 1, 0]);
4557 let def = Some(vec![0, 1, 0, 0]);
4558 let max_visible_def = 0;
4559 let total_items = 4;
4560
4561 let check = |range, expected_item_range, expected_level_range| {
4562 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4563 range,
4564 rep.as_ref(),
4565 def.as_ref(),
4566 max_rep,
4567 max_visible_def,
4568 total_items,
4569 PreambleAction::Take,
4570 );
4571 assert_eq!(item_range, expected_item_range);
4572 assert_eq!(level_range, expected_level_range);
4573 };
4574
4575 check(0..1, 0..1, 0..2);
4577 check(0..2, 0..3, 0..4);
4578
4579 let check = |range, expected_item_range, expected_level_range| {
4580 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4581 range,
4582 rep.as_ref(),
4583 def.as_ref(),
4584 max_rep,
4585 max_visible_def,
4586 total_items,
4587 PreambleAction::Skip,
4588 );
4589 assert_eq!(item_range, expected_item_range);
4590 assert_eq!(level_range, expected_level_range);
4591 };
4592
4593 check(0..1, 1..1, 1..2);
4595 check(1..2, 1..3, 2..4);
4596 check(0..2, 1..3, 1..4);
4597
4598 let rep = Some(vec![0, 1, 0, 1]);
4601 let def: Option<Vec<u16>> = None;
4602 let max_visible_def = 0;
4603 let total_items = 4;
4604
4605 let check = |range, expected_item_range, expected_level_range| {
4606 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4607 range,
4608 rep.as_ref(),
4609 def.as_ref(),
4610 max_rep,
4611 max_visible_def,
4612 total_items,
4613 PreambleAction::Take,
4614 );
4615 assert_eq!(item_range, expected_item_range);
4616 assert_eq!(level_range, expected_level_range);
4617 };
4618
4619 check(0..1, 0..3, 0..3);
4621 check(0..2, 0..4, 0..4);
4622
4623 let check = |range, expected_item_range, expected_level_range| {
4624 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4625 range,
4626 rep.as_ref(),
4627 def.as_ref(),
4628 max_rep,
4629 max_visible_def,
4630 total_items,
4631 PreambleAction::Skip,
4632 );
4633 assert_eq!(item_range, expected_item_range);
4634 assert_eq!(level_range, expected_level_range);
4635 };
4636
4637 check(0..1, 1..3, 1..3);
4638 check(1..2, 3..4, 3..4);
4639 check(0..2, 1..4, 1..4);
4640 }
4641
4642 #[test]
4643 fn test_schedule_instructions() {
4644 let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4645 let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
4646
4647 let check = |user_ranges, expected_instructions| {
4648 let instructions =
4649 ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
4650 assert_eq!(instructions, expected_instructions);
4651 };
4652
4653 let expected_take_all = vec![
4655 ChunkInstructions {
4656 chunk_idx: 0,
4657 preamble: PreambleAction::Absent,
4658 rows_to_skip: 0,
4659 rows_to_take: 5,
4660 take_trailer: true,
4661 },
4662 ChunkInstructions {
4663 chunk_idx: 1,
4664 preamble: PreambleAction::Take,
4665 rows_to_skip: 0,
4666 rows_to_take: 2,
4667 take_trailer: false,
4668 },
4669 ChunkInstructions {
4670 chunk_idx: 2,
4671 preamble: PreambleAction::Absent,
4672 rows_to_skip: 0,
4673 rows_to_take: 4,
4674 take_trailer: true,
4675 },
4676 ChunkInstructions {
4677 chunk_idx: 3,
4678 preamble: PreambleAction::Take,
4679 rows_to_skip: 0,
4680 rows_to_take: 1,
4681 take_trailer: false,
4682 },
4683 ];
4684
4685 check(&[0..14], expected_take_all.clone());
4687
4688 check(
4690 &[
4691 0..1,
4692 1..2,
4693 2..3,
4694 3..4,
4695 4..5,
4696 5..6,
4697 6..7,
4698 7..8,
4699 8..9,
4700 9..10,
4701 10..11,
4702 11..12,
4703 12..13,
4704 13..14,
4705 ],
4706 expected_take_all,
4707 );
4708
4709 check(
4713 &[0..1, 3..4],
4714 vec![
4715 ChunkInstructions {
4716 chunk_idx: 0,
4717 preamble: PreambleAction::Absent,
4718 rows_to_skip: 0,
4719 rows_to_take: 1,
4720 take_trailer: false,
4721 },
4722 ChunkInstructions {
4723 chunk_idx: 0,
4724 preamble: PreambleAction::Absent,
4725 rows_to_skip: 3,
4726 rows_to_take: 1,
4727 take_trailer: false,
4728 },
4729 ],
4730 );
4731
4732 check(
4734 &[5..6],
4735 vec![
4736 ChunkInstructions {
4737 chunk_idx: 0,
4738 preamble: PreambleAction::Absent,
4739 rows_to_skip: 5,
4740 rows_to_take: 0,
4741 take_trailer: true,
4742 },
4743 ChunkInstructions {
4744 chunk_idx: 1,
4745 preamble: PreambleAction::Take,
4746 rows_to_skip: 0,
4747 rows_to_take: 0,
4748 take_trailer: false,
4749 },
4750 ],
4751 );
4752
4753 check(
4755 &[7..10],
4756 vec![
4757 ChunkInstructions {
4758 chunk_idx: 1,
4759 preamble: PreambleAction::Skip,
4760 rows_to_skip: 1,
4761 rows_to_take: 1,
4762 take_trailer: false,
4763 },
4764 ChunkInstructions {
4765 chunk_idx: 2,
4766 preamble: PreambleAction::Absent,
4767 rows_to_skip: 0,
4768 rows_to_take: 2,
4769 take_trailer: false,
4770 },
4771 ],
4772 );
4773 }
4774
4775 #[test]
4776 fn test_drain_instructions() {
4777 fn drain_from_instructions(
4778 instructions: &mut VecDeque<ChunkInstructions>,
4779 mut rows_desired: u64,
4780 need_preamble: &mut bool,
4781 skip_in_chunk: &mut u64,
4782 ) -> Vec<ChunkDrainInstructions> {
4783 let mut drain_instructions = Vec::with_capacity(instructions.len());
4785 while rows_desired > 0 || *need_preamble {
4786 let (next_instructions, consumed_chunk) = instructions
4787 .front()
4788 .unwrap()
4789 .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
4790 if consumed_chunk {
4791 instructions.pop_front();
4792 }
4793 drain_instructions.push(next_instructions);
4794 }
4795 drain_instructions
4796 }
4797
4798 let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
4799 let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
4800 let user_ranges = vec![1..7, 10..14];
4801
4802 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4804
4805 let mut to_drain = VecDeque::from(scheduled.clone());
4806
4807 let mut need_preamble = false;
4810 let mut skip_in_chunk = 0;
4811
4812 let next_batch =
4813 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4814
4815 assert!(!need_preamble);
4816 assert_eq!(skip_in_chunk, 4);
4817 assert_eq!(
4818 next_batch,
4819 vec![ChunkDrainInstructions {
4820 chunk_instructions: scheduled[0].clone(),
4821 rows_to_take: 4,
4822 rows_to_skip: 0,
4823 preamble_action: PreambleAction::Absent,
4824 }]
4825 );
4826
4827 let next_batch =
4828 drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
4829
4830 assert!(!need_preamble);
4831 assert_eq!(skip_in_chunk, 2);
4832
4833 assert_eq!(
4834 next_batch,
4835 vec![
4836 ChunkDrainInstructions {
4837 chunk_instructions: scheduled[0].clone(),
4838 rows_to_take: 1,
4839 rows_to_skip: 4,
4840 preamble_action: PreambleAction::Absent,
4841 },
4842 ChunkDrainInstructions {
4843 chunk_instructions: scheduled[1].clone(),
4844 rows_to_take: 1,
4845 rows_to_skip: 0,
4846 preamble_action: PreambleAction::Take,
4847 },
4848 ChunkDrainInstructions {
4849 chunk_instructions: scheduled[2].clone(),
4850 rows_to_take: 2,
4851 rows_to_skip: 0,
4852 preamble_action: PreambleAction::Absent,
4853 }
4854 ]
4855 );
4856
4857 let next_batch =
4858 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4859
4860 assert!(!need_preamble);
4861 assert_eq!(skip_in_chunk, 0);
4862
4863 assert_eq!(
4864 next_batch,
4865 vec![
4866 ChunkDrainInstructions {
4867 chunk_instructions: scheduled[2].clone(),
4868 rows_to_take: 1,
4869 rows_to_skip: 2,
4870 preamble_action: PreambleAction::Absent,
4871 },
4872 ChunkDrainInstructions {
4873 chunk_instructions: scheduled[3].clone(),
4874 rows_to_take: 1,
4875 rows_to_skip: 0,
4876 preamble_action: PreambleAction::Take,
4877 },
4878 ]
4879 );
4880
4881 let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
4883 let repetition_index = MiniBlockRepIndex::decode(&repetition_index);
4884 let user_ranges = vec![0..28];
4885
4886 let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
4888
4889 let mut to_drain = VecDeque::from(scheduled.clone());
4890
4891 let mut need_preamble = false;
4894 let mut skip_in_chunk = 0;
4895
4896 let next_batch =
4897 drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
4898
4899 assert_eq!(
4900 next_batch,
4901 vec![
4902 ChunkDrainInstructions {
4903 chunk_instructions: scheduled[0].clone(),
4904 rows_to_take: 6,
4905 rows_to_skip: 0,
4906 preamble_action: PreambleAction::Absent,
4907 },
4908 ChunkDrainInstructions {
4909 chunk_instructions: scheduled[1].clone(),
4910 rows_to_take: 1,
4911 rows_to_skip: 0,
4912 preamble_action: PreambleAction::Take,
4913 },
4914 ]
4915 );
4916
4917 assert!(!need_preamble);
4918 assert_eq!(skip_in_chunk, 1);
4919
4920 let next_batch =
4923 drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
4924
4925 assert_eq!(
4926 next_batch,
4927 vec![
4928 ChunkDrainInstructions {
4929 chunk_instructions: scheduled[1].clone(),
4930 rows_to_take: 2,
4931 rows_to_skip: 1,
4932 preamble_action: PreambleAction::Skip,
4933 },
4934 ChunkDrainInstructions {
4935 chunk_instructions: scheduled[2].clone(),
4936 rows_to_take: 0,
4937 rows_to_skip: 0,
4938 preamble_action: PreambleAction::Take,
4939 },
4940 ]
4941 );
4942
4943 assert!(!need_preamble);
4944 assert_eq!(skip_in_chunk, 0);
4945 }
4946
4947 #[tokio::test]
4948 async fn test_fullzip_repetition_index_caching() {
4949 use crate::testing::SimulatedScheduler;
4950 use lance_core::cache::LanceCache;
4951
4952 #[derive(Debug)]
4954 struct TestFixedDecompressor;
4955
4956 impl FixedPerValueDecompressor for TestFixedDecompressor {
4957 fn decompress(
4958 &self,
4959 _data: FixedWidthDataBlock,
4960 _num_rows: u64,
4961 ) -> crate::Result<DataBlock> {
4962 unimplemented!("Test decompressor")
4963 }
4964
4965 fn bits_per_value(&self) -> u64 {
4966 32
4967 }
4968 }
4969
4970 let rows_in_page = 100u64;
4972 let bytes_per_value = 4u64;
4973 let _rep_index_size = (rows_in_page + 1) * bytes_per_value;
4974
4975 let mut rep_index_data = Vec::new();
4977 for i in 0..=rows_in_page {
4978 let offset = (i * 100) as u32; rep_index_data.extend_from_slice(&offset.to_le_bytes());
4980 }
4981
4982 let mut full_data = vec![0u8; 1000];
4984 full_data.extend_from_slice(&rep_index_data);
4985 full_data.extend_from_slice(&vec![0u8; 10000]); let data = bytes::Bytes::from(full_data);
4988 let io = Arc::new(SimulatedScheduler::new(data));
4989 let _cache = Arc::new(LanceCache::with_capacity(1024 * 1024));
4990
4991 let mut scheduler = FullZipScheduler {
4993 data_buf_position: 0,
4994 rep_index: Some(FullZipRepIndexDetails {
4995 buf_position: 1000,
4996 bytes_per_value,
4997 }),
4998 priority: 0,
4999 rows_in_page,
5000 bits_per_offset: 32,
5001 details: Arc::new(FullZipDecodeDetails {
5002 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5003 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5004 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5005 max_rep: 0,
5006 max_visible_def: 0,
5007 }),
5008 cached_state: None,
5009 enable_cache: true, };
5011
5012 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5014 let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();
5015
5016 let is_cached = cached_data1
5018 .clone()
5019 .as_arc_any()
5020 .downcast::<FullZipCacheableState>()
5021 .is_ok();
5022 assert!(
5023 is_cached,
5024 "Expected FullZipCacheableState, got NoCachedPageData"
5025 );
5026
5027 scheduler.load(&cached_data1);
5029
5030 assert!(
5032 scheduler.cached_state.is_some(),
5033 "cached_state should be populated after load"
5034 );
5035
5036 let cached_state = scheduler.cached_state.as_ref().unwrap();
5038
5039 let ranges = vec![0..10, 20..30];
5041 let result = scheduler.schedule_ranges_rep(
5042 &ranges,
5043 &io_dyn,
5044 FullZipRepIndexDetails {
5045 buf_position: 1000,
5046 bytes_per_value,
5047 },
5048 );
5049
5050 assert!(
5052 result.is_ok(),
5053 "schedule_ranges_rep should succeed with cached data"
5054 );
5055
5056 let mut scheduler2 = FullZipScheduler {
5058 data_buf_position: 0,
5059 rep_index: Some(FullZipRepIndexDetails {
5060 buf_position: 1000,
5061 bytes_per_value,
5062 }),
5063 priority: 0,
5064 rows_in_page,
5065 bits_per_offset: 32,
5066 details: scheduler.details.clone(),
5067 cached_state: None,
5068 enable_cache: true, };
5070
5071 scheduler2.load(&cached_data1);
5073 assert!(
5074 scheduler2.cached_state.is_some(),
5075 "Second scheduler should have cached_state after load"
5076 );
5077
5078 let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
5080 assert!(
5081 Arc::ptr_eq(cached_state, cached_state2),
5082 "Both schedulers should share the same cached data"
5083 );
5084 }
5085
5086 #[tokio::test]
5087 async fn test_fullzip_cache_config_controls_caching() {
5088 use crate::testing::SimulatedScheduler;
5089
5090 #[derive(Debug)]
5092 struct TestFixedDecompressor;
5093
5094 impl FixedPerValueDecompressor for TestFixedDecompressor {
5095 fn decompress(
5096 &self,
5097 _data: FixedWidthDataBlock,
5098 _num_rows: u64,
5099 ) -> crate::Result<DataBlock> {
5100 unimplemented!("Test decompressor")
5101 }
5102
5103 fn bits_per_value(&self) -> u64 {
5104 32
5105 }
5106 }
5107
5108 let rows_in_page = 1000_u64;
5110 let bytes_per_value = 4_u64;
5111
5112 let rep_index_data = vec![0u8; ((rows_in_page + 1) * bytes_per_value) as usize];
5114 let value_data = vec![0u8; 4000]; let mut full_data = vec![0u8; 1000]; full_data.extend_from_slice(&rep_index_data);
5117 full_data.extend_from_slice(&value_data);
5118
5119 let data = bytes::Bytes::from(full_data);
5120 let io = Arc::new(SimulatedScheduler::new(data));
5121
5122 let mut scheduler_no_cache = FullZipScheduler {
5124 data_buf_position: 0,
5125 rep_index: Some(FullZipRepIndexDetails {
5126 buf_position: 1000,
5127 bytes_per_value,
5128 }),
5129 priority: 0,
5130 rows_in_page,
5131 bits_per_offset: 32,
5132 details: Arc::new(FullZipDecodeDetails {
5133 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5134 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5135 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5136 max_rep: 0,
5137 max_visible_def: 0,
5138 }),
5139 cached_state: None,
5140 enable_cache: false, };
5142
5143 let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
5144 let cached_data = scheduler_no_cache.initialize(&io_dyn).await.unwrap();
5145
5146 assert!(
5148 cached_data
5149 .as_arc_any()
5150 .downcast_ref::<super::NoCachedPageData>()
5151 .is_some(),
5152 "With enable_cache=false, should return NoCachedPageData"
5153 );
5154
5155 let mut scheduler_with_cache = FullZipScheduler {
5157 data_buf_position: 0,
5158 rep_index: Some(FullZipRepIndexDetails {
5159 buf_position: 1000,
5160 bytes_per_value,
5161 }),
5162 priority: 0,
5163 rows_in_page,
5164 bits_per_offset: 32,
5165 details: Arc::new(FullZipDecodeDetails {
5166 value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
5167 def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
5168 ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
5169 max_rep: 0,
5170 max_visible_def: 0,
5171 }),
5172 cached_state: None,
5173 enable_cache: true, };
5175
5176 let cached_data2 = scheduler_with_cache.initialize(&io_dyn).await.unwrap();
5177
5178 assert!(
5180 cached_data2
5181 .as_arc_any()
5182 .downcast_ref::<super::FullZipCacheableState>()
5183 .is_some(),
5184 "With enable_cache=true, should return FullZipCacheableState"
5185 );
5186 }
5187}