use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    env,
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::trace;
use snafu::location;

use crate::{
    compression::{
        BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
    },
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
    encodings::logical::primitive::miniblock::MiniBlockCompressed,
    statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
    repdef::{
        build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator,
        ControlWordParser, DefinitionInterpretation, RepDefSlicer,
    },
    utils::accumulation::AccumulationQueue,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPage,
        MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
    },
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

pub mod fullzip;
pub mod miniblock;

const FILL_BYTE: u8 = 0xFE;

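/// A scheduler for a single page of structurally-encoded data.
///
/// The expected lifecycle, inferred from the implementations in this file, is
/// sketched below (names as used in this module):
///
/// ```ignore
/// // 1. `initialize` runs once per page, performing any up-front I/O
/// //    (chunk metadata, repetition index, dictionary) and returning
/// //    state that can be cached across readers.
/// let cached = scheduler.initialize(&io).await?;
/// // 2. A scheduler created later for the same page can skip that I/O.
/// other_scheduler.load(&cached);
/// // 3. Each read schedules the data I/O and resolves to a page decoder.
/// let decoder = scheduler.schedule_ranges(&[0..100], &io)?.await?;
/// ```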
trait StructuralPageScheduler: std::fmt::Debug + Send {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

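/// Metadata for one mini-block chunk, parsed from the page's metadata buffer:
/// the number of values in the chunk, its size in bytes, and its byte offset
/// within the data buffer.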
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

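/// A CPU task that decompresses a set of loaded mini-block chunks and stitches
/// the requested slice of each chunk into a single output block, optionally
/// expanding dictionary indices into values.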
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let mut rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

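    /// Maps a range of rows within a chunk to a range of items and a range of
    /// levels.
    ///
    /// Rows, items, and levels can all differ once repetition and definition
    /// levels are involved: a row may span several items, and "invisible"
    /// items (those whose definition level exceeds `max_visible_def`, e.g.
    /// items inside a null list) occupy level slots but no slot in the values
    /// buffer.
    ///
    /// A worked example, assuming `max_rep == 1`, `max_visible_def == 0`, and
    /// no preamble:
    ///
    /// ```text
    /// rep: [1, 0, 1, 0, 0]   // rows start at level positions 0 and 2
    /// def: [0, 0, 1, 0, 0]   // the item at level position 2 is invisible
    ///
    /// map_range(1..2, ..) == (2..4, 2..5)
    /// // row 1 spans level positions 2..5 but only items 2..4, because the
    /// // invisible item at level position 2 takes no space in the values
    /// ```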
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            (range.clone(), range)
        }
    }

    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;
        for (instructions, chunk) in self.instructions.iter() {
            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        if let Some(dictionary) = &self.dictionary_data {
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<i32>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

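/// A decoder for mini-block encoded data.
///
/// `drain` walks the scheduled `ChunkInstructions` front to back, popping
/// chunks once they are fully consumed, and emits a `DecodeMiniBlockTask`
/// that performs the actual decompression on a compute thread.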
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

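/// A scheduler for all-null data that still carries repetition and/or
/// definition levels (e.g. all-null structs or lists), which must be loaded
/// to reconstruct the nesting. The levels are read once in `initialize` and
/// cached for later schedulers.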
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

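/// A scheduler for the simplest all-null case: every value is null and there
/// is no repetition or definition structure beyond "the item is null", so no
/// I/O is required at all.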
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

#[derive(Debug)]
struct RepIndexBlock {
    first_row: u64,
    starts_including_trailer: u64,
    has_preamble: bool,
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

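/// A decoded repetition index for a mini-block page.
///
/// Each chunk's raw entry is `[ends_count, partial_count]`; a non-zero
/// `partial_count` means the chunk's last row spills into the next chunk (a
/// trailer), which then becomes that next chunk's preamble. A small worked
/// example of `RepetitionIndex::decode`:
///
/// ```text
/// raw entries: [[2, 3], [2, 0]]
///
/// chunk 0: first_row = 0, starts_including_trailer = 3,
///          has_preamble = false, has_trailer = true
/// chunk 1: first_row = 3, starts_including_trailer = 1,
///          has_preamble = true,  has_trailer = false
///
/// // rows 0 and 1 live entirely in chunk 0, row 2 starts in chunk 0 and
/// // finishes in chunk 1, and row 3 lives entirely in chunk 1
/// ```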
#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

#[derive(Debug)]
struct MiniBlockCacheableState {
    chunk_meta: Vec<ChunkMeta>,
    rep_index: RepetitionIndex,
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;

        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                        num_dictionary_items,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

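/// Instructions for reading a slice of rows from a single mini-block chunk.
///
/// `rows_to_skip` and `rows_to_take` are relative to the rows that *start* in
/// this chunk. The continuation of a row begun in an earlier chunk is handled
/// separately via `preamble`, and `take_trailer` marks that the chunk's final,
/// partial row should be taken (its remainder is then consumed as the next
/// chunk's preamble).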
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    preamble: PreambleAction,
    rows_to_skip: u64,
    rows_to_take: u64,
    take_trailer: bool,
}

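/// Instructions for draining a batch of rows from an already-scheduled
/// `ChunkInstructions`. `rows_to_skip` and `rows_to_take` here are relative
/// to the rows that the chunk instruction itself takes, since a single chunk
/// may be drained across several calls.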
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer - to_skip;
                debug_assert!(rows_avail > 0);

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };
                let mut rows_to_take_no_trailer = rows_to_take;

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                    rows_to_take_no_trailer -= 1;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take: rows_to_take_no_trailer,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

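    /// Drains up to `rows_desired` rows from this chunk instruction, returning
    /// the drain instructions plus a flag indicating whether the instruction
    /// is now fully consumed.
    ///
    /// A worked example for an instruction with `rows_to_take == 10`, no
    /// preamble, and no trailer:
    ///
    /// ```text
    /// drain(4) -> (skip 0, take 4), consumed = false   // 4 of 10 rows out
    /// drain(6) -> (skip 4, take 6), consumed = true    // chunk exhausted
    /// ```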
    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        if self.take_trailer {
            rows_avail += 1;
        }

        let rows_taking = if *rows_desired >= rows_avail {
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            assert!(meta_bytes.len() % 2 == 0);
            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();

            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
                repetition_index_vals
                    .as_ref()
                    .chunks_exact(self.repetition_index_depth as usize + 1)
                    .map(|c| c.to_vec())
                    .collect::<Vec<_>>()
            } else {
                chunk_meta
                    .iter()
                    .map(|c| vec![c.num_values, 0])
                    .collect::<Vec<_>>()
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index: RepetitionIndex::decode(&rep_index),
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| {
                    let taken = ci.rows_to_take;
                    if ci.take_trailer {
                        taken + 1
                    } else {
                        taken
                    }
                })
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();
        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        Ok(res)
    }
}

#[derive(Debug)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64,
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}

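/// A scheduler for full-zip encoded data.
///
/// In a full-zip page each value is stored contiguously with its control word
/// (the zipped repetition/definition information) in front of it:
///
/// ```text
/// [ctrl][value][ctrl][value]...
/// ```
///
/// When there is no repetition index, fixed-width rows are located by simple
/// arithmetic (`schedule_ranges_simple`). Otherwise the repetition index (one
/// byte offset per row plus a final end offset) maps row numbers to byte
/// ranges in the data buffer.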
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    cached_state: Option<Arc<FullZipCacheableState>>,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb::FullZipLayout,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let bits_per_offset = match layout.details {
            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => 32,
            Some(pb::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
                bits_per_offset as u8
            }
            None => panic!("Full-zip layout must have a `details` field"),
        };

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
            cached_state: None,
        })
    }

    #[allow(clippy::too_many_arguments)]
    async fn indirect_schedule_ranges(
        data_buffer_pos: u64,
        row_ranges: Vec<Range<u64>>,
        rep_index_ranges: Vec<Range<u64>>,
        bytes_per_rep: u64,
        io: Arc<dyn EncodingsIo>,
        priority: u64,
        bits_per_offset: u8,
        details: Arc<FullZipDecodeDetails>,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        let byte_ranges = io
            .submit_request(rep_index_ranges, priority)
            .await?
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect::<Vec<_>>();
        let byte_ranges = LanceBuffer::concat(&byte_ranges);
        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buffer_pos;
                let end = c.next().unwrap() + data_buffer_pos;
                start..end
            })
            .collect::<Vec<_>>();

        let data = io.submit_request(byte_ranges, priority);

        let data = data.await?;
        let data = data
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect();
        let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();

        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                if bits_per_value == 0 {
                    return Err(lance_core::Error::Internal {
                        message: "Invalid encoding: bits_per_value must be greater than 0".into(),
                        location: location!(),
                    });
                }
                if bits_per_value % 8 != 0 {
                    return Err(lance_core::Error::NotSupported {
                        source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
                        location: location!(),
                    });
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

    fn schedule_ranges_with_cached_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        cached_rep_data: &LanceBuffer,
        bytes_per_value: u64,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        use crate::utils::bytepack::ByteUnpacker;

        let byte_ranges: Vec<Range<u64>> = ranges
            .iter()
            .map(|r| {
                let start_offset = (r.start * bytes_per_value) as usize;
                let end_offset = (r.end * bytes_per_value) as usize;

                let start_slice =
                    &cached_rep_data[start_offset..start_offset + bytes_per_value as usize];
                let start_val =
                    ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                let end_slice = &cached_rep_data[end_offset..end_offset + bytes_per_value as usize];
                let end_val =
                    ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                        .next()
                        .unwrap();

                (self.data_buf_position + start_val)..(self.data_buf_position + end_val)
            })
            .collect();

        let data = io.submit_request(byte_ranges, self.priority);
        let row_ranges = ranges.to_vec();
        let details = self.details.clone();
        let bits_per_offset = self.bits_per_offset;

        Ok(async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();

            match &details.value_decompressor {
                PerValueDecompressor::Fixed(decompressor) => {
                    let bits_per_value = decompressor.bits_per_value();
                    if bits_per_value == 0 {
                        return Err(lance_core::Error::Internal {
                            message: "Invalid encoding: bits_per_value must be greater than 0".into(),
                            location: location!(),
                        });
                    }
                    if bits_per_value % 8 != 0 {
                        return Err(lance_core::Error::NotSupported {
                            source: "Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into(),
                            location: location!(),
                        });
                    }
                    let bytes_per_value = bits_per_value / 8;
                    let total_bytes_per_value =
                        bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                    Ok(Box::new(FixedFullZipDecoder {
                        details,
                        data,
                        num_rows,
                        offset_in_current: 0,
                        bytes_per_value: bytes_per_value as usize,
                        total_bytes_per_value,
                    }) as Box<dyn StructuralPageDecoder>)
                }
                PerValueDecompressor::Variable(_decompressor) => {
                    Ok(Box::new(VariableFullZipDecoder::new(
                        details,
                        data,
                        num_rows,
                        bits_per_offset,
                        bits_per_offset,
                    )) as Box<dyn StructuralPageDecoder>)
                }
            }
        }
        .boxed())
    }

    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        if let Some(cached_state) = &self.cached_state {
            return self.schedule_ranges_with_cached_rep(
                ranges,
                io,
                &cached_state.rep_index_buffer,
                rep_index.bytes_per_value,
            );
        }

        let rep_index_ranges = ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect::<Vec<_>>();

        Ok(Self::indirect_schedule_ranges(
            self.data_buf_position,
            ranges.to_vec(),
            rep_index_ranges,
            rep_index.bytes_per_value,
            io.clone(),
            self.priority,
            self.bits_per_offset,
            self.details.clone(),
        )
        .boxed())
    }

    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        Ok(async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed())
    }
}

#[derive(Debug)]
struct FullZipCacheableState {
    rep_index_buffer: LanceBuffer,
}

impl DeepSizeOf for FullZipCacheableState {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}

impl CachedPageData for FullZipCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        if let Some(rep_index) = &self.rep_index {
            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);

            let io_clone = io.clone();
            let future = async move {
                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
                let rep_index_buffer = LanceBuffer::from_bytes(rep_index_data[0].clone(), 1);

                Ok(Arc::new(FullZipCacheableState { rep_index_buffer }) as Arc<dyn CachedPageData>)
            };

            future.boxed()
        } else {
            std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
        }
    }

    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
        if let Ok(cached_state) = cache
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
        {
            self.cached_state = Some(cached_state);
        }
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        if let Some(rep_index) = self.rep_index.as_ref() {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io.as_ref())
        }
    }
}

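/// A decoder for full-zip encoded data with fixed-width values.
///
/// Without repetition, slicing off a task is plain byte arithmetic. With
/// repetition, `slice_next_task` must walk the control words to find row
/// boundaries, skipping invisible items, which are stored as a bare control
/// word with no value bytes.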
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    offset_in_current: usize,
    bytes_per_value: usize,
    total_bytes_per_value: usize,
    num_rows: u64,
}

impl FixedFullZipDecoder {
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            let mut rows_started = 0;
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }

            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }

            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;

            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}

impl StructuralPageDecoder for FixedFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            let task_item = self.slice_next_task(remaining);
            remaining -= task_item.rows_in_buf;
            task_data.push(task_item);
        }
        Ok(Box::new(FixedFullZipDecodeTask {
            details: self.details.clone(),
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_rows: num_rows as usize,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

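/// A decoder for full-zip encoded data with variable-width values.
///
/// Unlike the fixed-width decoder, this eagerly unzips the entire scheduled
/// byte range in `new`, recording where each row's levels, data, and offsets
/// begin (`repdef_starts`, `data_starts`, `offset_starts`), so that draining
/// reduces to slicing these pre-computed buffers.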
2219#[derive(Debug)]
struct VariableFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    offsets: LanceBuffer,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
    repdef_starts: Vec<usize>,
    data_starts: Vec<usize>,
    offset_starts: Vec<usize>,
    visible_item_counts: Vec<u64>,
    bits_per_offset: u8,
    current_idx: usize,
    num_rows: u64,
}

impl VariableFullZipDecoder {
    fn new(
        details: Arc<FullZipDecodeDetails>,
        data: VecDeque<LanceBuffer>,
        num_rows: u64,
        in_bits_per_length: u8,
        out_bits_per_offset: u8,
    ) -> Self {
        let decompressor = match details.value_decompressor {
            PerValueDecompressor::Variable(ref d) => d.clone(),
            _ => unreachable!(),
        };

        assert_eq!(in_bits_per_length % 8, 0);
        assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);

        let mut decoder = Self {
            details,
            decompressor,
            data: LanceBuffer::empty(),
            offsets: LanceBuffer::empty(),
            rep: LanceBuffer::empty().borrow_to_typed_slice(),
            def: LanceBuffer::empty().borrow_to_typed_slice(),
            bits_per_offset: out_bits_per_offset,
            repdef_starts: Vec::with_capacity(num_rows as usize + 1),
            data_starts: Vec::with_capacity(num_rows as usize + 1),
            offset_starts: Vec::with_capacity(num_rows as usize + 1),
            visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
            current_idx: 0,
            num_rows,
        };

        decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);

        decoder
    }

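    /// Reads one little-endian length of `bits_per_offset` bits from the front of `data`
    ///
    /// # Safety
    ///
    /// The caller must ensure `data` holds at least `bits_per_offset / 8` bytes.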
    unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
        match bits_per_offset {
            8 => *data.get_unchecked(0) as u64,
            16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
            32 => u32::from_le_bytes([
                *data.get_unchecked(0),
                *data.get_unchecked(1),
                *data.get_unchecked(2),
                *data.get_unchecked(3),
            ]) as u64,
            64 => u64::from_le_bytes([
                *data.get_unchecked(0),
                *data.get_unchecked(1),
                *data.get_unchecked(2),
                *data.get_unchecked(3),
                *data.get_unchecked(4),
                *data.get_unchecked(5),
                *data.get_unchecked(6),
                *data.get_unchecked(7),
            ]),
            _ => unreachable!(),
        }
    }

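    /// Unzips control words, value lengths, and value bytes into separate buffers
    ///
    /// As it walks the zipped stream it records, for each row, where that row
    /// begins in the rep/def buffer, the offsets buffer, and the data buffer.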
    fn unzip(
        &mut self,
        data: VecDeque<LanceBuffer>,
        in_bits_per_length: u8,
        out_bits_per_offset: u8,
        num_rows: u64,
    ) {
        let mut rep = Vec::with_capacity(num_rows as usize);
        let mut def = Vec::with_capacity(num_rows as usize);
        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;

        let bytes_per_offset = out_bits_per_offset as usize / 8;
        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
        let mut offsets_data = Vec::with_capacity(bytes_offsets);

        let bytes_per_length = in_bits_per_length as usize / 8;
        let bytes_lengths = bytes_per_length * num_rows as usize;

        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
        let mut unzipped_data =
            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));

        let mut current_offset = 0_u64;
        let mut visible_item_count = 0_u64;
        for databuf in data.into_iter() {
            let mut databuf = databuf.as_ref();
            while !databuf.is_empty() {
                let data_start = unzipped_data.len();
                let offset_start = offsets_data.len();
                let repdef_start = rep.len().max(def.len());
                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
                    databuf,
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                self.details
                    .ctrl_word_parser
                    .parse(databuf, &mut rep, &mut def);
                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];

                if ctrl_desc.is_new_row {
                    self.repdef_starts.push(repdef_start);
                    self.data_starts.push(data_start);
                    self.offset_starts.push(offset_start);
                    self.visible_item_counts.push(visible_item_count);
                }
                if ctrl_desc.is_visible {
                    visible_item_count += 1;
                    if ctrl_desc.is_valid_item {
                        debug_assert!(databuf.len() >= bytes_per_length);
                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        };
                        databuf = &databuf[bytes_per_length..];
                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
                        databuf = &databuf[length as usize..];
                        current_offset += length;
                    } else {
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        }
                    }
                }
            }
        }
        self.repdef_starts.push(rep.len().max(def.len()));
        self.data_starts.push(unzipped_data.len());
        self.offset_starts.push(offsets_data.len());
        self.visible_item_counts.push(visible_item_count);
        match out_bits_per_offset {
            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
            _ => unreachable!(),
        };
        self.rep = ScalarBuffer::from(rep);
        self.def = ScalarBuffer::from(def);
        self.data = LanceBuffer::Owned(unzipped_data);
        self.offsets = LanceBuffer::Owned(offsets_data);
    }
}

impl StructuralPageDecoder for VariableFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let start = self.current_idx;
        let end = start + num_rows as usize;

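        // Note: we hand out the entire data buffer (not a slice) because the
        // offsets computed during unzip are absolute positions within it.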
        let data = self.data.borrow_and_clone();

        let offset_start = self.offset_starts[start];
        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
        let offsets = self
            .offsets
            .slice_with_length(offset_start, offset_end - offset_start);

        let repdef_start = self.repdef_starts[start];
        let repdef_end = self.repdef_starts[end];
        let rep = if self.rep.is_empty() {
            self.rep.clone()
        } else {
            self.rep.slice(repdef_start, repdef_end - repdef_start)
        };
        let def = if self.def.is_empty() {
            self.def.clone()
        } else {
            self.def.slice(repdef_start, repdef_end - repdef_start)
        };

        let visible_item_counts_start = self.visible_item_counts[start];
        let visible_item_counts_end = self.visible_item_counts[end];
        let num_visible_items = visible_item_counts_end - visible_item_counts_start;

        self.current_idx += num_rows as usize;

        Ok(Box::new(VariableFullZipDecodeTask {
            details: self.details.clone(),
            decompressor: self.decompressor.clone(),
            data,
            offsets,
            bits_per_offset: self.bits_per_offset,
            num_visible_items,
            rep,
            def,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

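/// A task to decode a slice of a variable-width full-zip page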
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    offsets: LanceBuffer,
    bits_per_offset: u8,
    num_visible_items: u64,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
}

impl DecodePageTask for VariableFullZipDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let block = VariableWidthBlock {
            data: self.data,
            offsets: self.offsets,
            bits_per_offset: self.bits_per_offset,
            num_values: self.num_visible_items,
            block_info: BlockInfo::new(),
        };
        let decompressed = self.decompressor.decompress(block)?;
        let rep = self.rep.to_vec();
        let def = self.def.to_vec();
        let unraveler =
            RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
        Ok(DecodedPage {
            data: decompressed,
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
struct FullZipDecodeTaskItem {
    data: PerValueDataBlock,
    rows_in_buf: u64,
}

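/// A task to decode a slice of a fixed-width full-zip page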
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    data: Vec<FullZipDecodeTaskItem>,
    num_rows: usize,
    bytes_per_value: usize,
}

impl DecodePageTask for FixedFullZipDecodeTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
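        // Multiply by two as a rough estimate of the decompressed size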
        let estimated_size_bytes = self
            .data
            .iter()
            .map(|task_item| task_item.data.data_size() as usize)
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
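            // Fast path: no control words were zipped in, the buffers contain
            // nothing but values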
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
            }

            let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());

            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
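            // Slow path: control words are interleaved with the values and must
            // be parsed one by one to separate the rep/def levels from the
            // visible values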
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);

            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let mut buf_slice = fixed_data.data.as_ref();
                let num_values = fixed_data.num_values as usize;
                let mut values = Vec::with_capacity(
                    fixed_data.data.len()
                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
                );
                let mut visible_items = 0;
                for _ in 0..num_values {
                    self.details
                        .ctrl_word_parser
                        .parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];

                    let is_visible = def
                        .last()
                        .map(|d| *d <= self.details.max_visible_def)
                        .unwrap_or(true);
                    if is_visible {
                        values.extend_from_slice(&buf_slice[..self.bytes_per_value]);
                        buf_slice = &buf_slice[self.bytes_per_value..];
                        visible_items += 1;
                    }
                }

                let values_buf = LanceBuffer::Owned(values);
                let fixed_data = FixedWidthDataBlock {
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    block_info: BlockInfo::new(),
                    data: values_buf,
                    num_values: visible_items,
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
                data_builder.append(&decompressed, 0..visible_items);
            }

            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };

            let unraveler =
                RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
            let data = data_builder.finish();

            Ok(DecodedPage {
                data,
                repdef: unraveler,
            })
        }
    }
}

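/// A scheduling job for a single primitive (leaf) column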
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    global_row_offset: u64,
}

impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            global_row_offset: 0,
        }
    }
}

impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(None);
        }
        let mut range = self.ranges[self.range_idx].clone();
        let priority = range.start;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
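        // Skip any pages that end before the current range begins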
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

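        // Gather as many of the remaining ranges as fit within the current page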
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let page_decoder = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;

        let cur_path = context.current_path();
        let page_index = cur_page.page_index;
        let unloaded_page = async move {
            let page_decoder = page_decoder.await?;
            Ok(LoadedPage {
                decoder: page_decoder,
                path: cur_path,
                page_index,
            })
        }
        .boxed();

        Ok(Some(ScheduledScanLine {
            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
            rows_scheduled: num_rows_in_next,
        }))
    }
}

#[derive(Debug)]
struct PageInfoAndScheduler {
    page_index: usize,
    num_rows: u64,
    scheduler: Box<dyn StructuralPageScheduler>,
}

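/// A scheduler for a leaf (primitive) field
///
/// Each page was written with one of the structural layouts (mini-block,
/// full-zip, or all-null) and scheduling is delegated to the matching page
/// scheduler.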
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    page_schedulers: Vec<PageInfoAndScheduler>,
    column_index: u32,
}

impl StructuralPrimitiveFieldScheduler {
    pub fn try_new(
        column_info: &ColumnInfo,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<Self> {
        let page_schedulers = column_info
            .page_infos
            .iter()
            .enumerate()
            .map(|(page_index, page_info)| {
                Self::page_info_to_scheduler(
                    page_info,
                    page_index,
                    column_info.index as usize,
                    decompressors,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            page_schedulers,
            column_index: column_info.index,
        })
    }

    fn page_info_to_scheduler(
        page_info: &PageInfo,
        page_index: usize,
        _column_index: usize,
        decompressors: &dyn DecompressionStrategy,
    ) -> Result<PageInfoAndScheduler> {
        let scheduler: Box<dyn StructuralPageScheduler> =
            match page_info.encoding.as_structural().layout.as_ref() {
                Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
                    Box::new(MiniBlockScheduler::try_new(
                        &page_info.buffer_offsets_and_sizes,
                        page_info.priority,
                        mini_block.num_items,
                        mini_block,
                        decompressors,
                    )?)
                }
                Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
                    Box::new(FullZipScheduler::try_new(
                        &page_info.buffer_offsets_and_sizes,
                        page_info.priority,
                        page_info.num_rows,
                        full_zip,
                        decompressors,
                    )?)
                }
                Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
                    let def_meaning = all_null
                        .layers
                        .iter()
                        .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
                        .collect::<Vec<_>>();
                    if def_meaning.len() == 1
                        && def_meaning[0] == DefinitionInterpretation::NullableItem
                    {
                        Box::new(SimpleAllNullScheduler::default())
                            as Box<dyn StructuralPageScheduler>
                    } else {
                        Box::new(ComplexAllNullScheduler::new(
                            page_info.buffer_offsets_and_sizes.clone(),
                            def_meaning.into(),
                        )) as Box<dyn StructuralPageScheduler>
                    }
                }
                _ => todo!(),
            };
        Ok(PageInfoAndScheduler {
            page_index,
            num_rows: page_info.num_rows,
            scheduler,
        })
    }
}

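/// Per-page state loaded during scheduler initialization that can be cached
/// and shared across readers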
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}

pub struct NoCachedPageData;

impl DeepSizeOf for NoCachedPageData {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        0
    }
}

impl CachedPageData for NoCachedPageData {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

pub struct CachedFieldData {
    pages: Vec<Arc<dyn CachedPageData>>,
}

impl DeepSizeOf for CachedFieldData {
    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
        self.pages.deep_size_of_children(ctx)
    }
}

impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
    fn initialize<'a>(
        &'a mut self,
        _filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let cache_key = self.column_index.to_string();
        if let Some(cached_data) = context.cache().get::<CachedFieldData>(&cache_key) {
            self.page_schedulers
                .iter_mut()
                .zip(cached_data.pages.iter())
                .for_each(|(page_scheduler, cached_data)| {
                    page_scheduler.scheduler.load(cached_data);
                });
            return std::future::ready(Ok(())).boxed();
        };

        let cache = context.cache().clone();
        let page_data = self
            .page_schedulers
            .iter_mut()
            .map(|s| s.scheduler.initialize(context.io()))
            .collect::<FuturesOrdered<_>>();

        async move {
            let page_data = page_data.try_collect::<Vec<_>>().await?;
            let cached_data = Arc::new(CachedFieldData { pages: page_data });
            cache.insert::<CachedFieldData>(&cache_key, cached_data);
            Ok(())
        }
        .boxed()
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let ranges = ranges.to_vec();
        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
            self, ranges,
        )))
    }
}

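/// The top-level decode task for a leaf field: decodes the per-page tasks and
/// concatenates the results into a single array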
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    tasks: Vec<Box<dyn DecodePageTask>>,
    should_validate: bool,
    data_type: DataType,
}

impl StructuralCompositeDecodeArrayTask {
    fn restore_validity(
        array: Arc<dyn Array>,
        unraveler: &mut CompositeRepDefUnraveler,
    ) -> Arc<dyn Array> {
        let validity = unraveler.unravel_validity(array.len());
        let Some(validity) = validity else {
            return array;
        };
        if array.data_type() == &DataType::Null {
            return array;
        }
        assert_eq!(validity.len(), array.len());
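        // SAFETY: we asserted above that the validity buffer's length matches
        // the array, so the rebuilt ArrayData remains valid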
        make_array(unsafe {
            array
                .to_data()
                .into_builder()
                .nulls(Some(validity))
                .build_unchecked()
        })
    }
}

impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let mut arrays = Vec::with_capacity(self.tasks.len());
        let mut unravelers = Vec::with_capacity(self.tasks.len());
        for task in self.tasks {
            let decoded = task.decode()?;
            unravelers.push(decoded.repdef);

            let array = make_array(
                decoded
                    .data
                    .into_arrow(self.data_type.clone(), self.should_validate)?,
            );

            arrays.push(array);
        }
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        let array = arrow_select::concat::concat(&array_refs)?;
        let mut repdef = CompositeRepDefUnraveler::new(unravelers);

        let array = Self::restore_validity(array, &mut repdef);

        Ok(DecodedArray { array, repdef })
    }
}

#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    field: Arc<ArrowField>,
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    should_validate: bool,
    rows_drained_in_current: u64,
}

impl StructuralPrimitiveFieldDecoder {
    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
        Self {
            field: field.clone(),
            page_decoders: VecDeque::new(),
            should_validate,
            rows_drained_in_current: 0,
        }
    }
}

impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
        assert!(child.path.is_empty());
        self.page_decoders.push_back(child.decoder);
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let mut remaining = num_rows;
        let mut tasks = Vec::new();
        while remaining > 0 {
            let cur_page = self.page_decoders.front_mut().unwrap();
            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
            let to_take = num_in_page.min(remaining);

            let task = cur_page.drain(to_take)?;
            tasks.push(task);

            if to_take == num_in_page {
                self.page_decoders.pop_front();
                self.rows_drained_in_current = 0;
            } else {
                self.rows_drained_in_current += to_take;
            }

            remaining -= to_take;
        }
        Ok(Box::new(StructuralCompositeDecodeArrayTask {
            tasks,
            should_validate: self.should_validate,
            data_type: self.field.data_type().clone(),
        }))
    }

    fn data_type(&self) -> &DataType {
        self.field.data_type()
    }
}

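/// A serialized full-zip page: the zipped values buffer plus, when repetition
/// is present, a repetition index mapping each row to its byte offset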
struct SerializedFullZip {
    values: LanceBuffer,
    repetition_index: Option<LanceBuffer>,
}

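/// Every chunk in a mini-block page is padded to this alignment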
const MINIBLOCK_ALIGNMENT: usize = 8;
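
/// An encoder for primitive (leaf) arrays
///
/// Arrays are buffered in an accumulation queue until there is enough data for
/// a page.  On flush the encoder picks a structural layout: all-null when every
/// value is null, mini-block (optionally with dictionary encoding) when values
/// are narrow, and full-zip otherwise.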
pub struct PrimitiveStructuralEncoder {
    accumulation_queue: AccumulationQueue,
    keep_original_array: bool,
    accumulated_repdefs: Vec<RepDefBuilder>,
    compression_strategy: Arc<dyn CompressionStrategy>,
    column_index: u32,
    field: Field,
    encoding_metadata: Arc<HashMap<String, String>>,
}

struct CompressedLevelsChunk {
    data: LanceBuffer,
    num_levels: u16,
}

struct CompressedLevels {
    data: Vec<CompressedLevelsChunk>,
    compression: pb::ArrayEncoding,
    rep_index: Option<LanceBuffer>,
}

struct SerializedMiniBlockPage {
    num_buffers: u64,
    data: LanceBuffer,
    metadata: LanceBuffer,
}

impl PrimitiveStructuralEncoder {
    pub fn try_new(
        options: &EncodingOptions,
        compression_strategy: Arc<dyn CompressionStrategy>,
        column_index: u32,
        field: Field,
        encoding_metadata: Arc<HashMap<String, String>>,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            keep_original_array: options.keep_original_array,
            accumulated_repdefs: Vec::new(),
            column_index,
            compression_strategy,
            field,
            encoding_metadata,
        })
    }

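    /// A block is "narrow" (and thus a good fit for the mini-block layout)
    /// when each of its values is at most a few hundred bytes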
    fn is_narrow(data_block: &DataBlock) -> bool {
        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;

        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
            let max_len_array = max_len_array
                .as_any()
                .downcast_ref::<PrimitiveArray<UInt64Type>>()
                .unwrap();
            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
                return true;
            }
        }
        false
    }

    fn prefers_miniblock(
        data_block: &DataBlock,
        encoding_metadata: &HashMap<String, String>,
    ) -> bool {
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
        }
        Self::is_narrow(data_block)
    }

    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
        }
        true
    }

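    /// Serializes mini-block chunks into a single data buffer plus a metadata buffer
    ///
    /// Each chunk begins with a small header (number of levels and the sizes of
    /// the rep/def/value buffers), followed by the compressed rep levels, the
    /// compressed def levels, and the value buffers; every section is padded to
    /// MINIBLOCK_ALIGNMENT.  For each chunk a 16-bit metadata word is emitted:
    /// the upper 12 bits store (chunk size in 8-byte words) - 1 and the lower 4
    /// bits store log2 of the number of values.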
    fn serialize_miniblocks(
        miniblocks: MiniBlockCompressed,
        rep: Option<Vec<CompressedLevelsChunk>>,
        def: Option<Vec<CompressedLevelsChunk>>,
    ) -> SerializedMiniBlockPage {
        let bytes_rep = rep
            .as_ref()
            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_def = def
            .as_ref()
            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
        let mut num_buffers = miniblocks.data.len();
        if rep.is_some() {
            num_buffers += 1;
        }
        if def.is_some() {
            num_buffers += 1;
        }
        let max_extra = 9 * num_buffers;
        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);

        let mut rep_iter = rep.map(|r| r.into_iter());
        let mut def_iter = def.map(|d| d.into_iter());

        let mut buffer_offsets = vec![0; miniblocks.data.len()];
        for chunk in miniblocks.chunks {
            let start_pos = data_buffer.len();
            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);

            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
            let def = def_iter.as_mut().map(|d| d.next().unwrap());

            let num_levels = rep
                .as_ref()
                .map(|r| r.num_levels)
                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
            data_buffer.extend_from_slice(&num_levels.to_le_bytes());

            if let Some(rep) = rep.as_ref() {
                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
            }
            if let Some(def) = def.as_ref() {
                let bytes_def = u16::try_from(def.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
            }

            for buffer_size in &chunk.buffer_sizes {
                let bytes = *buffer_size;
                data_buffer.extend_from_slice(&bytes.to_le_bytes());
            }

            let add_padding = |data_buffer: &mut Vec<u8>| {
                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
            };
            add_padding(&mut data_buffer);

            if let Some(rep) = rep.as_ref() {
                data_buffer.extend_from_slice(&rep.data);
                add_padding(&mut data_buffer);
            }
            if let Some(def) = def.as_ref() {
                data_buffer.extend_from_slice(&def.data);
                add_padding(&mut data_buffer);
            }
            for (buffer_size, (buffer, buffer_offset)) in chunk
                .buffer_sizes
                .iter()
                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
            {
                let start = *buffer_offset;
                let end = start + *buffer_size as usize;
                *buffer_offset += *buffer_size as usize;
                data_buffer.extend_from_slice(&buffer[start..end]);
                add_padding(&mut data_buffer);
            }

            let chunk_bytes = data_buffer.len() - start_pos;
            assert!(chunk_bytes <= 16 * 1024);
            assert!(chunk_bytes > 0);
            assert_eq!(chunk_bytes % 8, 0);
            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
            let divided_bytes_minus_one = (divided_bytes - 1) as u64;

            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
        }

        let data_buffer = LanceBuffer::Owned(data_buffer);
        let metadata_buffer = LanceBuffer::Owned(meta_buffer);

        SerializedMiniBlockPage {
            num_buffers: miniblocks.data.len() as u64,
            data: data_buffer,
            metadata: metadata_buffer,
        }
    }

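    /// Compresses repetition or definition levels into per-chunk buffers
    ///
    /// When `max_rep > 0` a repetition index is also built, recording for each
    /// chunk the number of rows and the number of trailing "leftover" levels
    /// that belong to a row continuing into the next chunk.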
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let mut levels_buf = levels.all_levels().try_clone().unwrap();
        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: levels_buf.borrow_and_clone(),
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        });
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            values_counter += chunk_num_values;
            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();

                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };

                if chunk_idx != 0 && rep_values[0] == max_rep {
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }

                if chunk_idx == chunks.len() - 1 {
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            });
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels: num_chunk_levels as u16,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }

    fn encode_simple_all_null(
        column_idx: u32,
        num_rows: u64,
        row_number: u64,
    ) -> Result<EncodedPage> {
        let description = ProtobufUtils::simple_all_null_layout();
        Ok(EncodedPage {
            column_idx,
            data: vec![],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

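    /// Encodes a page where every value is null but the rep/def levels are
    /// non-trivial and must still be serialized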
    fn encode_complex_all_null(
        column_idx: u32,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(rep.clone())
        } else {
            LanceBuffer::empty()
        };

        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(def.clone())
        } else {
            LanceBuffer::empty()
        };

        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
        Ok(EncodedPage {
            column_idx,
            data: vec![rep_bytes, def_bytes],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

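    /// Encodes a page using the mini-block layout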
    #[allow(clippy::too_many_arguments)]
    fn encode_miniblock(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        dictionary_data: Option<DataBlock>,
        num_rows: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        if let DataBlock::AllNull(_null_block) = data {
            todo!()
        }

        let num_items = data.num_values();

        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let mut compressed_rep = repdef
            .rep_slicer()
            .map(|rep_slicer| {
                Self::compress_levels(
                    rep_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    max_rep,
                )
            })
            .transpose()?;

        let (rep_index, rep_index_depth) =
            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
                Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
                None => (None, 0),
            };

        let mut compressed_def = repdef
            .def_slicer()
            .map(|def_slicer| {
                Self::compress_levels(
                    def_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    0,
                )
            })
            .transpose()?;

        let rep_data = compressed_rep
            .as_mut()
            .map(|cr| std::mem::take(&mut cr.data));
        let def_data = compressed_def
            .as_mut()
            .map(|cd| std::mem::take(&mut cd.data));

        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);

        let mut data = Vec::with_capacity(4);
        data.push(serialized.metadata);
        data.push(serialized.data);

        if let Some(dictionary_data) = dictionary_data {
            let num_dictionary_items = dictionary_data.num_values();
            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;

            let (compressor, dictionary_encoding) = compression_strategy
                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
            let dictionary_buffer = compressor.compress(dictionary_data)?;

            data.push(dictionary_buffer);
            if let Some(rep_index) = rep_index {
                data.push(rep_index);
            }

            let description = ProtobufUtils::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                Some((dictionary_encoding, num_dictionary_items)),
                &repdef.def_meaning,
                num_items,
            );
            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        } else {
            let description = ProtobufUtils::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                None,
                &repdef.def_meaning,
                num_items,
            );

            if let Some(mut rep_index) = rep_index {
                let view = rep_index.borrow_to_typed_slice::<u64>();
                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
                debug_assert_eq!(total, num_rows);

                data.push(rep_index);
            }

            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        }
    }

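    /// Zips fixed-width values together with their control words
    ///
    /// When repetition levels are present a byte-packed repetition index (one
    /// offset per row, plus a final end offset) is also produced.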
    fn serialize_full_zip_fixed(
        fixed: FixedWidthDataBlock,
        mut repdef: ControlWordIterator,
        num_values: u64,
    ) -> SerializedFullZip {
        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
        let mut zipped_data = Vec::with_capacity(len);

        let max_rep_index_val = if repdef.has_repetition() {
            len as u64
        } else {
            0
        };
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);

        assert_eq!(
            fixed.bits_per_value % 8,
            0,
            "Non-byte aligned full-zip compression not yet supported"
        );

        let bytes_per_value = fixed.bits_per_value as usize / 8;

        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
        let mut offset = 0;
        while let Some(control) = repdef.append_next(&mut zipped_data) {
            if control.is_new_row {
                debug_assert!(offset <= len);
                unsafe { rep_index_builder.append(offset as u64) };
            }
            if control.is_visible {
                let value = data_iter.next().unwrap();
                zipped_data.extend_from_slice(value);
            }
            offset = zipped_data.len();
        }

        debug_assert_eq!(zipped_data.len(), len);
        unsafe {
            rep_index_builder.append(zipped_data.len() as u64);
        }

        let zipped_data = LanceBuffer::Owned(zipped_data);
        let rep_index = rep_index_builder.into_data();
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::Owned(rep_index))
        };
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

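    /// Zips variable-width values together with their control words
    ///
    /// Each visible, valid value is written as its length followed by its
    /// bytes; invisible and null values contribute only their control word.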
    fn serialize_full_zip_variable(
        mut variable: VariableWidthBlock,
        mut repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        let bytes_per_offset = variable.bits_per_offset as usize / 8;
        assert_eq!(
            variable.bits_per_offset % 8,
            0,
            "Only byte-aligned offsets supported"
        );
        let len = variable.data.len()
            + repdef.bytes_per_word() * num_items as usize
            + bytes_per_offset * variable.num_values as usize;
        let mut buf = Vec::with_capacity(len);

        let max_rep_index_val = len as u64;
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

        match bytes_per_offset {
            4 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            8 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            _ => panic!("Unsupported offset size"),
        }

        debug_assert!(buf.len() <= len);
        unsafe {
            rep_index_builder.append(buf.len() as u64);
        }

        let zipped_data = LanceBuffer::Owned(buf);
        let rep_index = rep_index_builder.into_data();
        debug_assert!(!rep_index.is_empty());
        let rep_index = Some(LanceBuffer::Owned(rep_index));
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

    fn serialize_full_zip(
        compressed_data: PerValueDataBlock,
        repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        match compressed_data {
            PerValueDataBlock::Fixed(fixed) => {
                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
            }
            PerValueDataBlock::Variable(var) => {
                Self::serialize_full_zip_variable(var, repdef, num_items)
            }
        }
    }

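    /// Encodes a page using the full-zip layout, where control words are
    /// zipped in next to the values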
    fn encode_full_zip(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_lists: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);
        let max_rep = repdef
            .repetition_levels
            .as_ref()
            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
        let max_def = repdef
            .definition_levels
            .as_ref()
            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));

        let (num_items, num_visible_items) =
            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
                (rep_levels.len() as u64, data.num_values())
            } else {
                (data.num_values(), data.num_values())
            };

        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);

        let repdef_iter = build_control_word_iterator(
            repdef.repetition_levels.as_deref(),
            max_rep,
            repdef.definition_levels.as_deref(),
            max_def,
            max_visible_def,
            num_items as usize,
        );
        let bits_rep = repdef_iter.bits_rep();
        let bits_def = repdef_iter.bits_def();

        let compressor = compression_strategy.create_per_value(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let description = match &compressed_data {
            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
                bits_rep,
                bits_def,
                fixed.bits_per_value as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
                bits_rep,
                bits_def,
                variable.bits_per_offset as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
        };

        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);

        let data = if let Some(repindex) = zipped.repetition_index {
            vec![zipped.values, repindex]
        } else {
            vec![zipped.values]
        };

        Ok(EncodedPage {
            num_rows: num_lists,
            column_idx,
            data,
            description: PageEncoding::Structural(description),
            row_number,
        })
    }

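    /// Splits a block into (indices, dictionary) blocks
    ///
    /// Only 128-bit fixed-width blocks and variable-width blocks with 32-bit
    /// offsets are currently handled here.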
    fn dictionary_encode(mut data_block: DataBlock) -> (DataBlock, DataBlock) {
        let cardinality = data_block
            .get_stat(Stat::Cardinality)
            .unwrap()
            .as_primitive::<UInt64Type>()
            .value(0);
        match data_block {
            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
                let mut map = HashMap::new();
                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
                let u128_slice = u128_slice.as_ref();
                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
                let mut indices_buffer =
                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
                let mut curr_idx: i32 = 0;
                u128_slice.iter().for_each(|&value| {
                    let idx = *map.entry(value).or_insert_with(|| {
                        dictionary_buffer.push(value);
                        curr_idx += 1;
                        curr_idx - 1
                    });
                    indices_buffer.push(idx);
                });
                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                    bits_per_value: 128,
                    num_values: curr_idx as u64,
                    block_info: BlockInfo::default(),
                });
                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                    data: LanceBuffer::reinterpret_vec(indices_buffer),
                    bits_per_value: 32,
                    num_values: fixed_width_data_block.num_values,
                    block_info: BlockInfo::default(),
                });
                indices_data_block.compute_stat();

                (indices_data_block, dictionary_data_block)
            }
            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
                match variable_width_data_block.bits_per_offset {
                    32 => {
                        let mut map = HashMap::new();
                        let offsets = variable_width_data_block
                            .offsets
                            .borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();

                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
                        );
                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);

                        let mut dictionary_buffer: Vec<u8> =
                            Vec::with_capacity((max_len * cardinality) as usize);
                        let mut dictionary_offsets_buffer = vec![0];
                        let mut curr_idx = 0;
                        let mut indices_buffer =
                            Vec::with_capacity(variable_width_data_block.num_values as usize);

                        offsets
                            .iter()
                            .zip(offsets.iter().skip(1))
                            .for_each(|(&start, &end)| {
                                let key =
                                    &variable_width_data_block.data[start as usize..end as usize];
                                let idx: i32 = *map.entry(U8SliceKey(key)).or_insert_with(|| {
                                    dictionary_buffer.extend_from_slice(key);
                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
                                    curr_idx += 1;
                                    curr_idx - 1
                                });
                                indices_buffer.push(idx);
                            });

                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
                            bits_per_offset: 32,
                            num_values: curr_idx as u64,
                            block_info: BlockInfo::default(),
                        });

                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                            data: LanceBuffer::reinterpret_vec(indices_buffer),
                            bits_per_value: 32,
                            num_values: variable_width_data_block.num_values,
                            block_info: BlockInfo::default(),
                        });
                        indices_data_block.compute_stat();

                        (indices_data_block, dictionary_data_block)
                    }
                    64 => {
                        todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
                    }
                    _ => {
                        unreachable!()
                    }
                }
            }
            _ => {
                unreachable!("dictionary encode called with data block {:?}", data_block)
            }
        }
    }

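    /// Decides whether a block should be dictionary encoded, based on its
    /// cardinality relative to its size.  The thresholds can be tuned via the
    /// LANCE_ENCODING_DICT_TOO_SMALL, LANCE_ENCODING_DICT_DIVISOR, and
    /// LANCE_ENCODING_DICT_MAX_CARDINALITY environment variables.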
    fn should_dictionary_encode(data_block: &DataBlock) -> bool {
        let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
            .ok()
            .and_then(|val| val.parse().ok())
            .unwrap_or(100);
        if data_block.num_values() < too_small {
            return false;
        }

        let divisor = env::var("LANCE_ENCODING_DICT_DIVISOR")
            .ok()
            .and_then(|val| val.parse().ok())
            .unwrap_or(2);

        let max_cardinality = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
            .ok()
            .and_then(|val| val.parse().ok())
            .unwrap_or(100000);

        let threshold = (data_block.num_values() / divisor).min(max_cardinality);

        let cardinality = if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
            cardinality_array.as_primitive::<UInt64Type>().value(0)
        } else {
            u64::MAX
        };

        cardinality < threshold
    }

    fn do_flush(
        &mut self,
        arrays: Vec<ArrayRef>,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let column_idx = self.column_index;
        let compression_strategy = self.compression_strategy.clone();
        let field = self.field.clone();
        let encoding_metadata = self.encoding_metadata.clone();
        let task = spawn_cpu(move || {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();

            if num_values == 0 {
                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
            }
            let num_nulls = arrays
                .iter()
                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
                .sum::<u64>();

            if num_values == num_nulls {
                return if repdefs.iter().all(|rd| rd.is_simple_validity()) {
                    log::debug!(
                        "Encoding column {} with {} items using simple-null layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_simple_all_null(column_idx, num_values, row_number)
                } else {
                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
                };
            }

            let data_block = DataBlock::from_arrays(&arrays, num_values);

            if let DataBlock::Struct(ref struct_data_block) = data_block {
                if struct_data_block
                    .children
                    .iter()
                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                {
                    panic!("packed struct encoding currently only supports fixed-width fields.")
                }
            }

            let data_block = data_block.remove_outer_validity();

            if Self::should_dictionary_encode(&data_block) {
                log::debug!(
                    "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
                    column_idx,
                    num_values
                );
                let (indices_data_block, dictionary_data_block) =
                    Self::dictionary_encode(data_block);
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdefs,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                )
            } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using mini-block layout",
                    column_idx,
                    num_values
                );
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    None,
                    num_rows,
                )
            } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using full-zip layout",
                    column_idx,
                    num_values
                );
                Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    num_rows,
                )
            } else {
                Err(Error::InvalidInput {
                    source: format!(
                        "Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}",
                        field.name, STRUCTURAL_ENCODING_META_KEY
                    )
                    .into(),
                    location: location!(),
                })
            }
        })
        .boxed();
        Ok(vec![task])
    }

    fn extract_validity_buf(
        array: &dyn Array,
        repdef: &mut RepDefBuilder,
        keep_original_array: bool,
    ) {
        if let Some(validity) = array.nulls() {
            if keep_original_array {
                repdef.add_validity_bitmap(validity.clone());
            } else {
                repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap());
            }
        } else {
            repdef.add_no_null(array.len());
        }
    }

    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder, keep_original_array: bool) {
        match array.data_type() {
            DataType::Null => {
                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
            }
            DataType::Dictionary(_, _) => {
                unreachable!()
            }
            _ => Self::extract_validity_buf(array, repdef, keep_original_array),
        }
    }
}

impl FieldEncoder for PrimitiveStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        Self::extract_validity(array.as_ref(), &mut repdef, self.keep_original_array);
        self.accumulated_repdefs.push(repdef);

        if let Some((arrays, row_number, num_rows)) =
            self.accumulation_queue.insert(array, row_number, num_rows)
        {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            Ok(self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)?)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::{collections::VecDeque, sync::Arc};

    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };
    use arrow_array::{ArrayRef, Int8Array, StringArray};

    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
        FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipRepIndexDetails,
        FullZipScheduler, PerValueDecompressor, PreambleAction, RepetitionIndex,
        StructuralPageScheduler,
    };

    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
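        // Four rows of items: [3 items], [2 items], [a null list], [3 items]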
4276 let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
4279 let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
4280 let max_visible_def = 0;
4281 let total_items = 8;
4282 let max_rep = 1;
4283
4284 let check = |range, expected_item_range, expected_level_range| {
4285 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4286 range,
4287 rep.as_ref(),
4288 def.as_ref(),
4289 max_rep,
4290 max_visible_def,
4291 total_items,
4292 PreambleAction::Absent,
4293 );
4294 assert_eq!(item_range, expected_item_range);
4295 assert_eq!(level_range, expected_level_range);
4296 };
4297
4298 check(0..1, 0..3, 0..3);
4299 check(1..2, 3..5, 3..5);
4300 check(2..3, 5..5, 5..6);
4301 check(3..4, 5..8, 6..9);
4302 check(0..2, 0..5, 0..5);
4303 check(1..3, 3..5, 3..6);
4304 check(2..4, 5..8, 5..9);
4305 check(0..3, 0..5, 0..6);
4306 check(1..4, 3..8, 3..9);
4307 check(0..4, 0..8, 0..9);
4308
4309 let rep = Some(vec![1, 1, 0, 1]);
4312 let def = Some(vec![1, 0, 0, 0]);
4313 let max_visible_def = 0;
4314 let total_items = 3;
4315
4316 let check = |range, expected_item_range, expected_level_range| {
4317 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4318 range,
4319 rep.as_ref(),
4320 def.as_ref(),
4321 max_rep,
4322 max_visible_def,
4323 total_items,
4324 PreambleAction::Absent,
4325 );
4326 assert_eq!(item_range, expected_item_range);
4327 assert_eq!(level_range, expected_level_range);
4328 };
4329
4330 check(0..1, 0..0, 0..1);
4331 check(1..2, 0..2, 1..3);
4332 check(2..3, 2..3, 3..4);
4333 check(0..2, 0..2, 0..3);
4334 check(1..3, 0..3, 1..4);
4335 check(0..3, 0..3, 0..4);
4336
4337 let rep = Some(vec![1, 1, 0, 1]);
4340 let def = Some(vec![0, 0, 0, 1]);
4341 let max_visible_def = 0;
4342 let total_items = 3;
4343
4344 let check = |range, expected_item_range, expected_level_range| {
4345 let (item_range, level_range) = DecodeMiniBlockTask::map_range(
4346 range,
4347 rep.as_ref(),
4348 def.as_ref(),
4349 max_rep,
4350 max_visible_def,
4351 total_items,
4352 PreambleAction::Absent,
4353 );
4354 assert_eq!(item_range, expected_item_range);
4355 assert_eq!(level_range, expected_level_range);
4356 };
4357
4358 check(0..1, 0..1, 0..1);
4359 check(1..2, 1..3, 1..3);
4360 check(2..3, 3..3, 3..4);
4361 check(0..2, 0..3, 0..3);
4362 check(1..3, 1..3, 1..4);
4363 check(0..3, 0..3, 0..4);
4364
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

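        // Scenario: definition levels only; without repetition each level is
        // its own row, and def=1 is still visible since max_visible_def is 1.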
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

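        // Scenario: the chunk opens mid-row (rep=0), i.e. with a preamble that
        // continues a row from the previous chunk, and ends with a null list.
        // Checked under both PreambleAction::Take and PreambleAction::Skip.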
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

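        // Scenario: a one-item preamble, then a null list (rep=1, def=1), then
        // a two-item row.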
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

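        // Scenario: a preamble with repetition levels only; with no definition
        // levels every level is a visible item.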
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);
    }

    #[test]
    fn test_schedule_instructions() {
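        // Judging by the expected instructions below, each raw entry appears to
        // be [rows ending in the chunk, items in a partial trailing row]; a
        // non-zero second value means the chunk has a trailer that finishes in
        // the next chunk's preamble.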
        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 4,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

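        // Taking the full row range visits every chunk exactly once.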
        check(&[0..14], expected_take_all.clone());

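        // Fourteen adjacent single-row ranges coalesce into the same
        // instructions as the single big range.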
        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

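        // Two disjoint ranges inside chunk 0 become two instructions against
        // the same chunk, the second skipping ahead three rows.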
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

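        // Row 5 spans the chunk boundary: it starts in chunk 0's trailer and
        // finishes in chunk 1's preamble, so both chunks are scheduled.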
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 0,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

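        // A range starting mid-chunk skips chunk 1's preamble plus one row.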
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

    #[test]
    fn test_drain_instructions() {
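        // Helper that repeatedly drains `rows_desired` rows from the front of
        // the scheduled instructions, popping a chunk's instructions once they
        // are fully consumed.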
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

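        // Schedule rows 1..7 and 10..14, then drain in batches of 4, 4, and 2
        // rows, checking the skip state carried between batches.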
        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

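        // Second scenario: chunk 1 carries both a preamble and a trailer, so a
        // later drain must skip its preamble and then take chunk 2's preamble
        // even though zero further rows are requested.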
        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }

    #[tokio::test]
    async fn test_fullzip_repetition_index_caching() {
        use crate::testing::SimulatedScheduler;
        use lance_core::cache::LanceCache;

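        // Exercises repetition-index caching for the full-zip layout:
        // initialize() should return cacheable state, load() should install it
        // on the scheduler, and a second scheduler loading the same state
        // should share it instead of re-reading from storage.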
        #[derive(Debug)]
        struct TestFixedDecompressor;

        impl FixedPerValueDecompressor for TestFixedDecompressor {
            fn decompress(
                &self,
                _data: FixedWidthDataBlock,
                _num_rows: u64,
            ) -> crate::Result<DataBlock> {
                unimplemented!("Test decompressor")
            }

            fn bits_per_value(&self) -> u64 {
                32
            }
        }

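        // Simulated file layout: 1000 bytes of value data at offset 0, then a
        // repetition index of (rows_in_page + 1) little-endian u32 offsets at
        // offset 1000, then padding.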
        let rows_in_page = 100u64;
        let bytes_per_value = 4u64;
        let _rep_index_size = (rows_in_page + 1) * bytes_per_value;

        let mut rep_index_data = Vec::new();
        for i in 0..=rows_in_page {
            let offset = (i * 100) as u32;
            rep_index_data.extend_from_slice(&offset.to_le_bytes());
        }

        let mut full_data = vec![0u8; 1000];
        full_data.extend_from_slice(&rep_index_data);
        full_data.extend_from_slice(&vec![0u8; 10000]);

        let data = bytes::Bytes::from(full_data);
        let io = Arc::new(SimulatedScheduler::new(data));
        let _cache = Arc::new(LanceCache::with_capacity(1024 * 1024));

        let mut scheduler = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: Arc::new(FullZipDecodeDetails {
                value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
                def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
                ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
                max_rep: 0,
                max_visible_def: 0,
            }),
            cached_state: None,
        };

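        // The first initialization reads the repetition index from the
        // simulated storage and should return state that can be cached.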
        let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
        let cached_data1 = scheduler.initialize(&io_dyn).await.unwrap();

        let is_cached = cached_data1
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
            .is_ok();
        assert!(
            is_cached,
            "Expected FullZipCacheableState, got NoCachedPageData"
        );

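        // load() should install the cached repetition index on the scheduler.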
        scheduler.load(&cached_data1);

        assert!(
            scheduler.cached_state.is_some(),
            "cached_state should be populated after load"
        );

        let cached_state = scheduler.cached_state.as_ref().unwrap();

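        // Scheduling ranges against the rep index should succeed while the
        // cached state is loaded.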
        let ranges = vec![0..10, 20..30];
        let result = scheduler.schedule_ranges_rep(
            &ranges,
            &io_dyn,
            &FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            },
        );

        assert!(
            result.is_ok(),
            "schedule_ranges_rep should succeed with cached data"
        );

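        // A second scheduler loading the same cached state should share the
        // underlying Arc rather than holding its own copy.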
        let mut scheduler2 = FullZipScheduler {
            data_buf_position: 0,
            rep_index: Some(FullZipRepIndexDetails {
                buf_position: 1000,
                bytes_per_value,
            }),
            priority: 0,
            rows_in_page,
            bits_per_offset: 32,
            details: scheduler.details.clone(),
            cached_state: None,
        };

        scheduler2.load(&cached_data1);
        assert!(
            scheduler2.cached_state.is_some(),
            "Second scheduler should have cached_state after load"
        );

        let cached_state2 = scheduler2.cached_state.as_ref().unwrap();
        assert!(
            Arc::ptr_eq(cached_state, cached_state2),
            "Both schedulers should share the same cached data"
        );
    }
}