use std::{
    any::Any,
    collections::{HashMap, VecDeque},
    fmt::Debug,
    iter,
    ops::Range,
    sync::Arc,
    vec,
};

use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_array;
use lance_core::{
    cache::{Context, DeepSizeOf},
    datatypes::{
        STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
    },
    error::Error,
    utils::bit::pad_bytes,
    utils::hash::U8SliceKey,
};
use log::{debug, trace};
use snafu::location;

use crate::repdef::{
    build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser,
    DefinitionInterpretation, RepDefSlicer,
};
use crate::statistics::{ComputeStat, GetStat, Stat};
use crate::utils::bytepack::ByteUnpacker;
use crate::{
    data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
    utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
    decoder::{FixedPerValueDecompressor, VariablePerValueDecompressor},
    encoder::PerValueDataBlock,
};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
    decoder::{
        BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
        DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
        MessageType, MiniBlockDecompressor, NextDecodeTask, PageEncoding, PageInfo, PageScheduler,
        PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
        StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
        StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
    },
    encoder::{
        ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
        EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
    },
    encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
    format::{pb, ProtobufUtils},
    repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
    EncodingsIo,
};

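// Fill value for padding bytes; presumably chosen (rather than zero) so that
// accidental reads of padding are easier to spot.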
const FILL_BYTE: u8 = 0xFE;

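/// A single page of primitive data plus the scheduler that knows how to read it.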
#[derive(Debug)]
struct PrimitivePage {
    scheduler: Box<dyn PageScheduler>,
    num_rows: u64,
    page_index: u32,
}

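/// A legacy (pre-structural-encoding) scheduler for a primitive field. Maps to
/// exactly one column of pages; empty pages are filtered out at construction.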
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    page_schedulers: Vec<PrimitivePage>,
    num_rows: u64,
    should_validate: bool,
    column_index: u32,
}

impl PrimitiveFieldScheduler {
    pub fn new(
        column_index: u32,
        data_type: DataType,
        pages: Arc<[PageInfo]>,
        buffers: ColumnBuffers,
        should_validate: bool,
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            .filter(|(page_index, page)| {
                if page.num_rows == 0 {
                    log::trace!("Skipping empty page with index {}", page_index);
                }
                page.num_rows > 0
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
                };
                let scheduler = decoder_from_array_encoding(
                    page.encoding.as_legacy(),
                    &page_buffers,
                    &data_type,
                );
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
        Self {
            data_type,
            page_schedulers,
            num_rows,
            should_validate,
            column_index,
        }
    }
}

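/// A scheduling job that walks a set of row ranges across the pages of a
/// primitive column, emitting one scan line per page visited.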
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    page_idx: usize,
    range_idx: usize,
    range_offset: u64,
    global_row_offset: u64,
}

impl<'a> PrimitiveFieldSchedulingJob<'a> {
    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            page_idx: 0,
            range_idx: 0,
            range_offset: 0,
            global_row_offset: 0,
        }
    }
}

impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}

impl FieldScheduler for PrimitiveFieldScheduler {
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
            self,
            ranges.to_vec(),
        )))
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        std::future::ready(Ok(())).boxed()
    }
}

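/// A scheduler for a single page of structurally-encoded data. Unlike the
/// legacy page scheduler it is stateful: `initialize` runs once per page (and
/// its result can be cached), after which ranges may be scheduled repeatedly.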
trait StructuralPageScheduler: std::fmt::Debug + Send {
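    /// Fetches any page-level metadata needed before scheduling (e.g. chunk
    /// metadata or a dictionary). Called once; the result may be cached.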
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
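    /// Restores metadata that was cached from an earlier call to `initialize`.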
    fn load(&mut self, data: &Arc<dyn CachedPageData>);
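    /// Schedules I/O for the given row ranges, returning a future that
    /// resolves to a decoder once the data has arrived.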
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}

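/// Metadata for one chunk in a mini-block page: its value count, compressed
/// size, and byte offset within the page's value buffer.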
#[derive(Debug)]
struct ChunkMeta {
    num_values: u64,
    chunk_size_bytes: u64,
    offset_bytes: u64,
}

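/// The decompressed contents of a single mini-block chunk: optional repetition
/// and definition levels plus the decoded values.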
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    values: DataBlock,
}

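/// A CPU task that decompresses a set of mini-block chunks and assembles the
/// requested rows (and their rep/def levels) from them.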
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    num_buffers: u64,
    max_visible_level: u16,
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}

impl DecodeMiniBlockTask {
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
        num_levels: u16,
    ) -> Result<ScalarBuffer<u16>> {
        let rep = rep_decompressor.decompress(levels, num_levels as u64)?;
        let mut rep = rep.as_fixed_width().unwrap();
        debug_assert_eq!(rep.num_values, num_levels as u64);
        debug_assert_eq!(rep.bits_per_value, 16);
        Ok(rep.data.borrow_to_typed_slice::<u16>())
    }

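    /// Appends `level_buf[range]` to `levels`, allocating the buffer (and
    /// zero-filling the first `dest_offset` slots) the first time levels are
    /// seen. If this chunk has no levels but earlier chunks did, appends
    /// zeros instead so the level buffers stay aligned with the values.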
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat_n(0, dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat_n(0, num_values));
        }
    }

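    /// Maps a range of rows to a range of items and a range of levels.
    ///
    /// These counts can all differ: one row may span many items (lists),
    /// invisible items (e.g. those inside a null list) carry levels but no
    /// values, and a chunk may begin with a preamble (the continuation of a
    /// row started in an earlier chunk) that belongs to no row start in this
    /// chunk. Returns `(item_range, level_range)` for the requested rows,
    /// honoring `preamble_action`.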
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };

            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);

            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;

            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];

                let mut lead_invis_seen = 0;

                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }

                rows_seen += 1;

                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }

                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }

                assert_ne!(new_end, u64::MAX);

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }

                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }

                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }

                (new_start..new_end, new_start..new_end)
            }
        } else {
            (range.clone(), range)
        }
    }

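    /// Decodes a single mini-block chunk.
    ///
    /// A chunk starts with a small header of little-endian u16 values:
    /// `num_levels`, then `rep_size` and `def_size` (each present only if the
    /// corresponding decompressor exists), then one size per value buffer.
    /// The header is followed (each part padded to `MINIBLOCK_ALIGNMENT`) by
    /// the rep levels, the def levels, and the value buffers.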
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let mut offset = 0;
        let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
        offset += 2;

        let rep_size = if self.rep_decompressor.is_some() {
            let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(rep_size)
        } else {
            None
        };
        let def_size = if self.def_decompressor.is_some() {
            let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
            offset += 2;
            Some(def_size)
        } else {
            None
        };
        let buffer_sizes = (0..self.num_buffers)
            .map(|_| {
                let size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
                offset += 2;
                size
            })
            .collect::<Vec<_>>();

        offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);

        let rep = rep_size.map(|rep_size| {
            let rep = buf.slice_with_length(offset, rep_size as usize);
            offset += rep_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            rep
        });

        let def = def_size.map(|def_size| {
            let def = buf.slice_with_length(offset, def_size as usize);
            offset += def_size as usize;
            offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
            def
        });

        let buffers = buffer_sizes
            .into_iter()
            .map(|buf_size| {
                let buf = buf.slice_with_length(offset, buf_size as usize);
                offset += buf_size as usize;
                offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
                buf
            })
            .collect::<Vec<_>>();

        let values = self
            .value_decompressor
            .decompress(buffers, items_in_chunk)?;

        let rep = rep
            .map(|rep| {
                Self::decode_levels(
                    self.rep_decompressor.as_ref().unwrap().as_ref(),
                    rep,
                    num_levels,
                )
            })
            .transpose()?;
        let def = def
            .map(|def| {
                Self::decode_levels(
                    self.def_decompressor.as_ref().unwrap().as_ref(),
                    def,
                    num_levels,
                )
            })
            .transpose()?;

        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}

impl DecodePageTask for DecodeMiniBlockTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;

        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);

        let mut level_offset = 0;
        for (instructions, chunk) in self.instructions.iter() {
            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;

            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;

            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );

            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }

        let data = data_builder.finish();

        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());

        if let Some(dictionary) = &self.dictionary_data {
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);

            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<u8>();
                let indices = indices.as_ref();

                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });

                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }

        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
struct LoadedChunk {
    data: LanceBuffer,
    items_in_chunk: u64,
    byte_range: Range<u64>,
    chunk_idx: usize,
}

impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        Self {
            data: self.data.try_clone().unwrap(),
            items_in_chunk: self.items_in_chunk,
            byte_range: self.byte_range.clone(),
            chunk_idx: self.chunk_idx,
        }
    }
}

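/// A decoder for a page of mini-block encoded data. The chunks are loaded at
/// schedule time; each call to `drain` slices rows out of the loaded chunks
/// and bundles them into a `DecodeMiniBlockTask`.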
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    loaded_chunks: VecDeque<LoadedChunk>,
    instructions: VecDeque<ChunkInstructions>,
    offset_in_current_chunk: u64,
    num_rows: u64,
    num_buffers: u64,
    dictionary: Option<Arc<DataBlock>>,
}

impl StructuralPageDecoder for MiniBlockDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut items_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while items_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);

            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        self.offset_in_current_chunk = skip_in_chunk;

        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();

        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            num_buffers: self.num_buffers,
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
struct CachedComplexAllNullState {
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}

impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
    }
}

impl CachedPageData for CachedComplexAllNullState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

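/// A scheduler for all-null data that still carries repetition and/or
/// definition levels (e.g. lists of nulls, or nulls nested inside structs).
/// The rep/def buffers are read during initialization and cached.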
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    repdef: Option<Arc<CachedComplexAllNullState>>,
}

impl ComplexAllNullScheduler {
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
        }
    }
}

impl StructuralPageScheduler for ComplexAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;

        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }

        let data = io.submit_request(reads, 0);

        async move {
            let data = data.await?;
            let mut data_iter = data.into_iter();

            let rep = if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                Some(rep)
            } else {
                None
            };

            let def = if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                Some(def)
            } else {
                None
            };

            let repdef = Arc::new(CachedComplexAllNullState { rep, def });

            self.repdef = Some(repdef.clone());

            Ok(repdef as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.repdef = Some(
            data.clone()
                .as_arc_any()
                .downcast::<CachedComplexAllNullState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.repdef.as_ref().unwrap().rep.clone(),
            def: self.repdef.as_ref().unwrap().def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}

#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl ComplexAllNullPageDecoder {
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut rows_desired = num_rows;
        let mut ranges = Vec::with_capacity(self.ranges.len());
        while rows_desired > 0 {
            let front = self.ranges.front_mut().unwrap();
            let avail = front.end - front.start;
            if avail > rows_desired {
                ranges.push(front.start..front.start + rows_desired);
                front.start += rows_desired;
                rows_desired = 0;
            } else {
                ranges.push(self.ranges.pop_front().unwrap());
                rows_desired -= avail;
            }
        }
        ranges
    }
}

impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let drained_ranges = self.drain_ranges(num_rows);
        Ok(Box::new(DecodeComplexAllNullTask {
            ranges: drained_ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    ranges: Vec<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}

impl DecodeComplexAllNullTask {
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|levels| {
            let mut referenced_levels = Vec::with_capacity(num_values as usize);
            for range in &self.ranges {
                referenced_levels.extend(
                    levels[range.start as usize..range.end as usize]
                        .iter()
                        .copied(),
                );
            }
            referenced_levels
        })
    }
}

impl DecodePageTask for DecodeComplexAllNullTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values = self.ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}

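/// A scheduler for "simple" all-null data: a single level of definition and
/// no repetition, so nothing at all needs to be read from disk.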
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}

impl StructuralPageScheduler for SimpleAllNullScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>
        ))
        .boxed())
    }
}

#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}

impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let unraveler = RepDefUnraveler::new(
            None,
            Some(vec![1; self.num_values as usize]),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        Ok(DecodedPage {
            data: DataBlock::AllNull(AllNullDataBlock {
                num_values: self.num_values,
            }),
            repdef: unraveler,
        })
    }
}

#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}

impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        Ok(Box::new(SimpleAllNullDecodePageTask {
            num_values: num_rows,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    dictionary_buf_position_and_size: (u64, u64),
    dictionary_data_alignment: u64,
    num_dictionary_items: u64,
}

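/// One entry per chunk in the decoded repetition index: the first row that
/// starts in the chunk, the number of row starts it contains (counting a
/// trailing partial row, if any), and whether the chunk begins with a
/// preamble or ends with a trailer.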
#[derive(Debug)]
struct RepIndexBlock {
    first_row: u64,
    starts_including_trailer: u64,
    has_preamble: bool,
    has_trailer: bool,
}

impl DeepSizeOf for RepIndexBlock {
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        0
    }
}

#[derive(Debug)]
struct RepetitionIndex {
    blocks: Vec<RepIndexBlock>,
}

impl DeepSizeOf for RepetitionIndex {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.blocks.deep_size_of_children(context)
    }
}

impl RepetitionIndex {
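    /// Rebuilds per-chunk blocks from the raw repetition index. Each chunk's
    /// entry begins `[ends_count, partial_count, ..]`; a chunk has a trailer
    /// iff `partial_count > 0`, and it has a preamble iff the previous chunk
    /// had a trailer.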
    fn decode(rep_index: &[Vec<u64>]) -> Self {
        let mut chunk_has_preamble = false;
        let mut offset = 0;
        let mut blocks = Vec::with_capacity(rep_index.len());
        for chunk_rep in rep_index {
            let ends_count = chunk_rep[0];
            let partial_count = chunk_rep[1];

            let chunk_has_trailer = partial_count > 0;
            let mut starts_including_trailer = ends_count;
            if chunk_has_trailer {
                starts_including_trailer += 1;
            }
            if chunk_has_preamble {
                starts_including_trailer -= 1;
            }

            blocks.push(RepIndexBlock {
                first_row: offset,
                starts_including_trailer,
                has_preamble: chunk_has_preamble,
                has_trailer: chunk_has_trailer,
            });

            chunk_has_preamble = chunk_has_trailer;
            offset += starts_including_trailer;
        }

        Self { blocks }
    }
}

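/// Page-level state loaded once during initialization and then cached: chunk
/// metadata, the decoded repetition index, and the decoded dictionary (if the
/// page is dictionary encoded).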
#[derive(Debug)]
struct MiniBlockCacheableState {
    chunk_meta: Vec<ChunkMeta>,
    rep_index: RepetitionIndex,
    dictionary: Option<Arc<DataBlock>>,
}

impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        self.rep_index.deep_size_of_children(context)
            + self
                .dictionary
                .as_ref()
                .map(|dict| dict.data_size() as usize)
                .unwrap_or(0)
    }
}

impl CachedPageData for MiniBlockCacheableState {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

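/// A scheduler for mini-block encoded pages. Such a page stores a metadata
/// buffer (one u16 word per chunk packing the chunk's byte size and the log2
/// of its value count), the chunk data itself, and optionally a dictionary
/// buffer and a repetition index buffer.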
#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    priority: u64,
    items_in_page: u64,
    repetition_index_depth: u16,
    num_buffers: u64,
    rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
    def_decompressor: Option<Arc<dyn BlockDecompressor>>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
    page_meta: Option<Arc<MiniBlockCacheableState>>,
}

impl MiniBlockScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let rep_decompressor = layout
            .rep_compression
            .as_ref()
            .map(|rep_compression| {
                decompressors
                    .create_block_decompressor(rep_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_decompressor = layout
            .def_compression
            .as_ref()
            .map(|def_compression| {
                decompressors
                    .create_block_decompressor(def_compression)
                    .map(Arc::from)
            })
            .transpose()?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            let num_dictionary_items = layout.num_dictionary_items;
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::Variable(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                        num_dictionary_items,
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    num_dictionary_items,
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };

        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor,
            def_decompressor,
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            num_buffers: layout.num_buffers,
            priority,
            items_in_page,
            dictionary,
            def_meaning: def_meaning.into(),
            page_meta: None,
        })
    }

    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        let page_meta = self.page_meta.as_ref().unwrap();
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &page_meta.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}

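/// What to do with a chunk's preamble (the continuation of a row that started
/// in an earlier chunk): take it, skip it, or there is no preamble at all.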
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    Take,
    Skip,
    Absent,
}

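/// Instructions for reading rows from one chunk: which chunk, how to treat
/// its preamble, how many full rows to skip and take, and whether to also
/// take the trailer (a trailing partial row that continues into the next
/// chunk).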
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    preamble: PreambleAction,
    rows_to_skip: u64,
    rows_to_take: u64,
    take_trailer: bool,
}

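/// Scheduling decides which chunks to read (`ChunkInstructions`); draining
/// later pulls rows out of those chunks, possibly in smaller batches. A
/// `ChunkDrainInstructions` narrows one scheduled chunk read down to the rows
/// needed by a single drain call.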
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}

impl ChunkInstructions {
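    /// Converts user-requested row ranges into per-chunk read instructions,
    /// merging adjacent instructions that read contiguous rows from the same
    /// chunk.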
    fn schedule_instructions(rep_index: &RepetitionIndex, user_ranges: &[Range<u64>]) -> Vec<Self> {
        let mut chunk_instructions = Vec::with_capacity(user_ranges.len());

        for user_range in user_ranges {
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;

            let mut block_index = match rep_index
                .blocks
                .binary_search_by_key(&user_range.start, |block| block.first_row)
            {
                Ok(idx) => {
                    let mut idx = idx;
                    while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
                        idx -= 1;
                    }
                    idx
                }
                Err(idx) => idx - 1,
            };

            let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;

            while rows_needed > 0 || need_preamble {
                let chunk = &rep_index.blocks[block_index];
                let rows_avail = chunk.starts_including_trailer - to_skip;
                debug_assert!(rows_avail > 0);

                let rows_to_take = rows_avail.min(rows_needed);
                rows_needed -= rows_to_take;

                let mut take_trailer = false;
                let preamble = if chunk.has_preamble {
                    if need_preamble {
                        PreambleAction::Take
                    } else {
                        PreambleAction::Skip
                    }
                } else {
                    PreambleAction::Absent
                };
                let mut rows_to_take_no_trailer = rows_to_take;

                if rows_to_take == rows_avail && chunk.has_trailer {
                    take_trailer = true;
                    need_preamble = true;
                    rows_to_take_no_trailer -= 1;
                } else {
                    need_preamble = false;
                };

                chunk_instructions.push(Self {
                    preamble,
                    chunk_idx: block_index,
                    rows_to_skip: to_skip,
                    rows_to_take: rows_to_take_no_trailer,
                    take_trailer,
                });

                to_skip = 0;
                block_index += 1;
            }
        }

        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };

        if self.take_trailer {
            rows_avail += 1;
        }

        let rows_taking = if *rows_desired >= rows_avail {
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;

        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };

        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}

impl StructuralPageScheduler for MiniBlockScheduler {
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);

        async move {
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();

            assert!(meta_bytes.len() % 2 == 0);
            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();

            let mut chunk_meta = Vec::with_capacity(words.len());

            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    debug_assert!(
                        log_num_values == 0
                            || (1 << log_num_values) == (self.items_in_page - rows_counter)
                    );
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;

                chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                offset_bytes += num_bytes as u64;
            }

            let rep_index = if let Some(rep_index_data) = rep_index_bytes {
                assert!(rep_index_data.len() % 8 == 0);
                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
                repetition_index_vals
                    .as_ref()
                    .chunks_exact(self.repetition_index_depth as usize + 1)
                    .map(|c| c.to_vec())
                    .collect::<Vec<_>>()
            } else {
                chunk_meta
                    .iter()
                    .map(|c| vec![c.num_values, 0])
                    .collect::<Vec<_>>()
            };

            let mut page_meta = MiniBlockCacheableState {
                chunk_meta,
                rep_index: RepetitionIndex::decode(&rep_index),
                dictionary: None,
            };

            if let Some(ref mut dictionary) = self.dictionary {
                let dictionary_data = dictionary_bytes.unwrap();
                page_meta.dictionary =
                    Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                        dictionary.num_dictionary_items,
                    )?));
            };
            let page_meta = Arc::new(page_meta);
            self.page_meta = Some(page_meta.clone());
            Ok(page_meta as Arc<dyn CachedPageData>)
        }
        .boxed()
    }

    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
        self.page_meta = Some(
            data.clone()
                .as_arc_any()
                .downcast::<MiniBlockCacheableState>()
                .unwrap(),
        );
    }

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let page_meta = self.page_meta.as_ref().unwrap();

        let chunk_instructions =
            ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);

        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| {
                    let taken = ci.rows_to_take;
                    if ci.take_trailer {
                        taken + 1
                    } else {
                        taken
                    }
                })
                .sum::<u64>()
        );

        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();
        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);

        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let num_buffers = self.num_buffers;
        let dictionary = page_meta.dictionary.clone();
        let def_meaning = self.def_meaning.clone();

        let res = async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }

            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
                num_buffers,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed();
        Ok(res)
    }
}

#[derive(Debug)]
struct FullZipRepIndexDetails {
    buf_position: u64,
    bytes_per_value: u64,
}

#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}

#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    ctrl_word_parser: ControlWordParser,
    max_rep: u16,
    max_visible_def: u16,
}

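/// A scheduler for full-zip encoded pages. In full-zip encoding each value is
/// zipped together with its control word (rep/def levels), forming a single
/// stream of records. An optional repetition index (one entry per row, plus
/// one) maps row numbers to byte positions; it is required whenever rows are
/// not a fixed number of bytes.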
#[derive(Debug)]
pub struct FullZipScheduler {
    data_buf_position: u64,
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
}

impl FullZipScheduler {
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb::FullZipLayout,
        decompressors: &dyn DecompressorStrategy,
        bits_per_offset: u8,
    ) -> Result<Self> {
        let (data_buf_position, _) = buffer_offsets_and_sizes[0];
        let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
            let num_reps = rows_in_page + 1;
            let bytes_per_rep = len / num_reps;
            debug_assert_eq!(len % num_reps, 0);
            debug_assert!(
                bytes_per_rep == 1
                    || bytes_per_rep == 2
                    || bytes_per_rep == 4
                    || bytes_per_rep == 8
            );
            FullZipRepIndexDetails {
                buf_position: *pos,
                bytes_per_value: bytes_per_rep,
            }
        });

        let value_decompressor = match layout.details {
            Some(pb::full_zip_layout::Details::BitsPerValue(_)) => {
                let decompressor = decompressors.create_fixed_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Fixed(decompressor.into())
            }
            Some(pb::full_zip_layout::Details::BitsPerOffset(_)) => {
                let decompressor = decompressors.create_variable_per_value_decompressor(
                    layout.value_compression.as_ref().unwrap(),
                )?;
                PerValueDecompressor::Variable(decompressor.into())
            }
            None => {
                panic!("Full-zip layout must have a `details` field");
            }
        };
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();

        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
        let max_visible_def = def_meaning
            .iter()
            .filter(|d| !d.is_list())
            .map(|d| d.num_def_levels())
            .sum();

        let details = Arc::new(FullZipDecodeDetails {
            value_decompressor,
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
            max_rep,
            max_visible_def,
        });
        Ok(Self {
            data_buf_position,
            rep_index,
            details,
            priority,
            rows_in_page,
            bits_per_offset,
        })
    }

    #[allow(clippy::too_many_arguments)]
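    /// Second step of scheduling with a repetition index: the index entries
    /// for the requested rows have already been requested, so convert them
    /// into byte ranges in the data buffer, read the data, and build the
    /// decoder.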
    async fn indirect_schedule_ranges(
        data_buffer_pos: u64,
        row_ranges: Vec<Range<u64>>,
        rep_index_ranges: Vec<Range<u64>>,
        bytes_per_rep: u64,
        io: Arc<dyn EncodingsIo>,
        priority: u64,
        bits_per_offset: u8,
        details: Arc<FullZipDecodeDetails>,
    ) -> Result<Box<dyn StructuralPageDecoder>> {
        let byte_ranges = io
            .submit_request(rep_index_ranges, priority)
            .await?
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect::<Vec<_>>();
        let byte_ranges = LanceBuffer::concat(&byte_ranges);
        let byte_ranges = ByteUnpacker::new(byte_ranges, bytes_per_rep as usize)
            .chunks(2)
            .into_iter()
            .map(|mut c| {
                let start = c.next().unwrap() + data_buffer_pos;
                let end = c.next().unwrap() + data_buffer_pos;
                start..end
            })
            .collect::<Vec<_>>();

        let data = io.submit_request(byte_ranges, priority);

        let data = data.await?;
        let data = data
            .into_iter()
            .map(|d| LanceBuffer::from_bytes(d, 1))
            .collect();
        let num_rows = row_ranges.into_iter().map(|r| r.end - r.start).sum();

        match &details.value_decompressor {
            PerValueDecompressor::Fixed(decompressor) => {
                let bits_per_value = decompressor.bits_per_value();
                assert!(bits_per_value > 0);
                if bits_per_value % 8 != 0 {
                    unimplemented!("Bit-packed full-zip");
                }
                let bytes_per_value = bits_per_value / 8;
                let total_bytes_per_value =
                    bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
                Ok(Box::new(FixedFullZipDecoder {
                    details,
                    data,
                    num_rows,
                    offset_in_current: 0,
                    bytes_per_value: bytes_per_value as usize,
                    total_bytes_per_value,
                }) as Box<dyn StructuralPageDecoder>)
            }
            PerValueDecompressor::Variable(_decompressor) => {
                Ok(Box::new(VariableFullZipDecoder::new(
                    details,
                    data,
                    num_rows,
                    bits_per_offset,
                    bits_per_offset,
                )))
            }
        }
    }

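    /// Schedules ranges using the repetition index. For each range we read
    /// two index entries: the byte offset where the first row starts and the
    /// byte offset where the one-past-last row starts.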
    fn schedule_ranges_rep(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
        rep_index: &FullZipRepIndexDetails,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let rep_index_ranges = ranges
            .iter()
            .flat_map(|r| {
                let first_val_start =
                    rep_index.buf_position + (r.start * rep_index.bytes_per_value);
                let first_val_end = first_val_start + rep_index.bytes_per_value;
                let last_val_start = rep_index.buf_position + (r.end * rep_index.bytes_per_value);
                let last_val_end = last_val_start + rep_index.bytes_per_value;
                [first_val_start..first_val_end, last_val_start..last_val_end]
            })
            .collect::<Vec<_>>();

        Ok(Self::indirect_schedule_ranges(
            self.data_buf_position,
            ranges.to_vec(),
            rep_index_ranges,
            rep_index.bytes_per_value,
            io.clone(),
            self.priority,
            self.bits_per_offset,
            self.details.clone(),
        )
        .boxed())
    }

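    /// Schedules ranges without a repetition index. This is only possible for
    /// fixed-width values, where every row occupies the same number of bytes
    /// and byte ranges follow directly from row numbers.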
    fn schedule_ranges_simple(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();

        let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
            unreachable!()
        };

        let bits_per_value = decompressor.bits_per_value();
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });

        let data = io.submit_request(byte_ranges.collect(), self.priority);

        let details = self.details.clone();

        Ok(async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed())
    }
}

impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }

    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}

    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        if let Some(rep_index) = self.rep_index.as_ref() {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io.as_ref())
        }
    }
}

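/// A decoder for fixed-width full-zip data. Each record is `bytes_per_value`
/// plus a control word. When repetition levels are present the control words
/// must be walked one at a time to find row boundaries; otherwise rows can be
/// sliced arithmetically.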
2127#[derive(Debug)]
2135struct FixedFullZipDecoder {
2136 details: Arc<FullZipDecodeDetails>,
2137 data: VecDeque<LanceBuffer>,
2138 offset_in_current: usize,
2139 bytes_per_value: usize,
2140 total_bytes_per_value: usize,
2141 num_rows: u64,
2142}
2143
2144impl FixedFullZipDecoder {
2145 fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
2146 debug_assert!(num_rows > 0);
2147 let cur_buf = self.data.front_mut().unwrap();
2148 let start = self.offset_in_current;
2149 if self.details.ctrl_word_parser.has_rep() {
2150 let mut rows_started = 0;
2153 let mut num_items = 0;
2156 while self.offset_in_current < cur_buf.len() {
2157 let control = self.details.ctrl_word_parser.parse_desc(
2158 &cur_buf[self.offset_in_current..],
2159 self.details.max_rep,
2160 self.details.max_visible_def,
2161 );
2162 if control.is_new_row {
2163 if rows_started == num_rows {
2164 break;
2165 }
2166 rows_started += 1;
2167 }
2168 num_items += 1;
2169 if control.is_visible {
2170 self.offset_in_current += self.total_bytes_per_value;
2171 } else {
2172 self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
2173 }
2174 }
2175
2176 let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
2177 if self.offset_in_current == cur_buf.len() {
2178 self.data.pop_front();
2179 self.offset_in_current = 0;
2180 }
2181
2182 FullZipDecodeTaskItem {
2183 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2184 data: task_slice,
2185 bits_per_value: self.bytes_per_value as u64 * 8,
2186 num_values: num_items,
2187 block_info: BlockInfo::new(),
2188 }),
2189 rows_in_buf: rows_started,
2190 }
2191 } else {
2192 let cur_buf = self.data.front_mut().unwrap();
2195 let bytes_avail = cur_buf.len() - self.offset_in_current;
2196 let offset_in_cur = self.offset_in_current;
2197
2198 let bytes_needed = num_rows as usize * self.total_bytes_per_value;
2199 let mut rows_taken = num_rows;
2200 let task_slice = if bytes_needed >= bytes_avail {
2201 self.offset_in_current = 0;
2202 rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
2203 self.data
2204 .pop_front()
2205 .unwrap()
2206 .slice_with_length(offset_in_cur, bytes_avail)
2207 } else {
2208 self.offset_in_current += bytes_needed;
2209 cur_buf.slice_with_length(offset_in_cur, bytes_needed)
2210 };
2211 FullZipDecodeTaskItem {
2212 data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
2213 data: task_slice,
2214 bits_per_value: self.bytes_per_value as u64 * 8,
2215 num_values: rows_taken,
2216 block_info: BlockInfo::new(),
2217 }),
2218 rows_in_buf: rows_taken,
2219 }
2220 }
2221 }
2222}
2223
2224impl StructuralPageDecoder for FixedFullZipDecoder {
2225 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
2226 let mut task_data = Vec::with_capacity(self.data.len());
2227 let mut remaining = num_rows;
2228 while remaining > 0 {
2229 let task_item = self.slice_next_task(remaining);
2230 remaining -= task_item.rows_in_buf;
2231 task_data.push(task_item);
2232 }
2233 Ok(Box::new(FixedFullZipDecodeTask {
2234 details: self.details.clone(),
2235 data: task_data,
2236 bytes_per_value: self.bytes_per_value,
2237 num_rows: num_rows as usize,
2238 }))
2239 }
2240
2241 fn num_rows(&self) -> u64 {
2242 self.num_rows
2243 }
2244}
2245
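/// A page decoder for full-zip pages of variable-width values.
///
/// Unlike the fixed-width case we cannot locate rows by arithmetic, so this
/// decoder eagerly unzips the entire page into separate data, offset, and
/// rep/def buffers when it is created, recording where each row starts.
/// `drain` then becomes a cheap slicing operation.  (Summary inferred from
/// the implementation.)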
2246#[derive(Debug)]
2251struct VariableFullZipDecoder {
2252 details: Arc<FullZipDecodeDetails>,
2253 decompressor: Arc<dyn VariablePerValueDecompressor>,
2254 data: LanceBuffer,
2255 offsets: LanceBuffer,
2256 rep: ScalarBuffer<u16>,
2257 def: ScalarBuffer<u16>,
2258 repdef_starts: Vec<usize>,
2259 data_starts: Vec<usize>,
2260 offset_starts: Vec<usize>,
2261 visible_item_counts: Vec<u64>,
2262 bits_per_offset: u8,
2263 current_idx: usize,
2264 num_rows: u64,
2265}
2266
2267impl VariableFullZipDecoder {
2268 fn new(
2269 details: Arc<FullZipDecodeDetails>,
2270 data: VecDeque<LanceBuffer>,
2271 num_rows: u64,
2272 in_bits_per_length: u8,
2273 out_bits_per_offset: u8,
2274 ) -> Self {
2275 let decompressor = match details.value_decompressor {
2276 PerValueDecompressor::Variable(ref d) => d.clone(),
2277 _ => unreachable!(),
2278 };
2279
2280 assert_eq!(in_bits_per_length % 8, 0);
2281 assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
2282
2283 let mut decoder = Self {
2284 details,
2285 decompressor,
2286 data: LanceBuffer::empty(),
2287 offsets: LanceBuffer::empty(),
2288 rep: LanceBuffer::empty().borrow_to_typed_slice(),
2289 def: LanceBuffer::empty().borrow_to_typed_slice(),
2290 bits_per_offset: out_bits_per_offset,
2291 repdef_starts: Vec::with_capacity(num_rows as usize + 1),
2292 data_starts: Vec::with_capacity(num_rows as usize + 1),
2293 offset_starts: Vec::with_capacity(num_rows as usize + 1),
2294 visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
2295 current_idx: 0,
2296 num_rows,
2297 };
2298
2299 decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
2320
2321 decoder
2322 }
2323
2324 unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
2325 match bits_per_offset {
2326 8 => *data.get_unchecked(0) as u64,
2327 16 => u16::from_le_bytes([*data.get_unchecked(0), *data.get_unchecked(1)]) as u64,
2328 32 => u32::from_le_bytes([
2329 *data.get_unchecked(0),
2330 *data.get_unchecked(1),
2331 *data.get_unchecked(2),
2332 *data.get_unchecked(3),
2333 ]) as u64,
2334 64 => u64::from_le_bytes([
2335 *data.get_unchecked(0),
2336 *data.get_unchecked(1),
2337 *data.get_unchecked(2),
2338 *data.get_unchecked(3),
2339 *data.get_unchecked(4),
2340 *data.get_unchecked(5),
2341 *data.get_unchecked(6),
2342 *data.get_unchecked(7),
2343 ]),
2344 _ => unreachable!(),
2345 }
2346 }
2347
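    /// Unzips the control words, lengths, and values into separate buffers,
    /// converting per-value lengths (of `in_bits_per_length` bits) into
    /// absolute offsets (of `out_bits_per_offset` bits) as it goes.  Row
    /// starts are recorded so `drain` can later slice by row index.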
    fn unzip(
        &mut self,
        data: VecDeque<LanceBuffer>,
        in_bits_per_length: u8,
        out_bits_per_offset: u8,
        num_rows: u64,
    ) {
        let mut rep = Vec::with_capacity(num_rows as usize);
        let mut def = Vec::with_capacity(num_rows as usize);
        let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;

        let bytes_per_offset = out_bits_per_offset as usize / 8;
        let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
        let mut offsets_data = Vec::with_capacity(bytes_offsets);

        let bytes_per_length = in_bits_per_length as usize / 8;
        let bytes_lengths = bytes_per_length * num_rows as usize;

        let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
        let mut unzipped_data =
            Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));

        let mut current_offset = 0_u64;
        let mut visible_item_count = 0_u64;
        for databuf in data.into_iter() {
            let mut databuf = databuf.as_ref();
            while !databuf.is_empty() {
                let data_start = unzipped_data.len();
                let offset_start = offsets_data.len();
                let repdef_start = rep.len().max(def.len());
                let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
                    databuf,
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                self.details
                    .ctrl_word_parser
                    .parse(databuf, &mut rep, &mut def);
                databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];

                if ctrl_desc.is_new_row {
                    self.repdef_starts.push(repdef_start);
                    self.data_starts.push(data_start);
                    self.offset_starts.push(offset_start);
                    self.visible_item_counts.push(visible_item_count);
                }
                if ctrl_desc.is_visible {
                    visible_item_count += 1;
                    if ctrl_desc.is_valid_item {
                        debug_assert!(databuf.len() >= bytes_per_length);
                        let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        };
                        databuf = &databuf[bytes_per_length..];
                        unzipped_data.extend_from_slice(&databuf[..length as usize]);
                        databuf = &databuf[length as usize..];
                        current_offset += length;
                    } else {
                        // Null items still get an offset entry (zero length)
                        match out_bits_per_offset {
                            32 => offsets_data
                                .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                            _ => unreachable!(),
                        }
                    }
                }
            }
        }
        self.repdef_starts.push(rep.len().max(def.len()));
        self.data_starts.push(unzipped_data.len());
        self.offset_starts.push(offsets_data.len());
        self.visible_item_counts.push(visible_item_count);
        match out_bits_per_offset {
            32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
            64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
            _ => unreachable!(),
        };
        self.rep = ScalarBuffer::from(rep);
        self.def = ScalarBuffer::from(def);
        self.data = LanceBuffer::Owned(unzipped_data);
        self.offsets = LanceBuffer::Owned(offsets_data);
    }
}

impl StructuralPageDecoder for VariableFullZipDecoder {
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let start = self.current_idx;
        let end = start + num_rows as usize;

        let data = self.data.borrow_and_clone();

        let offset_start = self.offset_starts[start];
        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
        let offsets = self
            .offsets
            .slice_with_length(offset_start, offset_end - offset_start);

        let repdef_start = self.repdef_starts[start];
        let repdef_end = self.repdef_starts[end];
        let rep = if self.rep.is_empty() {
            self.rep.clone()
        } else {
            self.rep.slice(repdef_start, repdef_end - repdef_start)
        };
        let def = if self.def.is_empty() {
            self.def.clone()
        } else {
            self.def.slice(repdef_start, repdef_end - repdef_start)
        };

        let visible_item_counts_start = self.visible_item_counts[start];
        let visible_item_counts_end = self.visible_item_counts[end];
        let num_visible_items = visible_item_counts_end - visible_item_counts_start;

        self.current_idx += num_rows as usize;

        Ok(Box::new(VariableFullZipDecodeTask {
            details: self.details.clone(),
            decompressor: self.decompressor.clone(),
            data,
            offsets,
            bits_per_offset: self.bits_per_offset,
            num_visible_items,
            rep,
            def,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

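/// A task to decode one drained slice of a variable full-zip page.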
2502#[derive(Debug)]
2503struct VariableFullZipDecodeTask {
2504 details: Arc<FullZipDecodeDetails>,
2505 decompressor: Arc<dyn VariablePerValueDecompressor>,
2506 data: LanceBuffer,
2507 offsets: LanceBuffer,
2508 bits_per_offset: u8,
2509 num_visible_items: u64,
2510 rep: ScalarBuffer<u16>,
2511 def: ScalarBuffer<u16>,
2512}
2513
2514impl DecodePageTask for VariableFullZipDecodeTask {
2515 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2516 let block = VariableWidthBlock {
2517 data: self.data,
2518 offsets: self.offsets,
2519 bits_per_offset: self.bits_per_offset,
2520 num_values: self.num_visible_items,
2521 block_info: BlockInfo::new(),
2522 };
2523 let decomopressed = self.decompressor.decompress(block)?;
2524 let rep = self.rep.to_vec();
2525 let def = self.def.to_vec();
2526 let unraveler =
2527 RepDefUnraveler::new(Some(rep), Some(def), self.details.def_meaning.clone());
2528 Ok(DecodedPage {
2529 data: decomopressed,
2530 repdef: unraveler,
2531 })
2532 }
2533}
2534
2535#[derive(Debug)]
2536struct FullZipDecodeTaskItem {
2537 data: PerValueDataBlock,
2538 rows_in_buf: u64,
2539}
2540
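/// A task to decode a batch of drained slices from a fixed full-zip page.
///
/// If there are no rep/def levels the values are decompressed directly.
/// Otherwise the control words are unzipped from the values first.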
2541#[derive(Debug)]
2544struct FixedFullZipDecodeTask {
2545 details: Arc<FullZipDecodeDetails>,
2546 data: Vec<FullZipDecodeTaskItem>,
2547 num_rows: usize,
2548 bytes_per_value: usize,
2549}
2550
2551impl DecodePageTask for FixedFullZipDecodeTask {
2552 fn decode(self: Box<Self>) -> Result<DecodedPage> {
2553 let estimated_size_bytes = self
2555 .data
2556 .iter()
2557 .map(|task_item| task_item.data.data_size() as usize)
2558 .sum::<usize>()
2559 * 2;
2560 let mut data_builder =
2561 DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
2562
2563 if self.details.ctrl_word_parser.bytes_per_word() == 0 {
2564 for task_item in self.data.into_iter() {
2568 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2569 unreachable!()
2570 };
2571 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2572 else {
2573 unreachable!()
2574 };
2575 debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
2576 let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
2577 data_builder.append(&decompressed, 0..task_item.rows_in_buf);
2578 }
2579
2580 let unraveler = RepDefUnraveler::new(None, None, self.details.def_meaning.clone());
2581
2582 Ok(DecodedPage {
2583 data: data_builder.finish(),
2584 repdef: unraveler,
2585 })
2586 } else {
2587 let mut rep = Vec::with_capacity(self.num_rows);
2589 let mut def = Vec::with_capacity(self.num_rows);
2590
2591 for task_item in self.data.into_iter() {
2592 let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
2593 unreachable!()
2594 };
2595 let mut buf_slice = fixed_data.data.as_ref();
2596 let num_values = fixed_data.num_values as usize;
2597 let mut values = Vec::with_capacity(
2600 fixed_data.data.len()
2601 - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
2602 );
2603 let mut visible_items = 0;
2604 for _ in 0..num_values {
2605 self.details
2607 .ctrl_word_parser
2608 .parse(buf_slice, &mut rep, &mut def);
2609 buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
2610
2611 let is_visible = def
2612 .last()
2613 .map(|d| *d <= self.details.max_visible_def)
2614 .unwrap_or(true);
2615 if is_visible {
2616 values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
2618 buf_slice = &buf_slice[self.bytes_per_value..];
2619 visible_items += 1;
2620 }
2621 }
2622
2623 let values_buf = LanceBuffer::Owned(values);
2625 let fixed_data = FixedWidthDataBlock {
2626 bits_per_value: self.bytes_per_value as u64 * 8,
2627 block_info: BlockInfo::new(),
2628 data: values_buf,
2629 num_values: visible_items,
2630 };
2631 let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
2632 else {
2633 unreachable!()
2634 };
2635 let decompressed = decompressor.decompress(fixed_data, visible_items)?;
2636 data_builder.append(&decompressed, 0..visible_items);
2637 }
2638
2639 let repetition = if rep.is_empty() { None } else { Some(rep) };
2640 let definition = if def.is_empty() { None } else { Some(def) };
2641
2642 let unraveler =
2643 RepDefUnraveler::new(repetition, definition, self.details.def_meaning.clone());
2644 let data = data_builder.finish();
2645
2646 Ok(DecodedPage {
2647 data,
2648 repdef: unraveler,
2649 })
2650 }
2651 }
2652}
2653
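/// A scheduling job for a structural primitive field.
///
/// Walks the requested ranges and the page list in tandem, emitting one
/// scheduled scan line per page.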
2654#[derive(Debug)]
2655struct StructuralPrimitiveFieldSchedulingJob<'a> {
2656 scheduler: &'a StructuralPrimitiveFieldScheduler,
2657 ranges: Vec<Range<u64>>,
2658 page_idx: usize,
2659 range_idx: usize,
2660 global_row_offset: u64,
2661}
2662
2663impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
2664 pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
2665 Self {
2666 scheduler,
2667 ranges,
2668 page_idx: 0,
2669 range_idx: 0,
2670 global_row_offset: 0,
2671 }
2672 }
2673}
2674
2675impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
2676 fn schedule_next(
2677 &mut self,
2678 context: &mut SchedulerContext,
2679 ) -> Result<Option<ScheduledScanLine>> {
2680 if self.range_idx >= self.ranges.len() {
2681 return Ok(None);
2682 }
2683 let mut range = self.ranges[self.range_idx].clone();
2685 let priority = range.start;
2686
2687 let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
2688 trace!(
2689 "Current range is {:?} and current page has {} rows",
2690 range,
2691 cur_page.num_rows
2692 );
2693 while cur_page.num_rows + self.global_row_offset <= range.start {
2695 self.global_row_offset += cur_page.num_rows;
2696 self.page_idx += 1;
2697 trace!("Skipping entire page of {} rows", cur_page.num_rows);
2698 cur_page = &self.scheduler.page_schedulers[self.page_idx];
2699 }
2700
2701 let mut ranges_in_page = Vec::new();
2705 while cur_page.num_rows + self.global_row_offset > range.start {
2706 range.start = range.start.max(self.global_row_offset);
2707 let start_in_page = range.start - self.global_row_offset;
2708 let end_in_page = start_in_page + (range.end - range.start);
2709 let end_in_page = end_in_page.min(cur_page.num_rows);
2710 let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
2711
2712 ranges_in_page.push(start_in_page..end_in_page);
2713 if last_in_range {
2714 self.range_idx += 1;
2715 if self.range_idx == self.ranges.len() {
2716 break;
2717 }
2718 range = self.ranges[self.range_idx].clone();
2719 } else {
2720 break;
2721 }
2722 }
2723
2724 let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
2725 trace!(
2726 "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
2727 num_rows_in_next,
2728 ranges_in_page.len(),
2729 cur_page.num_rows,
2730 priority,
2731 self.scheduler.column_index,
2732 cur_page.page_index,
2733 );
2734
2735 self.global_row_offset += cur_page.num_rows;
2736 self.page_idx += 1;
2737
2738 let page_decoder = cur_page
2739 .scheduler
2740 .schedule_ranges(&ranges_in_page, context.io())?;
2741
2742 let cur_path = context.current_path();
2743 let page_index = cur_page.page_index;
2744 let unloaded_page = async move {
2745 let page_decoder = page_decoder.await?;
2746 Ok(LoadedPage {
2747 decoder: page_decoder,
2748 path: cur_path,
2749 page_index,
2750 })
2751 }
2752 .boxed();
2753
2754 Ok(Some(ScheduledScanLine {
2755 decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
2756 rows_scheduled: num_rows_in_next,
2757 }))
2758 }
2759}
2760
2761#[derive(Debug)]
2762struct PageInfoAndScheduler {
2763 page_index: usize,
2764 num_rows: u64,
2765 scheduler: Box<dyn StructuralPageScheduler>,
2766}
2767
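/// A scheduler for a structural primitive (leaf) column.
///
/// Owns one page scheduler per page and delegates range scheduling to them.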
2768#[derive(Debug)]
2773pub struct StructuralPrimitiveFieldScheduler {
2774 page_schedulers: Vec<PageInfoAndScheduler>,
2775 column_index: u32,
2776}
2777
2778impl StructuralPrimitiveFieldScheduler {
2779 pub fn try_new(
2780 column_info: &ColumnInfo,
2781 decompressors: &dyn DecompressorStrategy,
2782 ) -> Result<Self> {
2783 let page_schedulers = column_info
2784 .page_infos
2785 .iter()
2786 .enumerate()
2787 .map(|(page_index, page_info)| {
2788 Self::page_info_to_scheduler(
2789 page_info,
2790 page_index,
2791 column_info.index as usize,
2792 decompressors,
2793 )
2794 })
2795 .collect::<Result<Vec<_>>>()?;
2796 Ok(Self {
2797 page_schedulers,
2798 column_index: column_info.index,
2799 })
2800 }
2801
2802 fn page_info_to_scheduler(
2803 page_info: &PageInfo,
2804 page_index: usize,
2805 _column_index: usize,
2806 decompressors: &dyn DecompressorStrategy,
2807 ) -> Result<PageInfoAndScheduler> {
2808 let scheduler: Box<dyn StructuralPageScheduler> =
2809 match page_info.encoding.as_structural().layout.as_ref() {
2810 Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
2811 Box::new(MiniBlockScheduler::try_new(
2812 &page_info.buffer_offsets_and_sizes,
2813 page_info.priority,
2814 mini_block.num_items,
2815 mini_block,
2816 decompressors,
2817 )?)
2818 }
2819 Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
2820 Box::new(FullZipScheduler::try_new(
2821 &page_info.buffer_offsets_and_sizes,
2822 page_info.priority,
2823 page_info.num_rows,
2824 full_zip,
2825 decompressors,
2826 32,
2827 )?)
2828 }
2829 Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
2830 let def_meaning = all_null
2831 .layers
2832 .iter()
2833 .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
2834 .collect::<Vec<_>>();
2835 if def_meaning.len() == 1
2836 && def_meaning[0] == DefinitionInterpretation::NullableItem
2837 {
2838 Box::new(SimpleAllNullScheduler::default())
2839 as Box<dyn StructuralPageScheduler>
2840 } else {
2841 Box::new(ComplexAllNullScheduler::new(
2842 page_info.buffer_offsets_and_sizes.clone(),
2843 def_meaning.into(),
2844 )) as Box<dyn StructuralPageScheduler>
2845 }
2846 }
2847 _ => todo!(),
2848 };
2849 Ok(PageInfoAndScheduler {
2850 page_index,
2851 num_rows: page_info.num_rows,
2852 scheduler,
2853 })
2854 }
2855}
2856
2857pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
2858 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
2859}
2860
2861pub struct NoCachedPageData;
2862
2863impl DeepSizeOf for NoCachedPageData {
2864 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
2865 0
2866 }
2867}
2868impl CachedPageData for NoCachedPageData {
2869 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
2870 self
2871 }
2872}
2873
2874pub struct CachedFieldData {
2875 pages: Vec<Arc<dyn CachedPageData>>,
2876}
2877
2878impl DeepSizeOf for CachedFieldData {
2879 fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
2880 self.pages.deep_size_of_children(ctx)
2881 }
2882}
2883
2884impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
2885 fn initialize<'a>(
2886 &'a mut self,
2887 _filter: &'a FilterExpression,
2888 context: &'a SchedulerContext,
2889 ) -> BoxFuture<'a, Result<()>> {
2890 let cache_key = self.column_index.to_string();
2891 if let Some(cached_data) = context.cache().get_by_str::<CachedFieldData>(&cache_key) {
2892 self.page_schedulers
2893 .iter_mut()
2894 .zip(cached_data.pages.iter())
2895 .for_each(|(page_scheduler, cached_data)| {
2896 page_scheduler.scheduler.load(cached_data);
2897 });
2898 return std::future::ready(Ok(())).boxed();
2899 };
2900
2901 let cache = context.cache().clone();
2902 let page_data = self
2903 .page_schedulers
2904 .iter_mut()
2905 .map(|s| s.scheduler.initialize(context.io()))
2906 .collect::<FuturesOrdered<_>>();
2907
2908 async move {
2909 let page_data = page_data.try_collect::<Vec<_>>().await?;
2910 let cached_data = Arc::new(CachedFieldData { pages: page_data });
2911 cache.insert_by_str::<CachedFieldData>(&cache_key, cached_data);
2912 Ok(())
2913 }
2914 .boxed()
2915 }
2916
2917 fn schedule_ranges<'a>(
2918 &'a self,
2919 ranges: &[Range<u64>],
2920 _filter: &FilterExpression,
2921 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
2922 let ranges = ranges.to_vec();
2923 Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
2924 self, ranges,
2925 )))
2926 }
2927}
2928
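/// A logical decoder for a primitive field in the legacy (non-structural)
/// format.  Wraps a physical page decoder that may still be loading.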
pub struct PrimitiveFieldDecoder {
    data_type: DataType,
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    should_validate: bool,
    num_rows: u64,
    rows_drained: u64,
    column_index: u32,
    page_index: u32,
}

impl PrimitiveFieldDecoder {
    pub fn new_from_data(
        physical_decoder: Arc<dyn PrimitivePageDecoder>,
        data_type: DataType,
        num_rows: u64,
        should_validate: bool,
    ) -> Self {
        Self {
            data_type,
            unloaded_physical_decoder: None,
            physical_decoder: Some(physical_decoder),
            should_validate,
            num_rows,
            rows_drained: 0,
            column_index: u32::MAX,
            page_index: u32::MAX,
        }
    }
}

impl Debug for PrimitiveFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PrimitiveFieldDecoder")
            .field("data_type", &self.data_type)
            .field("num_rows", &self.num_rows)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}

struct PrimitiveFieldDecodeTask {
    rows_to_skip: u64,
    rows_to_take: u64,
    should_validate: bool,
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    data_type: DataType,
}

impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // Dictionaries are a special case: logical nulls may live on the
        // values and need to be moved onto the keys
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}

impl LogicalPageDecoder for PrimitiveFieldDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        log::trace!(
            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
            loaded_need,
            self.column_index,
            self.page_index,
            self.num_rows
        );
        async move {
            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
            self.physical_decoder = Some(Arc::from(physical_decoder));
            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        if self.physical_decoder.is_none() {
            return Err(lance_core::Error::Internal {
                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
                location: location!(),
            });
        }

        let rows_to_skip = self.rows_drained;
        let rows_to_take = num_rows;

        self.rows_drained += rows_to_take;

        let task = Box::new(PrimitiveFieldDecodeTask {
            rows_to_skip,
            rows_to_take,
            should_validate: self.should_validate,
            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
            data_type: self.data_type.clone(),
        });

        Ok(NextDecodeTask {
            task,
            num_rows: rows_to_take,
        })
    }

    fn rows_loaded(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.num_rows
        }
    }

    fn rows_drained(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.rows_drained
        }
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

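/// Decodes the page tasks of a structural primitive column and concatenates
/// the results into a single array, restoring validity from the unravelers.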
3082#[derive(Debug)]
3085pub struct StructuralCompositeDecodeArrayTask {
3086 tasks: Vec<Box<dyn DecodePageTask>>,
3087 should_validate: bool,
3088 data_type: DataType,
3089}
3090
3091impl StructuralCompositeDecodeArrayTask {
3092 fn restore_validity(
3093 array: Arc<dyn Array>,
3094 unraveler: &mut CompositeRepDefUnraveler,
3095 ) -> Arc<dyn Array> {
3096 let validity = unraveler.unravel_validity(array.len());
3097 let Some(validity) = validity else {
3098 return array;
3099 };
3100 if array.data_type() == &DataType::Null {
3101 return array;
3103 }
3104 assert_eq!(validity.len(), array.len());
3105 make_array(unsafe {
3108 array
3109 .to_data()
3110 .into_builder()
3111 .nulls(Some(validity))
3112 .build_unchecked()
3113 })
3114 }
3115}
3116
3117impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
3118 fn decode(self: Box<Self>) -> Result<DecodedArray> {
3119 let mut arrays = Vec::with_capacity(self.tasks.len());
3120 let mut unravelers = Vec::with_capacity(self.tasks.len());
3121 for task in self.tasks {
3122 let decoded = task.decode()?;
3123 unravelers.push(decoded.repdef);
3124
3125 let array = make_array(
3126 decoded
3127 .data
3128 .into_arrow(self.data_type.clone(), self.should_validate)?,
3129 );
3130
3131 arrays.push(array);
3132 }
3133 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
3134 let array = arrow_select::concat::concat(&array_refs)?;
3135 let mut repdef = CompositeRepDefUnraveler::new(unravelers);
3136
3137 let array = Self::restore_validity(array, &mut repdef);
3138
3139 Ok(DecodedArray { array, repdef })
3140 }
3141}
3142
3143#[derive(Debug)]
3144pub struct StructuralPrimitiveFieldDecoder {
3145 field: Arc<ArrowField>,
3146 page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
3147 should_validate: bool,
3148 rows_drained_in_current: u64,
3149}
3150
3151impl StructuralPrimitiveFieldDecoder {
3152 pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
3153 Self {
3154 field: field.clone(),
3155 page_decoders: VecDeque::new(),
3156 should_validate,
3157 rows_drained_in_current: 0,
3158 }
3159 }
3160}
3161
3162impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
3163 fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
3164 assert!(child.path.is_empty());
3165 self.page_decoders.push_back(child.decoder);
3166 Ok(())
3167 }
3168
3169 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
3170 let mut remaining = num_rows;
3171 let mut tasks = Vec::new();
3172 while remaining > 0 {
3173 let cur_page = self.page_decoders.front_mut().unwrap();
3174 let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
3175 let to_take = num_in_page.min(remaining);
3176
3177 let task = cur_page.drain(to_take)?;
3178 tasks.push(task);
3179
3180 if to_take == num_in_page {
3181 self.page_decoders.pop_front();
3182 self.rows_drained_in_current = 0;
3183 } else {
3184 self.rows_drained_in_current += to_take;
3185 }
3186
3187 remaining -= to_take;
3188 }
3189 Ok(Box::new(StructuralCompositeDecodeArrayTask {
3190 tasks,
3191 should_validate: self.should_validate,
3192 data_type: self.field.data_type().clone(),
3193 }))
3194 }
3195
3196 fn data_type(&self) -> &DataType {
3197 self.field.data_type()
3198 }
3199}
3200
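/// Accumulates arrays until there is enough data to justify flushing a page.
///
/// `row_number` is the row number of the first buffered row, `num_rows` the
/// number of rows buffered so far, and `column_index` is used for logging.
/// (Field descriptions inferred from the implementation.)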
3201#[derive(Debug)]
3202pub struct AccumulationQueue {
3203 cache_bytes: u64,
3204 keep_original_array: bool,
3205 buffered_arrays: Vec<ArrayRef>,
3206 current_bytes: u64,
3207 row_number: u64,
3209 num_rows: u64,
3211 column_index: u32,
3213}
3214
3215impl AccumulationQueue {
3216 pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
3217 Self {
3218 cache_bytes,
3219 buffered_arrays: Vec::new(),
3220 current_bytes: 0,
3221 column_index,
3222 keep_original_array,
3223 row_number: u64::MAX,
3224 num_rows: 0,
3225 }
3226 }
3227
3228 pub fn insert(
3231 &mut self,
3232 array: ArrayRef,
3233 row_number: u64,
3234 num_rows: u64,
3235 ) -> Option<(Vec<ArrayRef>, u64, u64)> {
3236 if self.row_number == u64::MAX {
3237 self.row_number = row_number;
3238 }
3239 self.num_rows += num_rows;
3240 self.current_bytes += array.get_array_memory_size() as u64;
3241 if self.current_bytes > self.cache_bytes {
3242 debug!(
3243 "Flushing column {} page of size {} bytes (unencoded)",
3244 self.column_index, self.current_bytes
3245 );
3246 self.buffered_arrays.push(array);
3248 self.current_bytes = 0;
3249 let row_number = self.row_number;
3250 self.row_number = u64::MAX;
3251 let num_rows = self.num_rows;
3252 self.num_rows = 0;
3253 Some((
3254 std::mem::take(&mut self.buffered_arrays),
3255 row_number,
3256 num_rows,
3257 ))
3258 } else {
3259 trace!(
3260 "Accumulating data for column {}. Now at {} bytes",
3261 self.column_index,
3262 self.current_bytes
3263 );
3264 if self.keep_original_array {
3265 self.buffered_arrays.push(array);
3266 } else {
3267 self.buffered_arrays.push(deep_copy_array(array.as_ref()))
3268 }
3269 None
3270 }
3271 }
3272
3273 pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
3274 if self.buffered_arrays.is_empty() {
3275 trace!(
3276 "No final flush since no data at column {}",
3277 self.column_index
3278 );
3279 None
3280 } else {
3281 trace!(
3282 "Final flush of column {} which has {} bytes",
3283 self.column_index,
3284 self.current_bytes
3285 );
3286 self.current_bytes = 0;
3287 let row_number = self.row_number;
3288 self.row_number = u64::MAX;
3289 let num_rows = self.num_rows;
3290 self.num_rows = 0;
3291 Some((
3292 std::mem::take(&mut self.buffered_arrays),
3293 row_number,
3294 num_rows,
3295 ))
3296 }
3297 }
3298}
3299
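/// An encoder for primitive fields in the legacy format.
///
/// Buffers arrays in an [`AccumulationQueue`] and encodes each flushed batch
/// into one or more pages.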
pub struct PrimitiveFieldEncoder {
    accumulation_queue: AccumulationQueue,
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    column_index: u32,
    field: Field,
    max_page_bytes: u64,
}

impl PrimitiveFieldEncoder {
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                row_number: 0,
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed())
    }

    // If we have a single large array we may need to slice it into multiple pages
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice it finer than one value per page
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // One part and it fits in a single page
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // One array that needs to be sliced into multiple pages
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple arrays are encoded together as a single page
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}

impl FieldEncoder for PrimitiveFieldEncoder {
    // Buffers data; if there is enough to write a page then we create an encode task
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        _repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
            Ok(self.do_flush(arrays.0)?)
        } else {
            Ok(vec![])
        }
    }

    // If there is any data left in the buffer then create an encode task from it
    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Ok(self.do_flush(arrays.0)?)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

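/// The serialized representation of full-zip data: the zipped values buffer
/// plus an optional repetition index (present when there is repetition).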
struct SerializedFullZip {
    values: LanceBuffer,
    repetition_index: Option<LanceBuffer>,
}

const MINIBLOCK_ALIGNMENT: usize = 8;
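
/// An encoder for primitive (leaf) arrays in the structural format.
///
/// This encoder is structure-agnostic: repetition and definition levels are
/// gathered in advance (via [`RepDefBuilder`]) and encoded alongside the
/// values.  Depending on the data it picks one of three page layouts:
///
/// * mini-block: many values packed into small, independently compressed
///   chunks; used for narrow data
/// * full-zip: control words zipped together with each value; used for wide
///   values
/// * all-null: metadata-only pages for data with no values
///
/// (Summary inferred from the implementation below.)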
3460
3461pub struct PrimitiveStructuralEncoder {
3488 accumulation_queue: AccumulationQueue,
3490 accumulated_repdefs: Vec<RepDefBuilder>,
3491 compression_strategy: Arc<dyn CompressionStrategy>,
3493 column_index: u32,
3494 field: Field,
3495 encoding_metadata: Arc<HashMap<String, String>>,
3496}
3497
3498struct CompressedLevelsChunk {
3499 data: LanceBuffer,
3500 num_levels: u16,
3501}
3502
3503struct CompressedLevels {
3504 data: Vec<CompressedLevelsChunk>,
3505 compression: pb::ArrayEncoding,
3506 rep_index: Option<LanceBuffer>,
3507}
3508
3509struct SerializedMiniBlockPage {
3510 num_buffers: u64,
3511 data: LanceBuffer,
3512 metadata: LanceBuffer,
3513}
3514
impl PrimitiveStructuralEncoder {
    pub fn try_new(
        options: &EncodingOptions,
        compression_strategy: Arc<dyn CompressionStrategy>,
        column_index: u32,
        field: Field,
        encoding_metadata: Arc<HashMap<String, String>>,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            accumulated_repdefs: Vec::new(),
            column_index,
            compression_strategy,
            field,
            encoding_metadata,
        })
    }

    /// Returns true if the values are narrow enough that the mini-block
    /// layout (many values per chunk) is a good fit.
    fn is_narrow(data_block: &DataBlock) -> bool {
        // Values at or above 256 bytes are considered too wide for mini-block
        const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;

        if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
            let max_len_array = max_len_array
                .as_any()
                .downcast_ref::<PrimitiveArray<UInt64Type>>()
                .unwrap();
            if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
                return true;
            }
        }
        false
    }

    fn prefers_miniblock(
        data_block: &DataBlock,
        encoding_metadata: &HashMap<String, String>,
    ) -> bool {
        // If the user specifically requested mini-block then honor the request
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
        }
        // Otherwise mini-block is preferred for narrow data
        Self::is_narrow(data_block)
    }

    fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
        // Full-zip is the fallback layout, so the only reason not to use it is
        // if the user specifically requested something else
        if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
            return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
        }
        true
    }

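    /// Serializes mini-block chunks, plus any compressed rep/def levels, into
    /// a single buffer of aligned chunks and a small per-chunk metadata
    /// buffer.  Each serialized chunk appears to be laid out as follows
    /// (reconstructed from the writing code below):
    ///
    /// ```text
    /// [num_levels: u16][rep_len: u16?][def_len: u16?][buffer sizes: u16 * n]
    /// [pad to 8][rep bytes][pad][def bytes][pad][buffer 0][pad]...[buffer n][pad]
    /// ```
    ///
    /// Each per-chunk metadata word packs the chunk's size in 8-byte words,
    /// minus one (upper 12 bits), with its log2 value count (low 4 bits).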
    fn serialize_miniblocks(
        miniblocks: MiniBlockCompressed,
        rep: Option<Vec<CompressedLevelsChunk>>,
        def: Option<Vec<CompressedLevelsChunk>>,
    ) -> SerializedMiniBlockPage {
        let bytes_rep = rep
            .as_ref()
            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_def = def
            .as_ref()
            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
        let mut num_buffers = miniblocks.data.len();
        if rep.is_some() {
            num_buffers += 1;
        }
        if def.is_some() {
            num_buffers += 1;
        }
        let max_extra = 9 * num_buffers;
        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * 2);

        let mut rep_iter = rep.map(|r| r.into_iter());
        let mut def_iter = def.map(|d| d.into_iter());

        let mut buffer_offsets = vec![0; miniblocks.data.len()];
        for chunk in miniblocks.chunks {
            let start_pos = data_buffer.len();
            // Chunks are aligned
            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);

            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
            let def = def_iter.as_mut().map(|d| d.next().unwrap());

            let num_levels = rep
                .as_ref()
                .map(|r| r.num_levels)
                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
            data_buffer.extend_from_slice(&num_levels.to_le_bytes());

            if let Some(rep) = rep.as_ref() {
                let bytes_rep = u16::try_from(rep.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
            }
            if let Some(def) = def.as_ref() {
                let bytes_def = u16::try_from(def.data.len()).unwrap();
                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
            }

            for buffer_size in &chunk.buffer_sizes {
                let bytes = *buffer_size;
                data_buffer.extend_from_slice(&bytes.to_le_bytes());
            }

            let add_padding = |data_buffer: &mut Vec<u8>| {
                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
            };
            add_padding(&mut data_buffer);

            if let Some(rep) = rep.as_ref() {
                data_buffer.extend_from_slice(&rep.data);
                add_padding(&mut data_buffer);
            }
            if let Some(def) = def.as_ref() {
                data_buffer.extend_from_slice(&def.data);
                add_padding(&mut data_buffer);
            }
            for (buffer_size, (buffer, buffer_offset)) in chunk
                .buffer_sizes
                .iter()
                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
            {
                let start = *buffer_offset;
                let end = start + *buffer_size as usize;
                *buffer_offset += *buffer_size as usize;
                data_buffer.extend_from_slice(&buffer[start..end]);
                add_padding(&mut data_buffer);
            }

            let chunk_bytes = data_buffer.len() - start_pos;
            assert!(chunk_bytes <= 16 * 1024);
            assert!(chunk_bytes > 0);
            assert_eq!(chunk_bytes % 8, 0);
            // We subtract 1 from the size in words so that a full 16KiB chunk
            // can still be expressed in 12 bits
            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
            let divided_bytes_minus_one = (divided_bytes - 1) as u64;

            let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
            meta_buffer.extend_from_slice(&metadata.to_le_bytes());
        }

        let data_buffer = LanceBuffer::Owned(data_buffer);
        let metadata_buffer = LanceBuffer::Owned(meta_buffer);

        SerializedMiniBlockPage {
            num_buffers: miniblocks.data.len() as u64,
            data: data_buffer,
            metadata: metadata_buffer,
        }
    }

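    /// Compresses the repetition or definition levels, slicing them into one
    /// buffer per mini-block chunk.  When `max_rep > 0` this also builds a
    /// repetition index with two entries per chunk: the number of rows in the
    /// chunk and the number of trailing "leftover" levels belonging to a row
    /// that finishes in a later chunk.  (Description inferred from the
    /// implementation.)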
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let mut levels_buf = levels.all_levels().try_clone().unwrap();
        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: levels_buf.borrow_and_clone(),
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        });
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        // Pick a block compressor for the levels
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        // Compress the levels in chunk-sized slices
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            values_counter += chunk_num_values;
            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                // We are working with repetition levels and must build the
                // repetition index as we go
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();

                // Count the row boundaries after the first level of the chunk
                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    // The final chunk never has leftovers
                    0
                };

                if chunk_idx != 0 && rep_values[0] == max_rep {
                    // This chunk starts a new row, so the previous chunk's
                    // leftovers actually completed a row
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }

                if chunk_idx == chunks.len() - 1 {
                    // The final row always finishes in the final chunk
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            });
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels: num_chunk_levels as u16,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }

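    /// Encodes a page where all values are null and the only structure is a
    /// single validity layer.  No buffers are needed; the layout says it all.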
    fn encode_simple_all_null(
        column_idx: u32,
        num_rows: u64,
        row_number: u64,
    ) -> Result<EncodedPage> {
        let description = ProtobufUtils::simple_all_null_layout();
        Ok(EncodedPage {
            column_idx,
            data: vec![],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

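    /// Encodes a page where all values are null but there is structural
    /// information (e.g. lists) that must be serialized as rep/def buffers.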
    fn encode_complex_all_null(
        column_idx: u32,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(rep.clone())
        } else {
            LanceBuffer::empty()
        };

        let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
            LanceBuffer::reinterpret_slice(def.clone())
        } else {
            LanceBuffer::empty()
        };

        let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
        Ok(EncodedPage {
            column_idx,
            data: vec![rep_bytes, def_bytes],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }

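    /// Encodes a batch of data into a mini-block layout page, optionally with
    /// a dictionary.  Rep/def levels are compressed per chunk and serialized
    /// into the mini-blocks alongside the values.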
    #[allow(clippy::too_many_arguments)]
    fn encode_miniblock(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        dictionary_data: Option<DataBlock>,
        num_rows: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);

        if let DataBlock::AllNull(_null_block) = data {
            // TODO: support all-null data blocks here
            todo!()
        }

        // The validity is encoded in repdef so we can remove it
        let data = data.remove_outer_validity();

        let num_items = data.num_values();

        let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;

        let mut compressed_rep = repdef
            .rep_slicer()
            .map(|rep_slicer| {
                Self::compress_levels(
                    rep_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    max_rep,
                )
            })
            .transpose()?;

        let (rep_index, rep_index_depth) =
            match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
                Some(rep_index) => (Some(rep_index.borrow_and_clone()), 1),
                None => (None, 0),
            };

        let mut compressed_def = repdef
            .def_slicer()
            .map(|def_slicer| {
                Self::compress_levels(
                    def_slicer,
                    num_items,
                    compression_strategy,
                    &compressed_data.chunks,
                    0,
                )
            })
            .transpose()?;

        let rep_data = compressed_rep
            .as_mut()
            .map(|cr| std::mem::take(&mut cr.data));
        let def_data = compressed_def
            .as_mut()
            .map(|cd| std::mem::take(&mut cd.data));

        let serialized = Self::serialize_miniblocks(compressed_data, rep_data, def_data);

        // Metadata, data, and then (optionally) dictionary and rep index buffers
        let mut data = Vec::with_capacity(4);
        data.push(serialized.metadata);
        data.push(serialized.data);

        if let Some(dictionary_data) = dictionary_data {
            let num_dictionary_items = dictionary_data.num_values();
            // A placeholder field for the dictionary block compressor
            let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;

            let (compressor, dictionary_encoding) = compression_strategy
                .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
            let dictionary_buffer = compressor.compress(dictionary_data)?;

            data.push(dictionary_buffer);
            if let Some(rep_index) = rep_index {
                data.push(rep_index);
            }

            let description = ProtobufUtils::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                Some((dictionary_encoding, num_dictionary_items)),
                &repdef.def_meaning,
                num_items,
            );
            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        } else {
            let description = ProtobufUtils::miniblock_layout(
                compressed_rep.map(|cr| cr.compression),
                compressed_def.map(|cd| cd.compression),
                value_encoding,
                rep_index_depth,
                serialized.num_buffers,
                None,
                &repdef.def_meaning,
                num_items,
            );

            if let Some(mut rep_index) = rep_index {
                // Sanity check: the rep index row counts should add up to the page's rows
                let view = rep_index.borrow_to_typed_slice::<u64>();
                let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
                debug_assert_eq!(total, num_rows);

                data.push(rep_index);
            }

            Ok(EncodedPage {
                num_rows,
                column_idx,
                data,
                description: PageEncoding::Structural(description),
                row_number,
            })
        }
    }

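    /// Zips fixed-width values together with their control words and builds a
    /// repetition index (one entry per row start, plus a final end entry)
    /// when repetition is present.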
    fn serialize_full_zip_fixed(
        fixed: FixedWidthDataBlock,
        mut repdef: ControlWordIterator,
        num_values: u64,
    ) -> SerializedFullZip {
        let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
        let mut zipped_data = Vec::with_capacity(len);

        let max_rep_index_val = if repdef.has_repetition() {
            len as u64
        } else {
            // No repetition; the builder will produce an empty index
            0
        };
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);

        assert_eq!(
            fixed.bits_per_value % 8,
            0,
            "Non-byte aligned full-zip compression not yet supported"
        );

        let bytes_per_value = fixed.bits_per_value as usize / 8;

        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
        let mut offset = 0;
        while let Some(control) = repdef.append_next(&mut zipped_data) {
            if control.is_new_row {
                // A new row has started; record its offset in the rep index
                debug_assert!(offset <= len);
                // SAFETY: offset is always <= len, the declared max value
                unsafe { rep_index_builder.append(offset as u64) };
            }
            if control.is_visible {
                let value = data_iter.next().unwrap();
                zipped_data.extend_from_slice(value);
            }
            offset = zipped_data.len();
        }

        debug_assert_eq!(zipped_data.len(), len);
        // The rep index ends with the size of the whole buffer
        // SAFETY: the final offset equals len, the declared max value
        unsafe {
            rep_index_builder.append(zipped_data.len() as u64);
        }

        let zipped_data = LanceBuffer::Owned(zipped_data);
        let rep_index = rep_index_builder.into_data();
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::Owned(rep_index))
        };
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

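    /// Zips variable-width values together with their per-value lengths and
    /// control words.  Invisible items contribute only a control word; valid
    /// items contribute a length followed by the value bytes.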
    fn serialize_full_zip_variable(
        mut variable: VariableWidthBlock,
        mut repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        let bytes_per_offset = variable.bits_per_offset as usize / 8;
        assert_eq!(
            variable.bits_per_offset % 8,
            0,
            "Only byte-aligned offsets supported"
        );
        let len = variable.data.len()
            + repdef.bytes_per_word() * num_items as usize
            + bytes_per_offset * variable.num_values as usize;
        let mut buf = Vec::with_capacity(len);

        let max_rep_index_val = len as u64;
        let mut rep_index_builder =
            BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);

        match bytes_per_offset {
            4 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u32>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // SAFETY: rep_offset is always <= len, the declared max value
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            8 => {
                let offs = variable.offsets.borrow_to_typed_slice::<u64>();
                let mut rep_offset = 0;
                let mut windows_iter = offs.as_ref().windows(2);
                while let Some(control) = repdef.append_next(&mut buf) {
                    if control.is_new_row {
                        debug_assert!(rep_offset <= len);
                        // SAFETY: rep_offset is always <= len, the declared max value
                        unsafe { rep_index_builder.append(rep_offset as u64) };
                    }
                    if control.is_visible {
                        let window = windows_iter.next().unwrap();
                        if control.is_valid_item {
                            buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                            buf.extend_from_slice(
                                &variable.data[window[0] as usize..window[1] as usize],
                            );
                        }
                    }
                    rep_offset = buf.len();
                }
            }
            _ => panic!("Unsupported offset size"),
        }

        // Invisible and null items mean the buffer may be smaller than the estimate
        debug_assert!(buf.len() <= len);
        // The rep index ends with the size of the whole buffer
        // SAFETY: the final offset is the buffer length, which is at most len
        unsafe {
            rep_index_builder.append(buf.len() as u64);
        }

        let zipped_data = LanceBuffer::Owned(buf);
        let rep_index = rep_index_builder.into_data();
        debug_assert!(!rep_index.is_empty());
        let rep_index = Some(LanceBuffer::Owned(rep_index));
        SerializedFullZip {
            values: zipped_data,
            repetition_index: rep_index,
        }
    }

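    /// Zips the compressed values together with the control words into a
    /// single buffer of row-contiguous bytes, e.g. (sketch):
    ///
    /// ```text
    /// [ctrl][value][ctrl][value]...[ctrl]    (invisible items have no value)
    /// ```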
    fn serialize_full_zip(
        compressed_data: PerValueDataBlock,
        repdef: ControlWordIterator,
        num_items: u64,
    ) -> SerializedFullZip {
        match compressed_data {
            PerValueDataBlock::Fixed(fixed) => {
                Self::serialize_full_zip_fixed(fixed, repdef, num_items)
            }
            PerValueDataBlock::Variable(var) => {
                Self::serialize_full_zip_variable(var, repdef, num_items)
            }
        }
    }

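    /// Encodes a batch of data into a full-zip layout page.  Each item's
    /// control word (rep/def) is written immediately before its value so that
    /// rows can be located with the repetition index alone.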
    fn encode_full_zip(
        column_idx: u32,
        field: &Field,
        compression_strategy: &dyn CompressionStrategy,
        data: DataBlock,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_lists: u64,
    ) -> Result<EncodedPage> {
        let repdef = RepDefBuilder::serialize(repdefs);
        let max_rep = repdef
            .repetition_levels
            .as_ref()
            .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
        let max_def = repdef
            .definition_levels
            .as_ref()
            .map_or(0, |d| d.iter().max().copied().unwrap_or(0));

        // The validity is encoded in repdef so we can remove it
        let data = data.remove_outer_validity();

        // With repetition the number of items can exceed the number of
        // visible values (invisible items carry no value bytes)
        let (num_items, num_visible_items) =
            if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
                (rep_levels.len() as u64, data.num_values())
            } else {
                (data.num_values(), data.num_values())
            };

        let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);

        let repdef_iter = build_control_word_iterator(
            repdef.repetition_levels.as_deref(),
            max_rep,
            repdef.definition_levels.as_deref(),
            max_def,
            max_visible_def,
            num_items as usize,
        );
        let bits_rep = repdef_iter.bits_rep();
        let bits_def = repdef_iter.bits_def();

        let compressor = compression_strategy.create_per_value(field, &data)?;
        let (compressed_data, value_encoding) = compressor.compress(data)?;

        let description = match &compressed_data {
            PerValueDataBlock::Fixed(fixed) => ProtobufUtils::fixed_full_zip_layout(
                bits_rep,
                bits_def,
                fixed.bits_per_value as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
            PerValueDataBlock::Variable(variable) => ProtobufUtils::variable_full_zip_layout(
                bits_rep,
                bits_def,
                variable.bits_per_offset as u32,
                value_encoding,
                &repdef.def_meaning,
                num_items as u32,
                num_visible_items as u32,
            ),
        };

        let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);

        let data = if let Some(repindex) = zipped.repetition_index {
            vec![zipped.values, repindex]
        } else {
            vec![zipped.values]
        };

        Ok(EncodedPage {
            num_rows: num_lists,
            column_idx,
            data,
            description: PageEncoding::Structural(description),
            row_number,
        })
    }

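    /// Dictionary encodes a data block, returning `(indices, dictionary)`.
    ///
    /// Indices are stored as `u8`; the caller appears to rely on the
    /// cardinality being low relative to the number of values.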
    fn dictionary_encode(mut data_block: DataBlock, cardinality: u64) -> (DataBlock, DataBlock) {
        match data_block {
            DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
                // The fixed-width path reinterprets the data as u128 values
                let mut map = HashMap::new();
                let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
                let u128_slice = u128_slice.as_ref();
                let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
                let mut indices_buffer =
                    Vec::with_capacity(fixed_width_data_block.num_values as usize);
                let mut curr_idx: u8 = 0;
                u128_slice.iter().for_each(|&value| {
                    let idx = *map.entry(value).or_insert_with(|| {
                        dictionary_buffer.push(value);
                        curr_idx += 1;
                        curr_idx - 1
                    });
                    indices_buffer.push(idx);
                });
                let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                    data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                    bits_per_value: 128,
                    num_values: curr_idx as u64,
                    block_info: BlockInfo::default(),
                });
                let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                    data: LanceBuffer::reinterpret_vec(indices_buffer),
                    bits_per_value: 8,
                    num_values: fixed_width_data_block.num_values,
                    block_info: BlockInfo::default(),
                });
                // Recompute statistics since they have changed
                indices_data_block.compute_stat();

                (indices_data_block, dictionary_data_block)
            }
            DataBlock::VariableWidth(ref mut variable_width_data_block) => {
                match variable_width_data_block.bits_per_offset {
                    32 => {
                        let mut map: HashMap<U8SliceKey, u8> = HashMap::new();
                        let offsets = variable_width_data_block
                            .offsets
                            .borrow_to_typed_slice::<u32>();
                        let offsets = offsets.as_ref();

                        let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
                            "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
                        );
                        let max_len = max_len.as_primitive::<UInt64Type>().value(0);

                        let mut dictionary_buffer: Vec<u8> =
                            Vec::with_capacity((max_len * cardinality) as usize);
                        let mut dictionary_offsets_buffer = vec![0];
                        let mut curr_idx = 0;
                        let mut indices_buffer =
                            Vec::with_capacity(variable_width_data_block.num_values as usize);

                        offsets
                            .iter()
                            .zip(offsets.iter().skip(1))
                            .for_each(|(&start, &end)| {
                                let key =
                                    &variable_width_data_block.data[start as usize..end as usize];
                                let idx = *map.entry(U8SliceKey(key)).or_insert_with(|| {
                                    dictionary_buffer.extend_from_slice(key);
                                    dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
                                    curr_idx += 1;
                                    curr_idx - 1
                                });
                                indices_buffer.push(idx);
                            });

                        let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
                            data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                            offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
                            bits_per_offset: 32,
                            num_values: curr_idx as u64,
                            block_info: BlockInfo::default(),
                        });

                        let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                            data: LanceBuffer::Owned(indices_buffer),
                            bits_per_value: 8,
                            num_values: variable_width_data_block.num_values,
                            block_info: BlockInfo::default(),
                        });
                        // Recompute statistics since they have changed
                        indices_data_block.compute_stat();

                        (indices_data_block, dictionary_data_block)
                    }
                    64 => {
                        todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
                    }
                    _ => {
                        unreachable!()
                    }
                }
            }
            _ => {
                unreachable!("dictionary encode called with data block {:?}", data_block)
            }
        }
    }

    fn do_flush(
        &mut self,
        arrays: Vec<ArrayRef>,
        repdefs: Vec<RepDefBuilder>,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let column_idx = self.column_index;
        let compression_strategy = self.compression_strategy.clone();
        let field = self.field.clone();
        let encoding_metadata = self.encoding_metadata.clone();
        let task = spawn_cpu(move || {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            if num_values == 0 {
                return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
            }
            let num_nulls = arrays
                .iter()
                .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
                .sum::<u64>();

            if num_values == num_nulls {
                if repdefs.iter().all(|rd| rd.is_simple_validity()) {
                    log::debug!(
                        "Encoding column {} with {} items using simple-null layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_simple_all_null(column_idx, num_values, row_number)
                } else {
                    Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows)
                }
            } else {
                let data_block = DataBlock::from_arrays(&arrays, num_values);

                if let DataBlock::Struct(ref struct_data_block) = data_block {
                    if struct_data_block
                        .children
                        .iter()
                        .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                    {
                        panic!("packed struct encoding currently only supports fixed-width fields.")
                    }
                }

                let dictionary_encoding_threshold: u64 = 100.max(data_block.num_values() / 4);
                let cardinality =
                    if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
                        cardinality_array.as_primitive::<UInt64Type>().value(0)
                    } else {
                        u64::MAX
                    };

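                // Illustrative numbers: with 10_000 values the threshold is
                // max(100, 10_000 / 4) = 2_500, so the dictionary path below is
                // taken when there are at most 2_500 distinct values and, on
                // average, at least 10 repetitions of each distinct value.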
                if cardinality <= dictionary_encoding_threshold
                    && data_block.num_values() >= 10 * cardinality
                {
                    let (indices_data_block, dictionary_data_block) =
                        Self::dictionary_encode(data_block, cardinality);
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        indices_data_block,
                        repdefs,
                        row_number,
                        Some(dictionary_data_block),
                        num_rows,
                    )
                } else if Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using mini-block layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_miniblock(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdefs,
                        row_number,
                        None,
                        num_rows,
                    )
                } else if Self::prefers_fullzip(encoding_metadata.as_ref()) {
                    log::debug!(
                        "Encoding column {} with {} items using full-zip layout",
                        column_idx,
                        num_values
                    );
                    Self::encode_full_zip(
                        column_idx,
                        &field,
                        compression_strategy.as_ref(),
                        data_block,
                        repdefs,
                        row_number,
                        num_rows,
                    )
                } else {
                    Err(Error::InvalidInput {
                        source: format!(
                            "Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}",
                            field.name, STRUCTURAL_ENCODING_META_KEY
                        )
                        .into(),
                        location: location!(),
                    })
                }
            }
        })
        .boxed();
        Ok(vec![task])
    }

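    /// Records the validity of `array` in `repdef`: the array's null buffer when it
    /// has one, otherwise a run of `array.len()` non-null values.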
    fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
        if let Some(validity) = array.nulls() {
            repdef.add_validity_bitmap(validity.clone());
        } else {
            repdef.add_no_null(array.len());
        }
    }

    fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
        match array.data_type() {
            DataType::Null => {
                repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
            }
            DataType::Dictionary(_, _) => {
                unreachable!()
            }
            _ => Self::extract_validity_buf(array, repdef),
        }
    }
}

impl FieldEncoder for PrimitiveStructuralEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        Self::extract_validity(array.as_ref(), &mut repdef);
        self.accumulated_repdefs.push(repdef);

        if let Some((arrays, row_number, num_rows)) =
            self.accumulation_queue.insert(array, row_number, num_rows)
        {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
        } else {
            Ok(vec![])
        }
    }

    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, row_number, num_rows)) = self.accumulation_queue.flush() {
            let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
            self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
        } else {
            Ok(vec![])
        }
    }

    fn num_columns(&self) -> u32 {
        1
    }

    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}

#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::{collections::VecDeque, sync::Arc};

    use arrow_array::{ArrayRef, Int8Array, StringArray};

    use crate::encodings::logical::primitive::{
        ChunkDrainInstructions, PrimitiveStructuralEncoder,
    };

    use super::{
        ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction, RepetitionIndex,
    };
    #[test]
    fn test_is_narrow() {
        let int8_array = Int8Array::from(vec![1, 2, 3]);
        let array_ref: ArrayRef = Arc::new(int8_array);
        let block = DataBlock::from_array(array_ref);

        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let block = DataBlock::from_array(string_array);
        assert!(PrimitiveStructuralEncoder::is_narrow(&block));

        let string_array = StringArray::from(vec![
            Some("hello world".repeat(100)),
            Some("world".to_string()),
        ]);
        let block = DataBlock::from_array(string_array);
        assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
    }

    #[test]
    fn test_map_range() {
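        // Four lists of lengths 3, 2, 0, and 3: nine rep/def levels but only
        // eight items, because the level with def = 1 (> max_visible_def) is a
        // null or empty list that contributes no items.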
        let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
        let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 8;
        let max_rep = 1;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(1..2, 3..5, 3..5);
        check(2..3, 5..5, 5..6);
        check(3..4, 5..8, 6..9);
        check(0..2, 0..5, 0..5);
        check(1..3, 3..5, 3..6);
        check(2..4, 5..8, 5..9);
        check(0..3, 0..5, 0..6);
        check(1..4, 3..8, 3..9);
        check(0..4, 0..8, 0..9);

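        // Three lists: a null/empty list (def = 1), then a two-item list, then a
        // one-item list; four levels, three visible items.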
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![1, 0, 0, 0]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..0, 0..1);
        check(1..2, 0..2, 1..3);
        check(2..3, 2..3, 3..4);
        check(0..2, 0..2, 0..3);
        check(1..3, 0..3, 1..4);
        check(0..3, 0..3, 0..4);

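        // Same shape, but the null/empty list now comes last.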
        let rep = Some(vec![1, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..3, 1..3);
        check(2..3, 3..3, 3..4);
        check(0..2, 0..3, 0..3);
        check(1..3, 1..3, 1..4);
        check(0..3, 0..3, 0..4);

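        // Three lists of two items each, with no definition levels at all.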
        let rep = Some(vec![1, 0, 1, 0, 1, 0]);
        let def: Option<&[u16]> = None;
        let max_visible_def = 0;
        let total_items = 6;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..2, 0..2);
        check(1..2, 2..4, 2..4);
        check(2..3, 4..6, 4..6);
        check(0..2, 0..4, 0..4);
        check(1..3, 2..6, 2..6);
        check(0..3, 0..6, 0..6);

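        // No repetition: four flat values where the def = 1 value is a visible
        // null (max_visible_def = 1), so items and levels map one-to-one.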
        let rep: Option<&[u16]> = None;
        let def = Some(vec![0, 0, 1, 0]);
        let max_visible_def = 1;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Absent,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..1);
        check(1..2, 1..2, 1..2);
        check(2..3, 2..3, 2..3);
        check(0..2, 0..2, 0..2);
        check(1..3, 1..3, 1..3);
        check(0..3, 0..3, 0..3);

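        // The chunk starts mid-list (rep = 0), so the first level is a preamble;
        // the list starting at the final level is null/empty. This exercises both
        // taking and skipping the preamble.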
        let rep = Some(vec![0, 1, 0, 1]);
        let def = Some(vec![0, 0, 0, 1]);
        let max_visible_def = 0;
        let total_items = 3;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..3, 3..4);
        check(0..2, 1..3, 1..4);

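        // A preamble item, then a null/empty list, then a two-item list.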
        let rep = Some(vec![0, 1, 1, 0]);
        let def = Some(vec![0, 1, 0, 0]);
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..1, 0..2);
        check(0..2, 0..3, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..1, 1..2);
        check(1..2, 1..3, 2..4);
        check(0..2, 1..3, 1..4);

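        // A preamble plus two lists, with no definition levels.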
        let rep = Some(vec![0, 1, 0, 1]);
        let def: Option<Vec<u16>> = None;
        let max_visible_def = 0;
        let total_items = 4;

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Take,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 0..3, 0..3);
        check(0..2, 0..4, 0..4);

        let check = |range, expected_item_range, expected_level_range| {
            let (item_range, level_range) = DecodeMiniBlockTask::map_range(
                range,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                max_visible_def,
                total_items,
                PreambleAction::Skip,
            );
            assert_eq!(item_range, expected_item_range);
            assert_eq!(level_range, expected_level_range);
        };

        check(0..1, 1..3, 1..3);
        check(1..2, 3..4, 3..4);
        check(0..2, 1..4, 1..4);
    }

    #[test]
    fn test_schedule_instructions() {
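        // As exercised here, each repetition index entry encodes
        // [rows ending in the chunk, items in the chunk's trailing partial row]:
        // a nonzero second value means the chunk ends mid-row, so the next chunk
        // starts with a preamble. Rows 5 and 12 span chunk boundaries, for 14
        // rows in total.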
        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);

        let check = |user_ranges, expected_instructions| {
            let instructions =
                ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
            assert_eq!(instructions, expected_instructions);
        };

        let expected_take_all = vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 5,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 4,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 3,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
        ];

        check(&[0..14], expected_take_all.clone());
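
        // Taking every row individually should produce the same instructions as
        // taking all rows in a single range.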
        check(
            &[
                0..1,
                1..2,
                2..3,
                3..4,
                4..5,
                5..6,
                6..7,
                7..8,
                8..9,
                9..10,
                10..11,
                11..12,
                12..13,
                13..14,
            ],
            expected_take_all,
        );

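        // Two disjoint single-row ranges inside chunk 0 become two separate
        // instruction sets against the same chunk.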
        check(
            &[0..1, 3..4],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 3,
                    rows_to_take: 1,
                    take_trailer: false,
                },
            ],
        );

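        // Row 5 spans chunks 0 and 1: take chunk 0's trailer and chunk 1's preamble.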
        check(
            &[5..6],
            vec![
                ChunkInstructions {
                    chunk_idx: 0,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 5,
                    rows_to_take: 0,
                    take_trailer: true,
                },
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Take,
                    rows_to_skip: 0,
                    rows_to_take: 0,
                    take_trailer: false,
                },
            ],
        );

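        // Rows 7..10: skip chunk 1's preamble and one row, take the next row,
        // then take two rows from chunk 2.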
        check(
            &[7..10],
            vec![
                ChunkInstructions {
                    chunk_idx: 1,
                    preamble: PreambleAction::Skip,
                    rows_to_skip: 1,
                    rows_to_take: 1,
                    take_trailer: false,
                },
                ChunkInstructions {
                    chunk_idx: 2,
                    preamble: PreambleAction::Absent,
                    rows_to_skip: 0,
                    rows_to_take: 2,
                    take_trailer: false,
                },
            ],
        );
    }

    #[test]
    fn test_drain_instructions() {
        fn drain_from_instructions(
            instructions: &mut VecDeque<ChunkInstructions>,
            mut rows_desired: u64,
            need_preamble: &mut bool,
            skip_in_chunk: &mut u64,
        ) -> Vec<ChunkDrainInstructions> {
            let mut drain_instructions = Vec::with_capacity(instructions.len());
            while rows_desired > 0 || *need_preamble {
                let (next_instructions, consumed_chunk) = instructions
                    .front()
                    .unwrap()
                    .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
                if consumed_chunk {
                    instructions.pop_front();
                }
                drain_instructions.push(next_instructions);
            }
            drain_instructions
        }

        let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![1..7, 10..14];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

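        // Drain the first 4 rows; they all come from the first scheduled chunk.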
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 4);
        assert_eq!(
            next_batch,
            vec![ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 4,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }]
        );

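        // Drain 4 more rows: this crosses from chunk 0 through chunk 1 (taking
        // its preamble) and into chunk 2.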
        let next_batch =
            drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 2);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 4,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                }
            ]
        );

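        // Drain the final 2 rows: one more from chunk 2, then chunk 3's preamble
        // plus its last row.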
        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 2,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[3].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

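        // A second fixture: 28 rows in three chunks; chunks 0 and 1 both end
        // mid-row, so chunks 1 and 2 each begin with a preamble.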
        let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
        let repetition_index = RepetitionIndex::decode(&repetition_index);
        let user_ranges = vec![0..28];

        let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);

        let mut to_drain = VecDeque::from(scheduled.clone());

        let mut need_preamble = false;
        let mut skip_in_chunk = 0;

        let next_batch =
            drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[0].clone(),
                    rows_to_take: 6,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Absent,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 1,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 1);

        let next_batch =
            drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);

        assert_eq!(
            next_batch,
            vec![
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[1].clone(),
                    rows_to_take: 2,
                    rows_to_skip: 1,
                    preamble_action: PreambleAction::Skip,
                },
                ChunkDrainInstructions {
                    chunk_instructions: scheduled[2].clone(),
                    rows_to_take: 0,
                    rows_to_skip: 0,
                    preamble_action: PreambleAction::Take,
                },
            ]
        );

        assert!(!need_preamble);
        assert_eq!(skip_in_chunk, 0);
    }
}