use std::{
collections::{HashMap, VecDeque},
fmt::Debug,
iter,
ops::Range,
sync::Arc,
vec,
};
use arrow::array::AsArray;
use arrow_array::{make_array, types::UInt64Type, Array, ArrayRef, PrimitiveArray};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, TryStreamExt};
use itertools::Itertools;
use lance_arrow::deepcopy::deep_copy_array;
use lance_core::utils::bit::pad_bytes;
use lance_core::utils::hash::U8SliceKey;
use log::{debug, trace};
use snafu::{location, Location};
use crate::data::{AllNullDataBlock, DataBlock, VariableWidthBlock};
use crate::decoder::PerValueDecompressor;
use crate::encoder::PerValueDataBlock;
use crate::repdef::{
build_control_word_iterator, CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser,
DefinitionInterpretation, RepDefSlicer,
};
use crate::statistics::{ComputeStat, GetStat, Stat};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
decoder::{
BlockDecompressor, ColumnInfo, DecodeArrayTask, DecodePageTask, DecodedArray, DecodedPage,
DecompressorStrategy, FieldScheduler, FilterExpression, LoadedPage, LogicalPageDecoder,
MessageType, MiniBlockDecompressor, NextDecodeTask, PageEncoding, PageInfo, PageScheduler,
PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
StructuralPageDecoder, StructuralSchedulingJob, UnloadedPage,
},
encoder::{
ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
},
encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
format::{pb, ProtobufUtils},
repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
EncodingsIo,
};
/// A single page of a primitive column paired with its physical scheduler.
#[derive(Debug)]
struct PrimitivePage {
    // Physical page scheduler built from the page's (legacy) encoding
    scheduler: Box<dyn PageScheduler>,
    // Number of rows stored in the page
    num_rows: u64,
    // Original index of the page within the column (empty pages are dropped,
    // so this may differ from the position in `page_schedulers`)
    page_index: u32,
}
/// Scheduler for a leaf (primitive) column in the legacy page layout.
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    // One entry per non-empty page, in page order
    page_schedulers: Vec<PrimitivePage>,
    // Total rows across all non-empty pages
    num_rows: u64,
    // Whether decoded arrays should be validated after decode
    should_validate: bool,
    column_index: u32,
}
impl PrimitiveFieldScheduler {
    /// Creates a scheduler for a primitive column.
    ///
    /// Pages with zero rows are dropped entirely; every remaining page gets a
    /// physical page scheduler built from its legacy encoding description.
    pub fn new(
        column_index: u32,
        data_type: DataType,
        pages: Arc<[PageInfo]>,
        buffers: ColumnBuffers,
        should_validate: bool,
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            .filter(|(page_index, page)| {
                // Fix: only emit the "skipping" trace when the page is actually
                // skipped.  Previously this logged for every page, empty or not,
                // which made the trace output misleading.
                let keep = page.num_rows > 0;
                if !keep {
                    log::trace!("Skipping empty page with index {}", page_index);
                }
                keep
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
                };
                let scheduler = decoder_from_array_encoding(
                    page.encoding.as_legacy(),
                    &page_buffers,
                    &data_type,
                );
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    // Keep the original page index (pre-filter) for diagnostics
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
        // Total row count only includes the non-empty pages we kept
        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
        Self {
            data_type,
            page_schedulers,
            num_rows,
            should_validate,
            column_index,
        }
    }
}
/// Cursor state for scheduling a `PrimitiveFieldScheduler`'s ranges page by page.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    // The user-requested row ranges, in column coordinates
    ranges: Vec<Range<u64>>,
    // Next page to visit
    page_idx: usize,
    // Next range to serve
    range_idx: usize,
    // Rows of `ranges[range_idx]` already consumed by earlier calls
    range_offset: u64,
    // Row offset of the current page within the column
    global_row_offset: u64,
}
impl<'a> PrimitiveFieldSchedulingJob<'a> {
    /// Starts a fresh job positioned at the first page and the first range.
    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            scheduler,
            ranges,
            global_row_offset: 0,
            range_offset: 0,
            range_idx: 0,
            page_idx: 0,
        }
    }
}
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules I/O for the next page's worth of requested rows.
    ///
    /// Each call consumes at most one page: it gathers every requested range
    /// (or partial range) that overlaps the current page, submits the physical
    /// reads, and returns a logical decoder for those rows.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // The current range, advanced past any portion already scheduled
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;
        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip pages that lie entirely before the start of the range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }
        // Collect every requested range (in page-relative coordinates) that
        // overlaps the current page
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // Clamp the range start to the beginning of this page
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // This range is fully satisfied; move to the next one (if any)
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range continues into the next page; stop here and let the
                // next call pick it up (the start clamp above re-aligns it)
                break;
            }
        }
        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );
        // This page is done; advance past it
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );
        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };
        let decoder = Box::new(logical_decoder);
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total number of rows requested by this job.
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
impl FieldScheduler for PrimitiveFieldScheduler {
    /// Total rows across the column's (non-empty) pages.
    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    /// Builds a scheduling job covering `ranges`.  The filter expression is not
    /// used when scheduling primitive columns.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let job = PrimitiveFieldSchedulingJob::new(self, ranges.to_vec());
        Ok(Box::new(job))
    }

    /// Primitive columns need no up-front initialization.
    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        std::future::ready(Ok(())).boxed()
    }
}
/// A scheduler for a single page in the structural layout.
///
/// `initialize` is invoked once to load any metadata buffers the page needs;
/// afterwards `schedule_ranges` may be called to schedule reads of row ranges,
/// yielding a decoder for those rows.
trait StructuralPageScheduler: std::fmt::Debug + Send {
    fn initialize<'a>(&'a mut self, io: &Arc<dyn EncodingsIo>) -> BoxFuture<'a, Result<()>>;
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>>;
}
/// Metadata for one mini-block chunk, parsed from the page's metadata buffer.
#[derive(Debug)]
struct ChunkMeta {
    // Number of values stored in the chunk
    num_values: u64,
    // Size of the chunk in bytes (including padding)
    chunk_size_bytes: u64,
    // Byte offset of the chunk within the page's value buffer
    offset_bytes: u64,
}
/// The decompressed contents of a single mini-block chunk.
#[derive(Debug)]
struct DecodedMiniBlockChunk {
    // Repetition levels, if the chunk stored any
    rep: Option<ScalarBuffer<u16>>,
    // Definition levels, if the chunk stored any
    def: Option<ScalarBuffer<u16>>,
    // The decompressed values
    values: DataBlock,
}
/// A CPU-side task that decompresses a set of mini-block chunks and assembles
/// the requested rows (and their rep/def levels) from them.
#[derive(Debug)]
struct DecodeMiniBlockTask {
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    // Dictionary to apply after decoding indices, when the page is dictionary-encoded
    dictionary_data: Option<Arc<DataBlock>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Maximum definition level that is still "visible" (not hidden behind a
    // null/empty ancestor); computed from the leading non-list layers
    max_visible_level: u16,
    // Which rows to take from which loaded chunks
    instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
}
impl DecodeMiniBlockTask {
    /// Decompresses a buffer of repetition or definition levels.
    ///
    /// Returns `None` when the levels were stored as an all-zero constant,
    /// i.e. the chunk carries no meaningful rep/def information.
    fn decode_levels(
        rep_decompressor: &dyn BlockDecompressor,
        levels: LanceBuffer,
    ) -> Result<Option<ScalarBuffer<u16>>> {
        let rep = rep_decompressor.decompress(levels)?;
        match rep {
            DataBlock::FixedWidth(mut rep) => Ok(Some(rep.data.borrow_to_typed_slice::<u16>())),
            DataBlock::Constant(constant) => {
                // Constant levels are stored as a single u16 (2 bytes)
                assert_eq!(constant.data.len(), 2);
                if constant.data[0] == 0 && constant.data[1] == 0 {
                    Ok(None)
                } else {
                    // Non-zero constant levels are not yet produced by the encoder
                    todo!()
                }
            }
            _ => unreachable!(),
        }
    }

    /// Appends `level_buf[range]` to `levels`, materializing the output buffer
    /// lazily.
    ///
    /// When levels first appear partway through decoding, the preceding
    /// `dest_offset` positions are zero-filled; conversely, a chunk with no
    /// levels contributes zeros when earlier chunks did have levels.
    fn extend_levels(
        range: Range<u64>,
        levels: &mut Option<LevelBuffer>,
        level_buf: &Option<impl AsRef<[u16]>>,
        dest_offset: usize,
    ) {
        if let Some(level_buf) = level_buf {
            if levels.is_none() {
                // This is the first chunk with levels; backfill zeros for the
                // values decoded before it
                let mut new_levels_vec =
                    LevelBuffer::with_capacity(dest_offset + (range.end - range.start) as usize);
                new_levels_vec.extend(iter::repeat(0).take(dest_offset));
                *levels = Some(new_levels_vec);
            }
            levels.as_mut().unwrap().extend(
                level_buf.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        } else if let Some(levels) = levels {
            // This chunk has no levels but previous chunks did; fill with zeros
            let num_values = (range.end - range.start) as usize;
            levels.extend(iter::repeat(0).take(num_values));
        }
    }

    /// Maps a range of rows (relative to this chunk) to a range of items and a
    /// range of levels within the chunk.
    ///
    /// The two ranges differ because "invisible" items (def > max_visible_def,
    /// i.e. hidden behind a null or empty ancestor list) occupy level slots but
    /// no item slots, and because a preamble (the continuation of a row begun
    /// in the previous chunk) may need to be taken or skipped.
    fn map_range(
        range: Range<u64>,
        rep: Option<&impl AsRef<[u16]>>,
        def: Option<&impl AsRef<[u16]>>,
        max_rep: u16,
        max_visible_def: u16,
        total_items: u64,
        preamble_action: PreambleAction,
    ) -> (Range<u64>, Range<u64>) {
        if let Some(rep) = rep {
            let mut rep = rep.as_ref();
            // A new row starts wherever rep == max_rep; everything before the
            // first such position is the preamble
            let mut items_in_preamble = 0;
            let first_row_start = match preamble_action {
                PreambleAction::Skip | PreambleAction::Take => {
                    let first_row_start = if let Some(def) = def.as_ref() {
                        // Count only visible items while scanning the preamble
                        let mut first_row_start = None;
                        for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
                            if *rep == max_rep {
                                first_row_start = Some(idx);
                                break;
                            }
                            if *def <= max_visible_def {
                                items_in_preamble += 1;
                            }
                        }
                        first_row_start
                    } else {
                        // No def levels: every preamble item is visible
                        let first_row_start = rep.iter().position(|&r| r == max_rep);
                        items_in_preamble = first_row_start.unwrap_or(rep.len());
                        first_row_start
                    };
                    // A chunk that is entirely preamble can only show up as one
                    // we are taking
                    if first_row_start.is_none() {
                        assert!(preamble_action == PreambleAction::Take);
                        return (0..total_items, 0..rep.len() as u64);
                    }
                    let first_row_start = first_row_start.unwrap() as u64;
                    rep = &rep[first_row_start as usize..];
                    first_row_start
                }
                PreambleAction::Absent => {
                    debug_assert!(rep[0] == max_rep);
                    0
                }
            };
            // An empty row range means we only wanted the preamble
            if range.start == range.end {
                debug_assert!(preamble_action == PreambleAction::Take);
                return (0..items_in_preamble as u64, 0..first_row_start);
            }
            assert!(range.start < range.end);
            let mut rows_seen = 0;
            let mut new_start = 0;
            let mut new_levels_start = 0;
            if let Some(def) = def {
                let def = &def.as_ref()[first_row_start as usize..];
                // Walk to the start of the requested rows, counting invisible
                // rows (they shift item coordinates but not level coordinates)
                let mut lead_invis_seen = 0;
                if range.start > 0 {
                    if def[0] > max_visible_def {
                        lead_invis_seen += 1;
                    }
                    for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1 - lead_invis_seen;
                                new_levels_start = idx as u64 + 1;
                                break;
                            }
                            if *def > max_visible_def {
                                lead_invis_seen += 1;
                            }
                        }
                    }
                }
                rows_seen += 1;
                // Walk to the end of the requested rows the same way
                let mut new_end = u64::MAX;
                let mut new_levels_end = rep.len() as u64;
                let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
                let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
                for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
                    .iter()
                    .zip(&def[(new_levels_start + 1) as usize..])
                    .enumerate()
                {
                    if *rep == max_rep {
                        rows_seen += 1;
                        if rows_seen == range.end + 1 {
                            new_end = idx as u64 + new_start + 1 - tail_invis_seen;
                            new_levels_end = idx as u64 + new_levels_start + 1;
                            break;
                        }
                        if *def > max_visible_def {
                            tail_invis_seen += 1;
                        }
                    }
                }
                // Ran off the end of the chunk; the range extends to its end
                if new_end == u64::MAX {
                    new_levels_end = rep.len() as u64;
                    let total_invis_seen = lead_invis_seen + tail_invis_seen;
                    new_end = rep.len() as u64 - total_invis_seen;
                }
                assert_ne!(new_end, u64::MAX);
                // Shift back into whole-chunk coordinates based on how the
                // preamble was handled
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                    new_levels_start += first_row_start;
                    new_levels_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    debug_assert_eq!(new_levels_start, 0);
                    new_end += first_row_start;
                    new_levels_end += first_row_start;
                }
                (new_start..new_end, new_levels_start..new_levels_end)
            } else {
                // No def levels: everything is visible, so item and level
                // coordinates coincide (aside from the preamble shift)
                if range.start > 0 {
                    for (idx, rep) in rep.iter().skip(1).enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.start {
                                new_start = idx as u64 + 1;
                                break;
                            }
                        }
                    }
                }
                let mut new_end = rep.len() as u64;
                if range.end < total_items {
                    for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
                        if *rep == max_rep {
                            rows_seen += 1;
                            if rows_seen == range.end {
                                new_end = idx as u64 + new_start + 1;
                                break;
                            }
                        }
                    }
                }
                if preamble_action == PreambleAction::Skip {
                    new_start += first_row_start;
                    new_end += first_row_start;
                } else if preamble_action == PreambleAction::Take {
                    debug_assert_eq!(new_start, 0);
                    new_end += first_row_start;
                }
                (new_start..new_end, new_start..new_end)
            }
        } else {
            // No repetition levels: rows and items are one and the same
            (range.clone(), range)
        }
    }

    /// Splits a chunk buffer into its rep/def/values sections and decompresses
    /// each.
    ///
    /// Chunk layout: a 6-byte header of three little-endian u16 sizes
    /// (rep, def, values), then the rep bytes, `p1` padding to a u16 boundary,
    /// the def bytes, `p2` padding to `MINIBLOCK_ALIGNMENT`, then the values.
    fn decode_miniblock_chunk(
        &self,
        buf: &LanceBuffer,
        items_in_chunk: u64,
    ) -> Result<DecodedMiniBlockChunk> {
        let bytes_rep = u16::from_le_bytes([buf[0], buf[1]]) as usize;
        let bytes_def = u16::from_le_bytes([buf[2], buf[3]]) as usize;
        let bytes_val = u16::from_le_bytes([buf[4], buf[5]]) as usize;
        debug_assert!(buf.len() >= bytes_rep + bytes_def + bytes_val + 6);
        debug_assert!(
            buf.len()
                <= bytes_rep
                + bytes_def
                + bytes_val
                + 6
                + 1 // p1 (at most one byte of padding after rep)
                + (2 * MINIBLOCK_MAX_PADDING) // p2 plus trailing chunk padding
        );
        // p1 pads the rep section so the def levels start on a u16 boundary
        let p1 = bytes_rep % 2;
        let rep = buf.slice_with_length(6, bytes_rep);
        let def = buf.slice_with_length(6 + bytes_rep + p1, bytes_def);
        // p2 pads up to the mini-block alignment for the values section
        let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(6 + bytes_rep + p1 + bytes_def);
        // BUGFIX: the values offset must include the `p1` padding byte as well
        // (it is part of the layout and `p2` was computed from an offset that
        // includes it); omitting it read values one byte early whenever
        // `bytes_rep` was odd.
        let values = buf.slice_with_length(6 + bytes_rep + p1 + bytes_def + p2, bytes_val);
        let values = self.value_decompressor.decompress(values, items_in_chunk)?;
        let rep = Self::decode_levels(self.rep_decompressor.as_ref(), rep)?;
        let def = Self::decode_levels(self.def_decompressor.as_ref(), def)?;
        Ok(DecodedMiniBlockChunk { rep, def, values })
    }
}
impl DecodePageTask for DecodeMiniBlockTask {
    /// Decompresses each scheduled chunk and assembles the requested rows into
    /// a single data block plus unified rep/def level buffers.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Output buffers for the rep and def levels, created lazily
        let mut repbuf: Option<LevelBuffer> = None;
        let mut defbuf: Option<LevelBuffer> = None;
        // The number of list layers gives the maximum repetition level
        let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
        // Rough capacity estimate: twice the compressed size
        let estimated_size_bytes = self
            .instructions
            .iter()
            .map(|(_, chunk)| chunk.data.len())
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
        // Write position within the output level buffers
        let mut level_offset = 0;
        for (instructions, chunk) in self.instructions.iter() {
            let DecodedMiniBlockChunk { rep, def, values } =
                self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
            // Row coordinates (within the chunk) requested by the instructions:
            // the drain-time skip is added on top of the scheduled skip
            let row_range_start =
                instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
            let row_range_end = row_range_start + instructions.rows_to_take;
            // Translate rows into item and level ranges (they differ when there
            // are invisible items or a preamble)
            let (item_range, level_range) = Self::map_range(
                row_range_start..row_range_end,
                rep.as_ref(),
                def.as_ref(),
                max_rep,
                self.max_visible_level,
                chunk.items_in_chunk,
                instructions.preamble_action,
            );
            Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
            Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
            level_offset += (level_range.end - level_range.start) as usize;
            data_builder.append(&values, item_range);
        }
        let data = data_builder.finish();
        let unraveler = RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone());
        // If the page is dictionary-encoded, replace the decoded indices with
        // the dictionary entries they reference
        if let Some(dictionary) = &self.dictionary_data {
            // Estimate: average dictionary entry size (rounded up) times the
            // number of indices
            let estimated_size_bytes = dictionary.data_size()
                * (data.num_values() + dictionary.num_values() - 1)
                / dictionary.num_values();
            let mut data_builder = DataBlockBuilder::with_capacity_estimate(estimated_size_bytes);
            // NOTE(review): indices are read as u8 here, which assumes dictionary
            // indices fit in a single byte — confirm against the encoder
            if let DataBlock::FixedWidth(mut fixed_width_data_block) = data {
                let indices = fixed_width_data_block.data.borrow_to_typed_slice::<u8>();
                let indices = indices.as_ref();
                indices.iter().for_each(|&idx| {
                    data_builder.append(dictionary, idx as u64..idx as u64 + 1);
                });
                let data = data_builder.finish();
                return Ok(DecodedPage {
                    data,
                    repdef: unraveler,
                });
            }
        }
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
/// A mini-block chunk's bytes together with its identity and location.
#[derive(Debug)]
struct LoadedChunk {
    // The chunk's bytes (empty until the scheduled I/O completes)
    data: LanceBuffer,
    items_in_chunk: u64,
    // Location of the chunk within the page's value buffer
    byte_range: Range<u64>,
    chunk_idx: usize,
}
impl Clone for LoadedChunk {
    /// `LanceBuffer` is not `Clone`; cloning goes through `try_clone`, which is
    /// expected to succeed for the buffers held here.
    fn clone(&self) -> Self {
        let data = self.data.try_clone().unwrap();
        let byte_range = self.byte_range.clone();
        Self {
            data,
            items_in_chunk: self.items_in_chunk,
            byte_range,
            chunk_idx: self.chunk_idx,
        }
    }
}
/// Structural decoder for a mini-block page.
///
/// The scheduler loads chunks (in order) and the scheduled instructions; `drain`
/// pairs instructions with their chunks and emits CPU decode tasks.
#[derive(Debug)]
struct MiniBlockDecoder {
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Loaded chunks in chunk order; fully-drained chunks are popped from the front
    loaded_chunks: VecDeque<LoadedChunk>,
    // Remaining schedule instructions, consumed front to back
    instructions: VecDeque<ChunkInstructions>,
    // Rows already drained from the chunk at the front of `instructions`
    offset_in_current_chunk: u64,
    num_rows: u64,
    // Decoded dictionary, when the page is dictionary-encoded
    dictionary: Option<Arc<DataBlock>>,
}
impl StructuralPageDecoder for MiniBlockDecoder {
    /// Builds a decode task for the next `num_rows` rows.
    ///
    /// Walks the scheduled instructions from the front, splitting them into
    /// drain-sized pieces and pairing each with its loaded chunk.  The loop also
    /// continues past `num_rows` when a trailing partial row requires taking the
    /// next chunk's preamble.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut rows_desired = num_rows;
        let mut need_preamble = false;
        let mut skip_in_chunk = self.offset_in_current_chunk;
        let mut drain_instructions = Vec::new();
        while rows_desired > 0 || need_preamble {
            let (instructions, consumed) = self
                .instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut rows_desired, &mut need_preamble, &mut skip_in_chunk);
            // Discard loaded chunks that precede the one these instructions need
            while self.loaded_chunks.front().unwrap().chunk_idx
                != instructions.chunk_instructions.chunk_idx
            {
                self.loaded_chunks.pop_front();
            }
            drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
            if consumed {
                self.instructions.pop_front();
            }
        }
        // Remember where we stopped within the current chunk for the next drain
        self.offset_in_current_chunk = skip_in_chunk;
        // Definition levels contributed by the leading non-list layers are
        // always visible
        let max_visible_level = self
            .def_meaning
            .iter()
            .take_while(|l| !l.is_list())
            .map(|l| l.num_def_levels())
            .sum::<u16>();
        Ok(Box::new(DecodeMiniBlockTask {
            instructions: drain_instructions,
            def_decompressor: self.def_decompressor.clone(),
            rep_decompressor: self.rep_decompressor.clone(),
            value_decompressor: self.value_decompressor.clone(),
            dictionary_data: self.dictionary.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level,
        }))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// Scheduler for an all-null page of a nested (struct/list) type.
///
/// There are no values to read, but repetition/definition levels may still be
/// stored to describe the nesting.
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
    // Buffer layout: [0] = repetition levels, [1] = definition levels
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Loaded by `initialize`; `None` when the page stored no such levels
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
}
impl ComplexAllNullScheduler {
    /// Creates the scheduler; the rep/def level buffers are read later during
    /// `initialize`.
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
    ) -> Self {
        Self {
            rep: None,
            def: None,
            buffer_offsets_and_sizes,
            def_meaning,
        }
    }
}
impl StructuralPageScheduler for ComplexAllNullScheduler {
    /// Loads whichever of the rep/def level buffers the page actually stored.
    fn initialize<'a>(&'a mut self, io: &Arc<dyn EncodingsIo>) -> BoxFuture<'a, Result<()>> {
        // A size of zero means the corresponding level buffer was not stored
        let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
        let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
        let has_rep = rep_size > 0;
        let has_def = def_size > 0;
        let mut reads = Vec::with_capacity(2);
        if has_rep {
            reads.push(rep_pos..rep_pos + rep_size);
        }
        if has_def {
            reads.push(def_pos..def_pos + def_size);
        }
        let data = io.submit_request(reads, 0);
        async move {
            let data = data.await?;
            // Results come back in request order: rep first (if present), then def
            let mut data_iter = data.into_iter();
            if has_rep {
                let rep = data_iter.next().unwrap();
                let mut rep = LanceBuffer::from_bytes(rep, 2);
                let rep = rep.borrow_to_typed_slice::<u16>();
                self.rep = Some(rep);
            } else {
                self.rep = None
            };
            if has_def {
                let def = data_iter.next().unwrap();
                let mut def = LanceBuffer::from_bytes(def, 2);
                let def = def.borrow_to_typed_slice::<u16>();
                self.def = Some(def);
            } else {
                self.def = None;
            }
            Ok(())
        }
        .boxed()
    }

    /// No further I/O is required; the decoder slices the already-loaded levels.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let ranges = VecDeque::from_iter(ranges.iter().cloned());
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        Ok(std::future::ready(Ok(Box::new(ComplexAllNullPageDecoder {
            ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            num_rows,
            def_meaning: self.def_meaning.clone(),
        }) as Box<dyn StructuralPageDecoder>))
        .boxed())
    }
}
/// Decoder counterpart of [`ComplexAllNullScheduler`].
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
    // Row ranges still to be drained, consumed front to back
    ranges: VecDeque<Range<u64>>,
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    num_rows: u64,
    def_meaning: Arc<[DefinitionInterpretation]>,
}
impl ComplexAllNullPageDecoder {
    /// Pops the next `num_rows` worth of row ranges off the front of the queue.
    ///
    /// The front range is split when it holds more rows than needed.  Panics if
    /// the queue runs out before `num_rows` rows are found (a scheduler bug).
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut remaining = num_rows;
        let mut drained = Vec::with_capacity(self.ranges.len());
        while remaining > 0 {
            let front = self.ranges.front_mut().unwrap();
            let front_len = front.end - front.start;
            if front_len <= remaining {
                // Consume the whole front range
                remaining -= front_len;
                drained.push(self.ranges.pop_front().unwrap());
            } else {
                // Split the front range, taking only the first `remaining` rows
                let split_at = front.start + remaining;
                drained.push(front.start..split_at);
                front.start = split_at;
                remaining = 0;
            }
        }
        drained
    }
}
impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    /// Produces a decode task for the next `num_rows` rows, consuming them from
    /// the pending ranges.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let ranges = self.drain_ranges(num_rows);
        let task = DecodeComplexAllNullTask {
            ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
        };
        Ok(Box::new(task))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// CPU task that materializes rows from an all-null page of a nested type.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
    // The row ranges this task is responsible for
    ranges: Vec<Range<u64>>,
    // Full-page level buffers, sliced by `ranges` at decode time
    rep: Option<ScalarBuffer<u16>>,
    def: Option<ScalarBuffer<u16>>,
    def_meaning: Arc<[DefinitionInterpretation]>,
}
impl DecodeComplexAllNullTask {
    /// Gathers the level values for this task's row ranges into one contiguous
    /// vector, or `None` when the page stored no levels of this kind.
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|level_buf| {
            let mut gathered: Vec<u16> = Vec::with_capacity(num_values as usize);
            gathered.extend(
                self.ranges
                    .iter()
                    .flat_map(|r| level_buf[r.start as usize..r.end as usize].iter().copied()),
            );
            gathered
        })
    }
}
impl DecodePageTask for DecodeComplexAllNullTask {
    /// Emits an all-null block plus the rep/def levels describing where the
    /// nulls sit in the nested structure.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let num_values: u64 = self.ranges.iter().map(|r| r.end - r.start).sum();
        let rep = self.decode_level(&self.rep, num_values);
        let def = self.decode_level(&self.def, num_values);
        let repdef = RepDefUnraveler::new(rep, def, self.def_meaning);
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        Ok(DecodedPage { data, repdef })
    }
}
/// Scheduler for an all-null page of a simple (non-nested) type; no I/O or
/// state is needed.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}
impl StructuralPageScheduler for SimpleAllNullScheduler {
    /// Nothing to load; the page is fully described by its metadata.
    fn initialize<'a>(&'a mut self, _io: &Arc<dyn EncodingsIo>) -> BoxFuture<'a, Result<()>> {
        std::future::ready(Ok(())).boxed()
    }

    /// Immediately yields a decoder that will emit the requested number of nulls.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let num_rows: u64 = ranges.iter().map(|range| range.end - range.start).sum();
        let decoder =
            Box::new(SimpleAllNullPageDecoder { num_rows }) as Box<dyn StructuralPageDecoder>;
        Ok(std::future::ready(Ok(decoder)).boxed())
    }
}
/// Task that emits `num_values` nulls for a simple all-null page.
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
    num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Every value is null: definition level 1 (nullable item, null) for each
        // value and no repetition levels
        let def_levels = vec![1; self.num_values as usize];
        let repdef = RepDefUnraveler::new(
            None,
            Some(def_levels),
            Arc::new([DefinitionInterpretation::NullableItem]),
        );
        let data = DataBlock::AllNull(AllNullDataBlock {
            num_values: self.num_values,
        });
        Ok(DecodedPage { data, repdef })
    }
}
/// Decoder for a simple all-null page.
#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
    num_rows: u64,
}
impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    /// Draining is trivial: any request maps straight to a task emitting that
    /// many nulls.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let task = SimpleAllNullDecodePageTask {
            num_values: num_rows,
        };
        Ok(Box::new(task))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// Dictionary information for a dictionary-encoded mini-block page.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
    // Decompressor for the dictionary buffer
    dictionary_decompressor: Arc<dyn BlockDecompressor>,
    // Location of the dictionary buffer within the file
    dictionary_buf_position_and_size: (u64, u64),
    // Byte alignment used when wrapping the raw dictionary bytes
    dictionary_data_alignment: u64,
    // Decoded dictionary; starts as `DataBlock::Empty` and is filled by `initialize`
    dictionary_data: Arc<DataBlock>,
}
/// Scheduler for a mini-block page (values split into many small compressed
/// chunks, each described by an entry in the page's metadata buffer).
#[derive(Debug)]
pub struct MiniBlockScheduler {
    buffer_offsets_and_sizes: Vec<(u64, u64)>,
    // I/O priority used when reading value chunks
    priority: u64,
    items_in_page: u64,
    // Depth of the repetition index (0 when there is none)
    repetition_index_depth: u16,
    rep_decompressor: Arc<dyn BlockDecompressor>,
    def_decompressor: Arc<dyn BlockDecompressor>,
    value_decompressor: Arc<dyn MiniBlockDecompressor>,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Per-chunk metadata, filled in by `initialize`
    chunk_meta: Vec<ChunkMeta>,
    // Per-chunk repetition-index rows, filled in by `initialize`
    rep_index: Vec<Vec<u64>>,
    dictionary: Option<MiniBlockSchedulerDictionary>,
}
impl MiniBlockScheduler {
    /// Creates a scheduler from the page's protobuf layout description.
    ///
    /// Chunk metadata and the repetition index are not parsed here; they are
    /// loaded and filled in later by `initialize`.
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        items_in_page: u64,
        layout: &pb::MiniBlockLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let rep_decompressor =
            decompressors.create_block_decompressor(layout.rep_compression.as_ref().unwrap())?;
        let def_decompressor =
            decompressors.create_block_decompressor(layout.def_compression.as_ref().unwrap())?;
        let def_meaning = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect::<Vec<_>>();
        let value_decompressor = decompressors
            .create_miniblock_decompressor(layout.value_compression.as_ref().unwrap())?;
        // When present, the dictionary lives in buffer index 2; only its
        // decompressor and location are recorded here (decoded in `initialize`)
        let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
            match dictionary_encoding.array_encoding.as_ref().unwrap() {
                pb::array_encoding::ArrayEncoding::BinaryBlock(_) => {
                    Some(MiniBlockSchedulerDictionary {
                        dictionary_decompressor: decompressors
                            .create_block_decompressor(dictionary_encoding)?
                            .into(),
                        dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                        dictionary_data_alignment: 4,
                        dictionary_data: Arc::new(DataBlock::Empty()),
                    })
                }
                pb::array_encoding::ArrayEncoding::Flat(_) => Some(MiniBlockSchedulerDictionary {
                    dictionary_decompressor: decompressors
                        .create_block_decompressor(dictionary_encoding)?
                        .into(),
                    dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
                    dictionary_data_alignment: 16,
                    dictionary_data: Arc::new(DataBlock::Empty()),
                }),
                _ => {
                    unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
                }
            }
        } else {
            None
        };
        Ok(Self {
            buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
            rep_decompressor: rep_decompressor.into(),
            def_decompressor: def_decompressor.into(),
            value_decompressor: value_decompressor.into(),
            repetition_index_depth: layout.repetition_index_depth as u16,
            priority,
            items_in_page,
            chunk_meta: Vec::new(),
            rep_index: Vec::new(),
            dictionary,
            def_meaning: def_meaning.into(),
        })
    }

    /// Resolves chunk indices to `LoadedChunk`s with their byte ranges; `data`
    /// stays empty until the scheduled I/O completes.
    fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
        chunk_indices
            .iter()
            .map(|&chunk_idx| {
                let chunk_meta = &self.chunk_meta[chunk_idx];
                let bytes_start = chunk_meta.offset_bytes;
                let bytes_end = bytes_start + chunk_meta.chunk_size_bytes;
                LoadedChunk {
                    byte_range: bytes_start..bytes_end,
                    items_in_chunk: chunk_meta.num_values,
                    chunk_idx,
                    data: LanceBuffer::empty(),
                }
            })
            .collect()
    }
}
/// What to do with a chunk's preamble (the continuation of a row that began in
/// an earlier chunk).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
    // The preamble belongs to a row we are assembling; include it
    Take,
    // The preamble belongs to a row we skipped; drop it
    Skip,
    // The chunk starts on a row boundary; there is no preamble
    Absent,
}
/// Scheduling-time instructions for reading rows from one mini-block chunk.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
    chunk_idx: usize,
    // How to treat a partial row at the start of the chunk
    preamble: PreambleAction,
    // Full rows to skip after the preamble
    rows_to_skip: u64,
    // Full rows to take (not counting preamble or trailer)
    rows_to_take: u64,
    // Whether to take a trailing partial row (continued in the next chunk)
    take_trailer: bool,
}
/// A drain-time refinement of [`ChunkInstructions`]: the slice of the scheduled
/// rows that one decode task should actually take.
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
    chunk_instructions: ChunkInstructions,
    // Additional rows to skip on top of the scheduled skip
    rows_to_skip: u64,
    rows_to_take: u64,
    preamble_action: PreambleAction,
}
impl ChunkInstructions {
    /// Converts user row ranges into per-chunk read instructions.
    ///
    /// `rep_index` has one entry per chunk: element [0] is the number of rows
    /// associated with the chunk and element [1] is non-zero when the chunk ends
    /// with a partial row (a trailer) that continues into the next chunk's
    /// preamble.  Adjacent instructions for the same chunk (from multiple user
    /// ranges) are merged at the end.
    fn schedule_instructions(rep_index: &[Vec<u64>], user_ranges: &[Range<u64>]) -> Vec<Self> {
        let rep_len = rep_index.len();
        let mut rep_iter = rep_index.iter().enumerate();
        let (mut cur_rep_idx, mut cur_rep) = rep_iter.next().unwrap();
        // Row offset (in user coordinates) of the current chunk
        let mut offset = 0;
        // The first chunk never has a preamble; later chunks have one exactly
        // when the previous chunk ended with a trailer
        let mut chunk_has_preamble = false;
        let mut chunk_has_trailer = cur_rep[1] > 0;
        let mut chunk_instructions = Vec::with_capacity(rep_len + user_ranges.len());
        for user_range in user_ranges {
            let mut to_skip = user_range.start - offset;
            let mut rows_needed = user_range.end - user_range.start;
            let mut need_preamble = false;
            while rows_needed > 0 || need_preamble {
                // Count the trailer as a row of this chunk and do not count the
                // preamble (it belongs to the previous chunk's last row)
                let mut rows_in_chunk_incl_trailer = cur_rep[0];
                if chunk_has_trailer {
                    rows_in_chunk_incl_trailer += 1;
                }
                if chunk_has_preamble {
                    rows_in_chunk_incl_trailer -= 1;
                }
                let mut consumed_chunk = false;
                if rows_in_chunk_incl_trailer <= to_skip {
                    // The entire chunk is skipped
                    consumed_chunk = true;
                    need_preamble = false;
                } else {
                    let rows_available = rows_in_chunk_incl_trailer - to_skip;
                    let rows_to_take = if rows_available > rows_needed {
                        rows_needed
                    } else {
                        // Taking everything that's left in the chunk
                        consumed_chunk = true;
                        rows_available
                    };
                    rows_needed -= rows_to_take;
                    let mut take_trailer = false;
                    let preamble = if chunk_has_preamble {
                        if need_preamble {
                            PreambleAction::Take
                        } else {
                            PreambleAction::Skip
                        }
                    } else {
                        PreambleAction::Absent
                    };
                    let mut rows_to_take_no_trailer = rows_to_take;
                    // Taking through the end of a chunk that has a trailer means
                    // the final row continues into the next chunk's preamble
                    if rows_to_take == rows_available && chunk_has_trailer {
                        take_trailer = true;
                        need_preamble = true;
                        rows_to_take_no_trailer -= 1;
                    } else {
                        need_preamble = false;
                    };
                    chunk_instructions.push(Self {
                        preamble,
                        chunk_idx: cur_rep_idx,
                        rows_to_skip: to_skip,
                        rows_to_take: rows_to_take_no_trailer,
                        take_trailer,
                    });
                }
                if consumed_chunk {
                    // Advance to the next chunk
                    to_skip = to_skip.saturating_sub(rows_in_chunk_incl_trailer);
                    offset += rows_in_chunk_incl_trailer;
                    chunk_has_preamble = chunk_has_trailer;
                    if let Some((next_rep_idx, next_rep)) = rep_iter.next() {
                        cur_rep_idx = next_rep_idx;
                        cur_rep = next_rep;
                        chunk_has_trailer = cur_rep[1] > 0;
                    }
                }
            }
        }
        // Multiple user ranges may have produced contiguous instructions for the
        // same chunk; merge them into one
        if user_ranges.len() > 1 {
            let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
            let mut instructions_iter = chunk_instructions.into_iter();
            merged_instructions.push(instructions_iter.next().unwrap());
            for instruction in instructions_iter {
                let last = merged_instructions.last_mut().unwrap();
                if last.chunk_idx == instruction.chunk_idx
                    && last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
                {
                    last.rows_to_take += instruction.rows_to_take;
                    last.take_trailer |= instruction.take_trailer;
                } else {
                    merged_instructions.push(instruction);
                }
            }
            merged_instructions
        } else {
            chunk_instructions
        }
    }

    /// Splits this scheduled instruction into the piece a single drain call takes.
    ///
    /// `rows_desired`, `need_preamble`, and `skip_in_chunk` form the drain
    /// cursor and are updated in place.  Returns the drain instructions plus
    /// whether this scheduled instruction is now fully consumed.
    fn drain_from_instruction(
        &self,
        rows_desired: &mut u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> (ChunkDrainInstructions, bool) {
        // If we need the preamble we should not also be skipping rows
        debug_assert!(!*need_preamble || *skip_in_chunk == 0);
        let mut rows_avail = self.rows_to_take - *skip_in_chunk;
        let has_preamble = self.preamble != PreambleAction::Absent;
        let preamble_action = match (*need_preamble, has_preamble) {
            (true, true) => PreambleAction::Take,
            (true, false) => panic!("Need preamble but there isn't one"),
            (false, true) => PreambleAction::Skip,
            (false, false) => PreambleAction::Absent,
        };
        // The trailer counts as one extra (partial) row to drain
        if self.take_trailer {
            rows_avail += 1;
        }
        let rows_taking = if *rows_desired >= rows_avail {
            // Taking the rest of the chunk; if it ends with a trailer the next
            // drain must pick up the following chunk's preamble
            *need_preamble = self.take_trailer;
            rows_avail
        } else {
            *need_preamble = false;
            *rows_desired
        };
        let rows_skipped = *skip_in_chunk;
        let consumed_chunk = if *rows_desired >= rows_avail {
            *rows_desired -= rows_avail;
            *skip_in_chunk = 0;
            true
        } else {
            *skip_in_chunk += *rows_desired;
            *rows_desired = 0;
            false
        };
        (
            ChunkDrainInstructions {
                chunk_instructions: self.clone(),
                rows_to_skip: rows_skipped,
                rows_to_take: rows_taking,
                preamble_action,
            },
            consumed_chunk,
        )
    }
}
impl StructuralPageScheduler for MiniBlockScheduler {
    /// Loads and decodes the page's metadata buffers.
    ///
    /// Reads the chunk-metadata words plus (when present) the dictionary buffer
    /// and the repetition index, then populates `self.chunk_meta` and
    /// `self.rep_index` so later `schedule_ranges` calls can work in memory.
    fn initialize<'a>(&'a mut self, io: &Arc<dyn EncodingsIo>) -> BoxFuture<'a, Result<()>> {
        // Buffer 0 is always the chunk metadata; buffer 1 is the start of the value data.
        let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
        let value_buf_position = self.buffer_offsets_and_sizes[1].0;
        let mut bufs_needed = 1;
        if self.dictionary.is_some() {
            bufs_needed += 1;
        }
        if self.repetition_index_depth > 0 {
            bufs_needed += 1;
        }
        let mut required_ranges = Vec::with_capacity(bufs_needed);
        required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
        if let Some(ref dictionary) = self.dictionary {
            required_ranges.push(
                dictionary.dictionary_buf_position_and_size.0
                    ..dictionary.dictionary_buf_position_and_size.0
                        + dictionary.dictionary_buf_position_and_size.1,
            );
        }
        if self.repetition_index_depth > 0 {
            // The repetition index, when present, is always the last buffer.
            let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
            required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
        }
        let io_req = io.submit_request(required_ranges, 0);
        async move {
            // Buffers come back in request order: metadata, [dictionary], [rep index].
            let mut buffers = io_req.await?.into_iter().fuse();
            let meta_bytes = buffers.next().unwrap();
            let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
            let rep_index_bytes = buffers.next();
            // Chunk metadata is a sequence of little-endian u16 words.
            assert!(meta_bytes.len() % 2 == 0);
            let mut bytes = LanceBuffer::from_bytes(meta_bytes, 2);
            let words = bytes.borrow_to_typed_slice::<u16>();
            let words = words.as_ref();
            self.chunk_meta.reserve(words.len());
            let mut rows_counter = 0;
            let mut offset_bytes = value_buf_position;
            for (word_idx, word) in words.iter().enumerate() {
                // Each word packs two fields (see `serialize_miniblocks`):
                // low 4 bits  = log2(num values in chunk)
                // high 12 bits = (chunk size / MINIBLOCK_ALIGNMENT) - 1
                let log_num_values = word & 0x0F;
                let divided_bytes = word >> 4;
                let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
                debug_assert!(num_bytes > 0);
                let num_values = if word_idx < words.len() - 1 {
                    // Every chunk except the last holds an exact power-of-two count.
                    debug_assert!(log_num_values > 0);
                    1 << log_num_values
                } else {
                    // The final chunk stores 0 and holds whatever rows remain.
                    debug_assert_eq!(log_num_values, 0);
                    self.items_in_page - rows_counter
                };
                rows_counter += num_values;
                self.chunk_meta.push(ChunkMeta {
                    num_values,
                    chunk_size_bytes: num_bytes as u64,
                    offset_bytes,
                });
                // Chunks are laid out back-to-back in the value buffer.
                offset_bytes += num_bytes as u64;
            }
            if let Some(rep_index_data) = rep_index_bytes {
                // The repetition index is stored as u64 values, one group of
                // (repetition_index_depth + 1) values per chunk.
                assert!(rep_index_data.len() % 8 == 0);
                let mut repetition_index_vals = LanceBuffer::from_bytes(rep_index_data, 8);
                let repetition_index_vals = repetition_index_vals.borrow_to_typed_slice::<u64>();
                self.rep_index = repetition_index_vals
                    .as_ref()
                    .chunks_exact(self.repetition_index_depth as usize + 1)
                    .map(|c| c.to_vec())
                    .collect();
            } else {
                // No repetition levels: synthesize a trivial per-chunk index of
                // [num_values, 0] pairs so downstream code has a uniform shape.
                self.rep_index = self
                    .chunk_meta
                    .iter()
                    .map(|c| vec![c.num_values, 0])
                    .collect();
            };
            if let Some(ref mut dictionary) = self.dictionary {
                // Decompress the dictionary eagerly and cache it; it is shared
                // by every subsequent read of this page.
                let dictionary_data = dictionary_bytes.unwrap();
                dictionary.dictionary_data =
                    Arc::new(dictionary.dictionary_decompressor.decompress(
                        LanceBuffer::from_bytes(
                            dictionary_data,
                            dictionary.dictionary_data_alignment,
                        ),
                    )?)
            };
            Ok(())
        }
        .boxed()
    }
    /// Figures out which chunks are needed to satisfy `ranges`, schedules their
    /// reads, and returns a future resolving to a decoder over those chunks.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let chunk_instructions = ChunkInstructions::schedule_instructions(&self.rep_index, ranges);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        // Sanity check: the instructions must cover exactly the requested rows
        // (an instruction that takes the trailer accounts for one extra row).
        debug_assert_eq!(
            num_rows,
            chunk_instructions
                .iter()
                .map(|ci| {
                    let taken = ci.rows_to_take;
                    if ci.take_trailer {
                        taken + 1
                    } else {
                        taken
                    }
                })
                .sum::<u64>()
        );
        // Several instructions can reference the same chunk; fetch each chunk once.
        let chunks_needed = chunk_instructions
            .iter()
            .map(|ci| ci.chunk_idx)
            .unique()
            .collect::<Vec<_>>();
        let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
        let chunk_ranges = loaded_chunks
            .iter()
            .map(|c| c.byte_range.clone())
            .collect::<Vec<_>>();
        let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
        // Clone everything the decoder needs so the returned future is 'static.
        let rep_decompressor = self.rep_decompressor.clone();
        let def_decompressor = self.def_decompressor.clone();
        let value_decompressor = self.value_decompressor.clone();
        let dictionary = self
            .dictionary
            .as_ref()
            .map(|dictionary| dictionary.dictionary_data.clone());
        let def_meaning = self.def_meaning.clone();
        Ok(async move {
            let loaded_chunk_data = loaded_chunk_data.await?;
            // Attach the freshly-read bytes to the chunk descriptors (same order
            // as the submitted ranges).
            for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
                loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
            }
            Ok(Box::new(MiniBlockDecoder {
                rep_decompressor,
                def_decompressor,
                value_decompressor,
                def_meaning,
                loaded_chunks: VecDeque::from_iter(loaded_chunks),
                instructions: VecDeque::from(chunk_instructions),
                offset_in_current_chunk: 0,
                dictionary,
                num_rows,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed())
    }
}
/// Schedules pages encoded with the full-zip layout, where each value is
/// stored inline immediately after its repetition/definition control word.
#[derive(Debug)]
pub struct FullZipScheduler {
    // File offset of the start of the zipped data buffer
    data_buf_position: u64,
    // I/O priority used when submitting read requests
    priority: u64,
    // Total number of rows stored in this page
    rows_in_page: u64,
    // Decompressor for the per-value portion of each zipped entry
    value_decompressor: Arc<dyn PerValueDecompressor>,
    // Interpretation of each repetition/definition layer
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Parser for the control words interleaved with the values
    ctrl_word_parser: ControlWordParser,
}
impl FullZipScheduler {
    /// Builds a scheduler for a full-zip page from its protobuf layout.
    ///
    /// Buffer 0 holds the zipped control-word + value data.  The value
    /// decompressor, control-word parser, and layer interpretations are all
    /// derived from the layout description.
    fn try_new(
        buffer_offsets_and_sizes: &[(u64, u64)],
        priority: u64,
        rows_in_page: u64,
        layout: &pb::FullZipLayout,
        decompressors: &dyn DecompressorStrategy,
    ) -> Result<Self> {
        let value_decompressor = decompressors
            .create_per_value_decompressor(layout.value_compression.as_ref().unwrap())?;
        // Control words carry the rep/def bits for each value.
        let ctrl_word_parser = ControlWordParser::new(
            layout.bits_rep.try_into().unwrap(),
            layout.bits_def.try_into().unwrap(),
        );
        let def_meaning: Vec<_> = layout
            .layers
            .iter()
            .map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
            .collect();
        Ok(Self {
            data_buf_position: buffer_offsets_and_sizes[0].0,
            priority,
            rows_in_page,
            value_decompressor: value_decompressor.into(),
            def_meaning: def_meaning.into(),
            ctrl_word_parser,
        })
    }
}
impl StructuralPageScheduler for FullZipScheduler {
    fn initialize<'a>(&'a mut self, _io: &Arc<dyn EncodingsIo>) -> BoxFuture<'a, Result<()>> {
        // Full-zip pages have no metadata buffers to pre-load.
        std::future::ready(Ok(())).boxed()
    }
    /// Schedules reads for the requested row ranges.
    ///
    /// Because every row occupies a fixed number of bytes (control word +
    /// fixed-width value) the byte range for any row range can be computed
    /// directly without consulting an index.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &dyn EncodingsIo,
    ) -> Result<BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>> {
        let bits_per_value = self.value_decompressor.bits_per_value();
        // This decoder only supports byte-aligned fixed-width values.
        assert_eq!(bits_per_value % 8, 0);
        let bytes_per_value = bits_per_value / 8;
        let bytes_per_cw = self.ctrl_word_parser.bytes_per_word();
        let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
        // Translate row ranges into byte ranges within the data buffer.
        let byte_ranges = ranges.iter().map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        });
        let data = io.submit_request(byte_ranges.collect(), self.priority);
        // Clone what the decoder needs so the returned future is 'static.
        let value_decompressor = self.value_decompressor.clone();
        let def_meaning = self.def_meaning.clone();
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
        let ctrl_word_parser = self.ctrl_word_parser;
        Ok(async move {
            let data = data.await?;
            let data = data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect();
            Ok(Box::new(FixedFullZipDecoder {
                value_decompressor,
                def_meaning,
                data,
                num_rows,
                ctrl_word_parser,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value: total_bytes_per_value as usize,
            }) as Box<dyn StructuralPageDecoder>)
        }
        .boxed())
    }
}
/// Decoder for full-zip pages with fixed-width values.
///
/// Holds the raw zipped buffers (one per scheduled byte range) and slices rows
/// off the front as they are drained.
#[derive(Debug)]
struct FixedFullZipDecoder {
    // Decompressor for the value portion of each zipped entry
    value_decompressor: Arc<dyn PerValueDecompressor>,
    // Interpretation of each repetition/definition layer
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Parser for the interleaved control words
    ctrl_word_parser: ControlWordParser,
    // Raw zipped buffers, consumed front-to-back by `drain`
    data: VecDeque<LanceBuffer>,
    // Byte offset of the next undrained row within the front buffer
    offset_in_current: usize,
    // Size of one decompressed-input value, in bytes
    bytes_per_value: usize,
    // Size of one zipped entry (control word + value), in bytes
    total_bytes_per_value: usize,
    // Total rows scheduled into this decoder
    num_rows: u64,
}
impl StructuralPageDecoder for FixedFullZipDecoder {
    /// Slices off enough zipped bytes to cover `num_rows` rows (possibly
    /// spanning several buffers) and packages them into a decode task.
    /// No decompression happens here; that is deferred to the task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            // Panics if the caller drains more rows than were scheduled.
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            // Each row occupies exactly one control word plus one value.
            let bytes_needed = remaining as usize * self.total_bytes_per_value;
            let bytes_to_take = bytes_needed.min(bytes_avail);
            let task_slice = cur_buf.slice_with_length(self.offset_in_current, bytes_to_take);
            let rows_in_task = (bytes_to_take / self.total_bytes_per_value) as u64;
            task_data.push((task_slice, rows_in_task));
            remaining -= rows_in_task;
            if bytes_to_take + self.offset_in_current == cur_buf.len() {
                // Front buffer exhausted; advance to the next scheduled range.
                self.data.pop_front();
                self.offset_in_current = 0;
            } else {
                self.offset_in_current += bytes_to_take;
            }
        }
        let num_rows = task_data.iter().map(|td| td.1).sum::<u64>() as usize;
        Ok(Box::new(FixedFullZipDecodeTask {
            value_decompressor: self.value_decompressor.clone(),
            def_meaning: self.def_meaning.clone(),
            ctrl_word_parser: self.ctrl_word_parser,
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_rows,
        }))
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// CPU-side task that unzips and decompresses a slice of a full-zip page.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    // Decompressor for the value bytes once control words are stripped
    value_decompressor: Arc<dyn PerValueDecompressor>,
    // Interpretation of each repetition/definition layer
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Parser for the interleaved control words
    ctrl_word_parser: ControlWordParser,
    // (zipped bytes, number of rows) pairs taken from the decoder
    data: Vec<(LanceBuffer, u64)>,
    // Total rows across all entries in `data`
    num_rows: usize,
    // Size of one value (without its control word), in bytes
    bytes_per_value: usize,
}
impl DecodePageTask for FixedFullZipDecodeTask {
    /// Separates control words from values, decompresses the values, and
    /// returns the data block along with a repdef unraveler.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Rough guess at decompressed size; the builder grows as needed.
        let estimated_size_bytes = self.data.iter().map(|data| data.0.len()).sum::<usize>() * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
        if self.ctrl_word_parser.bytes_per_word() == 0 {
            // No rep/def info was zipped in: the buffers are pure value bytes.
            for (buf, rows_in_buf) in self.data.into_iter() {
                let decompressed = self.value_decompressor.decompress(buf, rows_in_buf)?;
                data_builder.append(&decompressed, 0..rows_in_buf);
            }
            let unraveler = RepDefUnraveler::new(None, None, self.def_meaning);
            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
            // Zipped case: each row is a control word followed by a value.
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);
            for (buf, rows_in_buf) in self.data.into_iter() {
                let mut buf_slice = buf.as_ref();
                // Capacity for the values alone, control words stripped out.
                let mut values = Vec::with_capacity(
                    buf.len() - (self.ctrl_word_parser.bytes_per_word() * rows_in_buf as usize),
                );
                for _ in 0..rows_in_buf {
                    // Parse the control word (appends levels into rep/def)...
                    self.ctrl_word_parser.parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.ctrl_word_parser.bytes_per_word()..];
                    // ...then copy out the fixed-width value that follows it.
                    values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
                    buf_slice = &buf_slice[self.bytes_per_value..];
                }
                let values_buf = LanceBuffer::Owned(values);
                let decompressed = self
                    .value_decompressor
                    .decompress(values_buf, rows_in_buf)?;
                data_builder.append(&decompressed, 0..rows_in_buf);
            }
            // Empty level vectors mean the control words carried no bits for
            // that dimension; represent that as None.
            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };
            let unraveler = RepDefUnraveler::new(
                repetition,
                definition,
                self.def_meaning,
            );
            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        }
    }
}
/// Cursor-style job that walks a set of row ranges across a primitive field's
/// pages, emitting one scan line per page visited.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    // The field scheduler that owns the per-page schedulers
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    // Requested ranges, in field-relative row numbers
    ranges: Vec<Range<u64>>,
    // Index of the next page to visit
    page_idx: usize,
    // Index of the range currently being satisfied
    range_idx: usize,
    // Rows of the current range already scheduled by earlier calls
    range_offset: u64,
    // Row number (field-relative) where the current page begins
    global_row_offset: u64,
}
impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
    /// Creates a job positioned at the first page and first range; all cursor
    /// state starts at zero.
    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        Self {
            global_row_offset: 0,
            range_offset: 0,
            range_idx: 0,
            page_idx: 0,
            ranges,
            scheduler,
        }
    }
}
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules the next page's worth of work, advancing the range/page
    /// cursors.  Returns `None` once every range has been scheduled.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
    ) -> Result<Option<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(None);
        }
        // Resume the current range past any rows already scheduled.
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;
        let priority = range.start;
        // NOTE(review): indexing panics if a range extends past the final
        // page — presumably validated upstream; confirm against callers.
        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip pages that end before the current range begins.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }
        // Collect every (page-relative) range that overlaps the current page.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            // Clip the range to the page boundary.
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // This range is done; move on to the next range (still in this page).
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range spills into the next page; stop here for this page.
                break;
            }
        }
        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );
        // The page is fully handled after this call; advance past it.
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;
        let page_decoder = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io().as_ref())?;
        let cur_path = context.current_path();
        let page_index = cur_page.page_index;
        let unloaded_page = async move {
            let page_decoder = page_decoder.await?;
            Ok(LoadedPage {
                decoder: page_decoder,
                path: cur_path,
                page_index,
            })
        }
        .boxed();
        Ok(Some(ScheduledScanLine {
            decoders: vec![MessageType::UnloadedPage(UnloadedPage(unloaded_page))],
            rows_scheduled: num_rows_in_next,
        }))
    }
}
/// A page scheduler paired with the page's index and row count.
#[derive(Debug)]
struct PageInfoAndScheduler {
    // Index of the page within the column
    page_index: usize,
    // Number of rows in the page
    num_rows: u64,
    // Scheduler responsible for reads from this page
    scheduler: Box<dyn StructuralPageScheduler>,
}
/// Scheduler for a primitive (leaf) column using structural encodings.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    // One scheduler per page, in page order
    page_schedulers: Vec<PageInfoAndScheduler>,
    // Index of the column within the file (used for logging)
    column_index: u32,
}
impl StructuralPrimitiveFieldScheduler {
pub fn try_new(
column_info: &ColumnInfo,
decompressors: &dyn DecompressorStrategy,
) -> Result<Self> {
let page_schedulers = column_info
.page_infos
.iter()
.enumerate()
.map(|(page_index, page_info)| {
Self::page_info_to_scheduler(page_info, page_index, decompressors)
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
page_schedulers,
column_index: column_info.index,
})
}
fn page_info_to_scheduler(
page_info: &PageInfo,
page_index: usize,
decompressors: &dyn DecompressorStrategy,
) -> Result<PageInfoAndScheduler> {
let scheduler: Box<dyn StructuralPageScheduler> =
match page_info.encoding.as_structural().layout.as_ref() {
Some(pb::page_layout::Layout::MiniBlockLayout(mini_block)) => {
Box::new(MiniBlockScheduler::try_new(
&page_info.buffer_offsets_and_sizes,
page_info.priority,
mini_block.num_items,
mini_block,
decompressors,
)?)
}
Some(pb::page_layout::Layout::FullZipLayout(full_zip)) => {
Box::new(FullZipScheduler::try_new(
&page_info.buffer_offsets_and_sizes,
page_info.priority,
page_info.num_rows,
full_zip,
decompressors,
)?)
}
Some(pb::page_layout::Layout::AllNullLayout(all_null)) => {
let def_meaning = all_null
.layers
.iter()
.map(|l| ProtobufUtils::repdef_layer_to_def_interp(*l))
.collect::<Vec<_>>();
if def_meaning.len() == 1
&& def_meaning[0] == DefinitionInterpretation::NullableItem
{
Box::new(SimpleAllNullScheduler::default())
as Box<dyn StructuralPageScheduler>
} else {
Box::new(ComplexAllNullScheduler::new(
page_info.buffer_offsets_and_sizes.clone(),
def_meaning.into(),
)) as Box<dyn StructuralPageScheduler>
}
}
_ => todo!(),
};
Ok(PageInfoAndScheduler {
page_index,
num_rows: page_info.num_rows,
scheduler,
})
}
}
impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
    /// Kicks off initialization of every page scheduler concurrently and
    /// resolves once all of them have finished (in any completion order).
    fn initialize<'a>(
        &'a mut self,
        _filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let page_init = self
            .page_schedulers
            .iter_mut()
            .map(|s| s.scheduler.initialize(context.io()))
            .collect::<FuturesUnordered<_>>();
        async move { page_init.try_collect::<Vec<_>>().await.map(|_| ()) }.boxed()
    }
    /// Produces a scheduling job that walks `ranges` across this field's pages.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let job = StructuralPrimitiveFieldSchedulingJob::new(self, ranges.to_vec());
        Ok(Box::new(job))
    }
}
/// Legacy (non-structural) decoder for a primitive field page.
pub struct PrimitiveFieldDecoder {
    // Arrow type the decoded data will take
    data_type: DataType,
    // Future resolving to the physical decoder; None once awaited
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    // The resolved physical decoder (set by `wait_for_loaded`)
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    // Whether decoded buffers should be validated when turned into arrays
    should_validate: bool,
    // Total rows in the page
    num_rows: u64,
    // Rows already handed out by `drain`
    rows_drained: u64,
    // Column/page indices, used for log messages (u32::MAX when unknown)
    column_index: u32,
    page_index: u32,
}
impl PrimitiveFieldDecoder {
    /// Wraps an already-loaded physical decoder.
    ///
    /// Column and page indices are unknown in this path and are recorded as
    /// `u32::MAX` (they are only used for log messages).
    pub fn new_from_data(
        physical_decoder: Arc<dyn PrimitivePageDecoder>,
        data_type: DataType,
        num_rows: u64,
        should_validate: bool,
    ) -> Self {
        Self {
            page_index: u32::MAX,
            column_index: u32::MAX,
            rows_drained: 0,
            num_rows,
            should_validate,
            physical_decoder: Some(physical_decoder),
            unloaded_physical_decoder: None,
            data_type,
        }
    }
}
impl Debug for PrimitiveFieldDecoder {
    // Hand-written because the decoder fields are not Debug; only the
    // plain-data fields are shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PrimitiveFieldDecoder")
            .field("data_type", &self.data_type)
            .field("num_rows", &self.num_rows)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}
/// CPU task that decodes a row span from a legacy primitive page.
struct PrimitiveFieldDecodeTask {
    // Rows to skip at the start of the page
    rows_to_skip: u64,
    // Rows to decode after the skip
    rows_to_take: u64,
    // Whether to validate buffers when building the Arrow array
    should_validate: bool,
    // The physical decoder that produces the raw data block
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    // Arrow type of the output array
    data_type: DataType,
}
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the physical block into an Arrow array.  For dictionary types,
    /// any logical nulls are pushed down onto the key buffer so downstream
    /// consumers see the nulls on the keys rather than in the values.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;
        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                // Rebuild the dictionary array, attaching the computed null
                // buffer to the key data.
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
impl LogicalPageDecoder for PrimitiveFieldDecoder {
    /// Awaits the unloaded physical decoder and stores it for draining.
    ///
    /// NOTE(review): this unconditionally takes-and-awaits the decoder, so a
    /// second call would panic on the `unwrap` — presumably callers invoke it
    /// at most once per page; confirm against the decode loop.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        log::trace!(
            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
            loaded_need,
            self.column_index,
            self.page_index,
            self.num_rows
        );
        async move {
            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
            self.physical_decoder = Some(Arc::from(physical_decoder));
            Ok(())
        }
        .boxed()
    }
    /// Produces a decode task covering the next `num_rows` rows.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // Draining before `wait_for_loaded` is a caller bug; report it clearly.
        if self.physical_decoder.as_ref().is_none() {
            return Err(lance_core::Error::Internal {
                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
                location: location!(),
            });
        }
        // Rows already drained become the skip amount for this task.
        let rows_to_skip = self.rows_drained;
        let rows_to_take = num_rows;
        self.rows_drained += rows_to_take;
        let task = Box::new(PrimitiveFieldDecodeTask {
            rows_to_skip,
            rows_to_take,
            should_validate: self.should_validate,
            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
            data_type: self.data_type.clone(),
        });
        Ok(NextDecodeTask {
            task,
            num_rows: rows_to_take,
            has_more: self.rows_drained != self.num_rows,
        })
    }
    // Until the physical decoder is awaited, no rows count as loaded.
    fn rows_loaded(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.num_rows
        }
    }
    fn rows_drained(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.rows_drained
        }
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
/// Combines several per-page decode tasks into one array-producing task.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    // One decode task per page slice contributing to the output
    tasks: Vec<Box<dyn DecodePageTask>>,
    // Arrow type of the output array
    data_type: DataType,
    // Whether to validate buffers when building arrays
    should_validate: bool,
}
impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
    /// Runs every per-page task, concatenates the resulting arrays, and applies
    /// the validity buffer computed from the unraveled repdef levels.
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let mut arrays = Vec::with_capacity(self.tasks.len());
        let mut unravelers = Vec::with_capacity(self.tasks.len());
        for task in self.tasks {
            let decoded = task.decode()?;
            unravelers.push(decoded.repdef);
            let array = make_array(
                decoded
                    .data
                    .into_arrow(self.data_type.clone(), self.should_validate)?,
            );
            arrays.push(array);
        }
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        let array = arrow_select::concat::concat(&array_refs)?;
        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
        let mut validity = repdef.unravel_validity(array.len());
        // Null arrays encode nullness in the type itself; never attach an
        // explicit validity buffer to them.
        if matches!(self.data_type, DataType::Null) {
            validity = None;
        }
        if let Some(validity) = validity.as_ref() {
            assert!(validity.len() == array.len());
        }
        // SAFETY: only the null buffer of otherwise-valid ArrayData is being
        // replaced, and (when present) its length was asserted to match the
        // array just above.
        let array = make_array(unsafe {
            array
                .to_data()
                .into_builder()
                .nulls(validity)
                .build_unchecked()
        });
        Ok(DecodedArray { array, repdef })
    }
}
/// Decoder for a primitive field using structural encodings; consumes loaded
/// pages in order and drains rows across them.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    // The Arrow field being decoded
    field: Arc<ArrowField>,
    // Page decoders queued in arrival order
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    // Whether to validate buffers when building arrays
    should_validate: bool,
    // Rows already drained from the front page decoder
    rows_drained_in_current: u64,
}
impl StructuralPrimitiveFieldDecoder {
    /// Creates an empty decoder for `field`; pages are supplied later via
    /// `accept_page`.
    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
        Self {
            rows_drained_in_current: 0,
            should_validate,
            page_decoders: VecDeque::new(),
            field: field.clone(),
        }
    }
}
impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
    fn accept_page(&mut self, child: LoadedPage) -> Result<()> {
        // Primitive fields are leaves, so pages must arrive with an empty path.
        assert!(child.path.is_empty());
        self.page_decoders.push_back(child.decoder);
        Ok(())
    }
    /// Drains `num_rows` rows, pulling from as many queued page decoders as
    /// needed, and bundles the per-page tasks into a single composite task.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let mut tasks = Vec::new();
        let mut rows_left = num_rows;
        while rows_left > 0 {
            // Panics if more rows are drained than pages were accepted for.
            let page = self.page_decoders.front_mut().unwrap();
            let available = page.num_rows() - self.rows_drained_in_current;
            let taking = rows_left.min(available);
            tasks.push(page.drain(taking)?);
            if taking < available {
                // Page still has rows; remember how far it has been drained.
                self.rows_drained_in_current += taking;
            } else {
                // Page exhausted; discard it and start fresh on the next one.
                self.page_decoders.pop_front();
                self.rows_drained_in_current = 0;
            }
            rows_left -= taking;
        }
        Ok(Box::new(StructuralCompositeDecodeArrayTask {
            tasks,
            data_type: self.field.data_type().clone(),
            should_validate: self.should_validate,
        }))
    }
    fn data_type(&self) -> &DataType {
        self.field.data_type()
    }
}
/// Buffers incoming arrays for a column until enough bytes accumulate to be
/// worth encoding as a page.
#[derive(Debug)]
pub struct AccumulationQueue {
    // Byte threshold that triggers a flush
    cache_bytes: u64,
    // If false, arrays are deep-copied on insert so the caller's buffers
    // are not held alive by the queue
    keep_original_array: bool,
    // Arrays accumulated since the last flush
    buffered_arrays: Vec<ArrayRef>,
    // Bytes accumulated since the last flush
    current_bytes: u64,
    // Row number of the first buffered row; u64::MAX means "unset"
    row_number: u64,
    // Total rows buffered since the last flush
    num_rows: u64,
    // Column index, used for log messages
    column_index: u32,
}
impl AccumulationQueue {
    /// Creates an empty queue that flushes after `cache_bytes` accumulate.
    pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
        Self {
            cache_bytes,
            buffered_arrays: Vec::new(),
            current_bytes: 0,
            column_index,
            keep_original_array,
            // Sentinel meaning "no rows buffered yet".
            row_number: u64::MAX,
            num_rows: 0,
        }
    }
    /// Adds an array to the queue.
    ///
    /// Returns `Some((arrays, first_row_number, num_rows))` when the byte
    /// threshold is crossed and the buffered data should be encoded now;
    /// otherwise buffers the array and returns `None`.
    pub fn insert(
        &mut self,
        array: ArrayRef,
        row_number: u64,
        num_rows: u64,
    ) -> Option<(Vec<ArrayRef>, u64, u64)> {
        // Record the row number of the first array in this batch.
        if self.row_number == u64::MAX {
            self.row_number = row_number;
        }
        self.num_rows += num_rows;
        self.current_bytes += array.get_array_memory_size() as u64;
        if self.current_bytes > self.cache_bytes {
            debug!(
                "Flushing column {} page of size {} bytes (unencoded)",
                self.column_index, self.current_bytes
            );
            // NOTE(review): the flushed array is pushed without deep-copying
            // even when keep_original_array is false — presumably fine because
            // the buffer is handed off immediately; confirm.
            self.buffered_arrays.push(array);
            self.current_bytes = 0;
            // Reset counters and hand everything to the caller.
            let row_number = self.row_number;
            self.row_number = u64::MAX;
            let num_rows = self.num_rows;
            self.num_rows = 0;
            Some((
                std::mem::take(&mut self.buffered_arrays),
                row_number,
                num_rows,
            ))
        } else {
            trace!(
                "Accumulating data for column {}. Now at {} bytes",
                self.column_index,
                self.current_bytes
            );
            if self.keep_original_array {
                self.buffered_arrays.push(array);
            } else {
                // Deep copy so large caller-owned buffers are not pinned while
                // the queue waits to fill up.
                self.buffered_arrays.push(deep_copy_array(array.as_ref()))
            }
            None
        }
    }
    /// Flushes whatever remains, regardless of size.  Returns `None` when the
    /// queue is empty.
    pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64, u64)> {
        if self.buffered_arrays.is_empty() {
            trace!(
                "No final flush since no data at column {}",
                self.column_index
            );
            None
        } else {
            trace!(
                "Final flush of column {} which has {} bytes",
                self.column_index,
                self.current_bytes
            );
            self.current_bytes = 0;
            let row_number = self.row_number;
            self.row_number = u64::MAX;
            let num_rows = self.num_rows;
            self.num_rows = 0;
            Some((
                std::mem::take(&mut self.buffered_arrays),
                row_number,
                num_rows,
            ))
        }
    }
}
/// Legacy encoder for a primitive field: accumulates arrays and emits
/// encode tasks for full pages.
pub struct PrimitiveFieldEncoder {
    // Buffers incoming arrays until a page's worth has accumulated
    accumulation_queue: AccumulationQueue,
    // Strategy that picks the physical encoding for each batch
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    // Index of the column being encoded
    column_index: u32,
    // The field being encoded
    field: Field,
    // Upper bound on the size of a single page, in bytes
    max_page_bytes: u64,
}
impl PrimitiveFieldEncoder {
    /// Creates an encoder configured from `options` (cache size per column,
    /// max page size, and whether to keep or deep-copy incoming arrays).
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }
    /// Spawns a background task that encodes `arrays` into one page.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();
        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                // Legacy pages do not track row numbers.
                row_number: 0,
            })
        })
        // A JoinError here means the spawned task panicked; propagate it.
        .map(|res_res| res_res.unwrap())
        .boxed())
    }
    /// Turns accumulated arrays into one or more encode tasks, splitting a
    /// single oversized array into parts of roughly `max_page_bytes` each.
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice finer than one row per part.
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    // The final part may be smaller than part_size.
                    let avail = array.len() - offset;
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple buffered arrays always become a single page.
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
impl FieldEncoder for PrimitiveFieldEncoder {
    /// Buffers `array`; when the accumulation queue decides to flush, returns
    /// the encode tasks for the buffered data, otherwise returns no tasks.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        _repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Destructure the (arrays, row_number, num_rows) triple explicitly
        // rather than using `.0`; the legacy path only needs the arrays.
        // (Also drops the redundant `Ok(...?)` re-wrapping of the Result.)
        if let Some((arrays, _, _)) = self.accumulation_queue.insert(array, row_number, num_rows) {
            self.do_flush(arrays)
        } else {
            Ok(vec![])
        }
    }
    /// Flushes any remaining buffered data, regardless of size.
    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        if let Some((arrays, _, _)) = self.accumulation_queue.flush() {
            self.do_flush(arrays)
        } else {
            Ok(vec![])
        }
    }
    /// Primitive fields always occupy exactly one column.
    fn num_columns(&self) -> u32 {
        1
    }
    /// Legacy encoding has no column-level buffers to emit.
    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}
// Alignment (in bytes) of each section within a serialized miniblock chunk.
const MINIBLOCK_ALIGNMENT: usize = 8;
// Maximum padding that aligning one section can add.
const MINIBLOCK_MAX_PADDING: usize = MINIBLOCK_ALIGNMENT - 1;
/// Structural encoder for a primitive field: accumulates arrays and their
/// rep/def builders, then encodes pages in one of the structural layouts.
pub struct PrimitiveStructuralEncoder {
    // Buffers incoming arrays until a page's worth has accumulated
    accumulation_queue: AccumulationQueue,
    // Rep/def builders accumulated alongside the arrays
    accumulated_repdefs: Vec<RepDefBuilder>,
    // Strategy that picks compression for values and levels
    compression_strategy: Arc<dyn CompressionStrategy>,
    // Index of the column being encoded
    column_index: u32,
    // The field being encoded
    field: Field,
}
impl PrimitiveStructuralEncoder {
/// Creates an encoder configured from `options` (cache size per column and
/// whether to keep or deep-copy incoming arrays).
pub fn try_new(
    options: &EncodingOptions,
    compression_strategy: Arc<dyn CompressionStrategy>,
    column_index: u32,
    field: Field,
) -> Result<Self> {
    let accumulation_queue = AccumulationQueue::new(
        options.cache_bytes_per_column,
        column_index,
        options.keep_original_array,
    );
    Ok(Self {
        field,
        compression_strategy,
        column_index,
        accumulated_repdefs: Vec::new(),
        accumulation_queue,
    })
}
/// Reports whether the block's values are small enough (max length below
/// 256 bytes) to use the miniblock layout.  Returns false when the
/// max-length statistic is unavailable.
fn is_narrow(data_block: &DataBlock) -> bool {
    const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
    match data_block.get_stat(Stat::MaxLength) {
        Some(max_len_array) => {
            let max_len = max_len_array
                .as_any()
                .downcast_ref::<PrimitiveArray<UInt64Type>>()
                .unwrap()
                .value(0);
            max_len < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE
        }
        None => false,
    }
}
/// Serializes compressed chunks plus their rep/def buffers into the on-disk
/// miniblock format.
///
/// Each chunk is laid out as:
/// `[rep len: u16][def len: u16][value len][rep bytes][def bytes][pad][values][pad]`
/// with padding so the values and the chunk end land on MINIBLOCK_ALIGNMENT
/// boundaries.  Returns `(data buffer, metadata buffer)`, where the metadata
/// buffer holds one u16 word per chunk packing the padded chunk size and
/// log2(num values) — the format decoded in `MiniBlockScheduler::initialize`.
fn serialize_miniblocks(
    miniblocks: MiniBlockCompressed,
    rep: Vec<LanceBuffer>,
    def: Vec<LanceBuffer>,
) -> (LanceBuffer, LanceBuffer) {
    // Worst-case capacity: all data + levels + per-chunk headers + padding.
    let bytes_rep = rep.iter().map(|r| r.len()).sum::<usize>();
    let bytes_def = def.iter().map(|d| d.len()).sum::<usize>();
    let max_bytes_repdef_len = rep.len() * 4;
    let max_padding = miniblocks.chunks.len() * (1 + (2 * MINIBLOCK_MAX_PADDING));
    let mut data_buffer = Vec::with_capacity(
        miniblocks.data.len() + bytes_rep + bytes_def + max_bytes_repdef_len + max_padding,
    );
    let mut meta_buffer = Vec::with_capacity(miniblocks.data.len() * 2);
    let mut value_offset = 0;
    for ((chunk, rep), def) in miniblocks.chunks.into_iter().zip(rep).zip(def) {
        let start_len = data_buffer.len();
        // Every chunk begins on an aligned boundary.
        debug_assert_eq!(start_len % MINIBLOCK_ALIGNMENT, 0);
        // Level lengths must fit the u16 header fields.
        assert!(rep.len() < u16::MAX as usize);
        assert!(def.len() < u16::MAX as usize);
        let bytes_rep = rep.len() as u16;
        let bytes_def = def.len() as u16;
        let bytes_val = chunk.num_bytes;
        // Header: rep length, def length, value length (little-endian).
        data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
        data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
        data_buffer.extend_from_slice(&bytes_val.to_le_bytes());
        data_buffer.extend_from_slice(&rep);
        debug_assert_eq!(data_buffer.len() % 2, 0);
        data_buffer.extend_from_slice(&def);
        // Pad so the values start aligned.
        let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
        data_buffer.extend(iter::repeat(0).take(p2));
        let num_value_bytes = chunk.num_bytes as usize;
        let values =
            &miniblocks.data[value_offset as usize..value_offset as usize + num_value_bytes];
        debug_assert_eq!(data_buffer.len() % MINIBLOCK_ALIGNMENT, 0);
        data_buffer.extend_from_slice(values);
        // Pad so the next chunk starts aligned.
        let p3 = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
        data_buffer.extend(iter::repeat(0).take(p3));
        value_offset += num_value_bytes as u64;
        let chunk_bytes = data_buffer.len() - start_len;
        // The 12-bit size field limits a chunk to 16KiB.
        assert!(chunk_bytes <= 16 * 1024);
        assert!(chunk_bytes > 0);
        assert_eq!(chunk_bytes % 8, 0);
        // Metadata word: high 12 bits = (size / alignment) - 1,
        // low 4 bits = log2(num values).
        let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
        let divided_bytes_minus_one = (divided_bytes - 1) as u64;
        let metadata = ((divided_bytes_minus_one << 4) | chunk.log_num_values as u64) as u16;
        meta_buffer.extend_from_slice(&metadata.to_le_bytes());
    }
    (
        LanceBuffer::Owned(data_buffer),
        LanceBuffer::Owned(meta_buffer),
    )
}
/// Compresses repetition or definition levels, one buffer per chunk.
///
/// Returns `(per-chunk compressed buffers, encoding description, rep index)`.
/// The rep index is only populated when `max_rep > 0` (i.e. this is a
/// repetition pass over a list column) and holds, per chunk, the pair
/// `[rows starting in chunk, trailing levels after the last row start]`
/// serialized as u64s.  When `levels` is None a constant-zero encoding is
/// returned with empty buffers.
fn compress_levels(
    levels: Option<RepDefSlicer<'_>>,
    num_values: u64,
    compression_strategy: &dyn CompressionStrategy,
    chunks: &[MiniBlockChunk],
    max_rep: u16,
) -> Result<(Vec<LanceBuffer>, pb::ArrayEncoding, LanceBuffer)> {
    if let Some(mut levels) = levels {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let mut levels_buf = levels.all_levels().try_clone().unwrap();
        // Choose a block compressor by presenting the levels as u16 data.
        let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
            data: levels_buf.borrow_and_clone(),
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        });
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut buffers = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_values);
            values_counter += chunk_num_values;
            // The last chunk takes whatever levels remain.
            let mut chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                // Build the repetition index entry for this chunk.  A level
                // equal to max_rep marks the start of a new row.
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();
                // skip(1): presumably the first level's row is attributed to
                // the previous chunk's accounting — TODO confirm against
                // ChunkInstructions::schedule_instructions.
                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                // Levels after the final row start belong to a row that spills
                // into the next chunk (the "leftovers"/trailer).
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };
                if chunk_idx != 0 && rep_values[0] == max_rep {
                    // This chunk starts exactly on a row boundary, so the
                    // previous chunk's entry must not count a trailing partial
                    // row; move that row into the previous chunk's row count.
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }
                if chunk_idx == chunks.len() - 1 {
                    // The final chunk's last row has no following chunk; count it.
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            });
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            buffers.push(compressed_levels);
        }
        // All levels must have been consumed exactly.
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = LanceBuffer::reinterpret_vec(rep_index);
        Ok((buffers, compressor_desc, rep_index))
    } else {
        // No levels at all: describe them as the constant 0 so readers don't
        // need any per-chunk data.
        let data = chunks.iter().map(|_| LanceBuffer::empty()).collect();
        let scalar = 0_u16.to_le_bytes().to_vec();
        let encoding = ProtobufUtils::constant(scalar, num_values);
        Ok((data, encoding, LanceBuffer::empty()))
    }
}
/// Encodes a page whose values are all null with trivial (flat) nullability.
///
/// No buffers are needed: the layout description alone is enough for the
/// reader to reconstruct the page.
fn encode_simple_all_null(
    column_idx: u32,
    num_rows: u64,
    row_number: u64,
) -> Result<EncodedPage> {
    Ok(EncodedPage {
        column_idx,
        data: Vec::new(),
        description: PageEncoding::Structural(ProtobufUtils::simple_all_null_layout()),
        num_rows,
        row_number,
    })
}
/// Encodes an all-null page that has non-trivial nesting.
///
/// Even though every value is null, the repetition and definition levels are
/// still written so the reader can reconstruct the structure.
fn encode_complex_all_null(
    column_idx: u32,
    repdefs: Vec<RepDefBuilder>,
    row_number: u64,
    num_rows: u64,
) -> Result<EncodedPage> {
    let repdef = RepDefBuilder::serialize(repdefs);
    // Missing levels are represented as empty buffers.
    let rep_bytes = repdef
        .repetition_levels
        .as_ref()
        .map(|rep| LanceBuffer::reinterpret_slice(rep.clone()))
        .unwrap_or_else(LanceBuffer::empty);
    let def_bytes = repdef
        .definition_levels
        .as_ref()
        .map(|def| LanceBuffer::reinterpret_slice(def.clone()))
        .unwrap_or_else(LanceBuffer::empty);
    let description = ProtobufUtils::all_null_layout(&repdef.def_meaning);
    Ok(EncodedPage {
        column_idx,
        data: vec![rep_bytes, def_bytes],
        description: PageEncoding::Structural(description),
        num_rows,
        row_number,
    })
}
#[allow(clippy::too_many_arguments)]
/// Encodes a page in the miniblock layout.
///
/// Compresses the values into chunks, compresses the rep/def levels per
/// chunk, then serializes everything into the page's buffers in the order
/// expected by `MiniBlockScheduler::initialize`:
/// `[chunk metadata, chunk data, [dictionary], [repetition index]]`.
fn encode_miniblock(
    column_idx: u32,
    field: &Field,
    compression_strategy: &dyn CompressionStrategy,
    data: DataBlock,
    repdefs: Vec<RepDefBuilder>,
    row_number: u64,
    dictionary_data: Option<DataBlock>,
    num_rows: u64,
) -> Result<EncodedPage> {
    let repdef = RepDefBuilder::serialize(repdefs);
    // All-null data should have been routed to the all-null encoders.
    if let DataBlock::AllNull(_null_block) = data {
        todo!()
    }
    let num_items = data.num_values();
    // Validity is carried by the definition levels, not the value block.
    let data = data.remove_validity();
    let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
    let (compressed_data, value_encoding) = compressor.compress(data)?;
    // Number of list layers determines the max repetition level.
    let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
    let (compressed_rep, rep_encoding, rep_index) = Self::compress_levels(
        repdef.rep_slicer(),
        num_items,
        compression_strategy,
        &compressed_data.chunks,
        max_rep,
    )?;
    let (rep_index, rep_index_depth) = if rep_index.is_empty() {
        (None, 0)
    } else {
        // Currently only a depth-1 repetition index is produced.
        (Some(rep_index), 1)
    };
    // Definition levels never contribute to the repetition index (max_rep = 0).
    let (compressed_def, def_encoding, _) = Self::compress_levels(
        repdef.def_slicer(),
        num_items,
        compression_strategy,
        &compressed_data.chunks,
        0,
    )?;
    let (block_value_buffer, block_meta_buffer) =
        Self::serialize_miniblocks(compressed_data, compressed_rep, compressed_def);
    let mut data = Vec::with_capacity(4);
    data.push(block_meta_buffer);
    data.push(block_value_buffer);
    if let Some(dictionary_data) = dictionary_data {
        // Dictionaries are compressed as a standalone block; the field used
        // here is a placeholder since only the data block matters.
        let dummy_dictionary_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, dictionary_encoding) = compression_strategy
            .create_block_compressor(&dummy_dictionary_field, &dictionary_data)?;
        let dictionary_buffer = compressor.compress(dictionary_data)?;
        data.push(dictionary_buffer);
        if let Some(rep_index) = rep_index {
            data.push(rep_index);
        }
        let description = ProtobufUtils::miniblock_layout(
            rep_encoding,
            def_encoding,
            value_encoding,
            rep_index_depth,
            Some(dictionary_encoding),
            &repdef.def_meaning,
            num_items,
        );
        Ok(EncodedPage {
            num_rows,
            column_idx,
            data,
            description: PageEncoding::Structural(description),
            row_number,
        })
    } else {
        let description = ProtobufUtils::miniblock_layout(
            rep_encoding,
            def_encoding,
            value_encoding,
            rep_index_depth,
            None,
            &repdef.def_meaning,
            num_items,
        );
        if let Some(mut rep_index) = rep_index {
            // Sanity check: the per-chunk row counts must add up to the page's
            // total row count.
            let view = rep_index.borrow_to_typed_slice::<u64>();
            let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
            debug_assert_eq!(total, num_rows);
            data.push(rep_index);
        }
        Ok(EncodedPage {
            num_rows,
            column_idx,
            data,
            description: PageEncoding::Structural(description),
            row_number,
        })
    }
}
/// Zips fixed-width values together with their rep/def control words.
///
/// The output buffer is a stream of `[control word][value bytes]` pairs,
/// one pair per value.
fn serialize_full_zip_fixed(
    fixed: FixedWidthDataBlock,
    mut repdef: ControlWordIterator,
) -> LanceBuffer {
    assert_eq!(
        fixed.bits_per_value % 8,
        0,
        "Non-byte aligned full-zip compression not yet supported"
    );
    let bytes_per_value = fixed.bits_per_value as usize / 8;

    // One control word per value plus the raw value bytes.
    let capacity = fixed.data.len() + repdef.bytes_per_word() * fixed.num_values as usize;
    let mut zipped = Vec::with_capacity(capacity);
    for value in fixed.data.chunks_exact(bytes_per_value) {
        repdef.append_next(&mut zipped);
        zipped.extend_from_slice(value);
    }
    LanceBuffer::Owned(zipped)
}
/// Zips variable-width values together with their rep/def control words.
///
/// Each entry in the output stream is `[control word][length][value bytes]`,
/// where the length is the little-endian offset delta for that value.
fn serialize_full_zip_variable(
    mut variable: VariableWidthBlock,
    mut repdef: ControlWordIterator,
) -> LanceBuffer {
    assert_eq!(
        variable.bits_per_offset % 8,
        0,
        "Only byte-aligned offsets supported"
    );
    let bytes_per_offset = variable.bits_per_offset as usize / 8;

    // Control word + length prefix + data bytes for every value.
    let capacity = variable.data.len()
        + repdef.bytes_per_word() * variable.num_values as usize
        + bytes_per_offset * variable.num_values as usize;
    let mut zipped = Vec::with_capacity(capacity);

    // The zipping loop is identical for 32- and 64-bit offsets; only the typed
    // view over the offsets buffer differs, so generate both with a macro.
    macro_rules! zip_values {
        ($offset_ty:ty) => {{
            let offsets = variable.offsets.borrow_to_typed_slice::<$offset_ty>();
            for window in offsets.as_ref().windows(2) {
                repdef.append_next(&mut zipped);
                zipped.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                zipped
                    .extend_from_slice(&variable.data[window[0] as usize..window[1] as usize]);
            }
        }};
    }

    match bytes_per_offset {
        4 => zip_values!(u32),
        8 => zip_values!(u64),
        _ => panic!("Unsupported offset size"),
    }
    LanceBuffer::Owned(zipped)
}
/// Dispatches full-zip serialization based on whether the compressed values
/// are fixed-width or variable-width.
fn serialize_full_zip(
    compressed_data: PerValueDataBlock,
    repdef: ControlWordIterator,
) -> LanceBuffer {
    match compressed_data {
        PerValueDataBlock::Fixed(fixed_block) => {
            Self::serialize_full_zip_fixed(fixed_block, repdef)
        }
        PerValueDataBlock::Variable(variable_block) => {
            Self::serialize_full_zip_variable(variable_block, repdef)
        }
    }
}
/// Encodes a page using the full-zip layout, where every value is written
/// adjacent to its repetition/definition control word.
///
/// Produces a single zipped data buffer for the page.
fn encode_full_zip(
    column_idx: u32,
    field: &Field,
    compression_strategy: &dyn CompressionStrategy,
    data: DataBlock,
    repdefs: Vec<RepDefBuilder>,
    row_number: u64,
) -> Result<EncodedPage> {
    let repdef = RepDefBuilder::serialize(repdefs);

    // The control word width is determined by the largest level value present.
    let max_level = |levels: Option<&[u16]>| -> u16 {
        levels.map_or(0, |lvls| lvls.iter().copied().max().unwrap_or(0))
    };
    let max_rep = max_level(repdef.repetition_levels.as_deref());
    let max_def = max_level(repdef.definition_levels.as_deref());

    let repdef_iter = build_control_word_iterator(
        repdef.repetition_levels.as_deref(),
        max_rep,
        repdef.definition_levels.as_deref(),
        max_def,
    );
    let bits_rep = repdef_iter.bits_rep();
    let bits_def = repdef_iter.bits_def();

    // Validity is captured by the def levels so it can be stripped from the data
    let num_values = data.num_values();
    let data = data.remove_validity();

    let compressor = compression_strategy.create_per_value(field, &data)?;
    let (compressed_data, value_encoding) = compressor.compress(data)?;

    let zipped = Self::serialize_full_zip(compressed_data, repdef_iter);

    let description =
        ProtobufUtils::full_zip_layout(bits_rep, bits_def, value_encoding, &repdef.def_meaning);
    Ok(EncodedPage {
        // NOTE(review): num_rows is set to the value count; confirm that rows
        // and values coincide on this path (no list nesting) at the call site.
        num_rows: num_values,
        column_idx,
        data: vec![zipped],
        description: PageEncoding::Structural(description),
        row_number,
    })
}
/// Dictionary-encodes a data block, returning `(indices, dictionary)` blocks.
///
/// Only called when the (estimated) cardinality is small, so every distinct
/// value is addressed with a `u8` index.
///
/// Supported inputs:
///   * `FixedWidth` blocks, which are viewed here as 128-bit values
///   * `VariableWidth` blocks with 32-bit offsets (64-bit offsets are a TODO)
fn dictionary_encode(mut data_block: DataBlock, cardinality: u64) -> (DataBlock, DataBlock) {
    match data_block {
        DataBlock::FixedWidth(ref mut fixed_width_data_block) => {
            // Maps each distinct value to its position in the dictionary
            let mut map = HashMap::new();
            let u128_slice = fixed_width_data_block.data.borrow_to_typed_slice::<u128>();
            let u128_slice = u128_slice.as_ref();
            let mut dictionary_buffer = Vec::with_capacity(cardinality as usize);
            let mut indices_buffer =
                Vec::with_capacity(fixed_width_data_block.num_values as usize);
            // NOTE(review): u8 indices assume at most 256 distinct values; the
            // caller's cardinality gate should guarantee this, but if that
            // statistic is an estimate this increment could overflow -- confirm.
            let mut curr_idx: u8 = 0;
            u128_slice.iter().for_each(|&value| {
                let idx = *map.entry(value).or_insert_with(|| {
                    dictionary_buffer.push(value);
                    curr_idx += 1;
                    curr_idx - 1
                });
                indices_buffer.push(idx);
            });
            let dictionary_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                bits_per_value: 128,
                num_values: curr_idx as u64,
                block_info: BlockInfo::default(),
            });
            let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                data: LanceBuffer::reinterpret_vec(indices_buffer),
                bits_per_value: 8,
                num_values: fixed_width_data_block.num_values,
                block_info: BlockInfo::default(),
            });
            // Recompute statistics for the freshly built indices block so later
            // stages see accurate stats.
            indices_data_block.compute_stat();
            (indices_data_block, dictionary_data_block)
        }
        DataBlock::VariableWidth(ref mut variable_width_data_block) => {
            match variable_width_data_block.bits_per_offset {
                32 => {
                    let mut map: HashMap<U8SliceKey, u8> = HashMap::new();
                    let offsets = variable_width_data_block
                        .offsets
                        .borrow_to_typed_slice::<u32>();
                    let offsets = offsets.as_ref();
                    // Fix: the original message referenced `Stat::DataSize` but the
                    // statistic queried here is `Stat::MaxLength`.
                    let max_len = variable_width_data_block.get_stat(Stat::MaxLength).expect(
                        "VariableWidth DataBlock should have valid `Stat::MaxLength` statistics",
                    );
                    let max_len = max_len.as_primitive::<UInt64Type>().value(0);
                    // Reserve for the worst case: every dictionary entry at max length.
                    let mut dictionary_buffer: Vec<u8> =
                        Vec::with_capacity((max_len * cardinality) as usize);
                    let mut dictionary_offsets_buffer = vec![0];
                    let mut curr_idx = 0;
                    let mut indices_buffer =
                        Vec::with_capacity(variable_width_data_block.num_values as usize);
                    offsets
                        .iter()
                        .zip(offsets.iter().skip(1))
                        .for_each(|(&start, &end)| {
                            let key =
                                &variable_width_data_block.data[start as usize..end as usize];
                            let idx = *map.entry(U8SliceKey(key)).or_insert_with(|| {
                                dictionary_buffer.extend_from_slice(key);
                                dictionary_offsets_buffer.push(dictionary_buffer.len() as u32);
                                curr_idx += 1;
                                curr_idx - 1
                            });
                            indices_buffer.push(idx);
                        });
                    let dictionary_data_block = DataBlock::VariableWidth(VariableWidthBlock {
                        data: LanceBuffer::reinterpret_vec(dictionary_buffer),
                        offsets: LanceBuffer::reinterpret_vec(dictionary_offsets_buffer),
                        bits_per_offset: 32,
                        num_values: curr_idx as u64,
                        block_info: BlockInfo::default(),
                    });
                    let mut indices_data_block = DataBlock::FixedWidth(FixedWidthDataBlock {
                        // Indices are already u8 so no reinterpret is needed
                        data: LanceBuffer::Owned(indices_buffer),
                        bits_per_value: 8,
                        num_values: variable_width_data_block.num_values,
                        block_info: BlockInfo::default(),
                    });
                    indices_data_block.compute_stat();
                    (indices_data_block, dictionary_data_block)
                }
                64 => {
                    todo!("A follow up PR to support dictionary encoding with dictionary type `VariableWidth DataBlock` with bits_per_offset 64");
                }
                _ => {
                    unreachable!()
                }
            }
        }
        _ => {
            // Only fixed- and variable-width blocks reach the dictionary path
            unreachable!()
        }
    }
}
/// Flushes the accumulated arrays into a single background encode task.
///
/// The heavy work runs on a CPU worker (`spawn_cpu`); this method only clones
/// the pieces the task needs.  Inside the task a page layout is chosen:
///   * zero values                           -> complex all-null layout
///   * all nulls with simple validity        -> simple all-null layout
///   * low cardinality with enough repeats   -> dictionary + mini-block
///   * narrow data                           -> mini-block layout
///   * otherwise                             -> full-zip layout
fn do_flush(
    &mut self,
    arrays: Vec<ArrayRef>,
    repdefs: Vec<RepDefBuilder>,
    row_number: u64,
    num_rows: u64,
) -> Result<Vec<EncodeTask>> {
    // Clone what the task closure needs since it may outlive `self`.
    let column_idx = self.column_index;
    let compression_strategy = self.compression_strategy.clone();
    let field = self.field.clone();
    let task = spawn_cpu(move || {
        let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
        if num_values == 0 {
            // All arrays are empty.  There may still be rep/def structure to
            // record (e.g. empty lists), so use the complex all-null path.
            return Self::encode_complex_all_null(column_idx, repdefs, row_number, num_rows);
        }
        let num_nulls = arrays
            .iter()
            .map(|arr| arr.logical_nulls().map(|n| n.null_count()).unwrap_or(0) as u64)
            .sum::<u64>();
        if num_values == num_nulls && repdefs.iter().all(|rd| rd.is_simple_validity()) {
            // Every value is null and there is no nested structure to record.
            log::debug!(
                "Encoding column {} with {} items using simple-null layout",
                column_idx,
                num_values
            );
            Self::encode_simple_all_null(column_idx, num_values, row_number)
        } else {
            let data_block = DataBlock::from_arrays(&arrays, num_values);
            // Packed structs are only supported when every child is fixed-width.
            if let DataBlock::Struct(ref struct_data_block) = data_block {
                if struct_data_block
                    .children
                    .iter()
                    .any(|child| !matches!(child, DataBlock::FixedWidth(_)))
                {
                    panic!("packed struct encoding currently only supports fixed-width fields.")
                }
            }
            // Dictionary-encode only when cardinality is small AND there are at
            // least ~10 values per distinct value, so the dictionary pays off.
            const DICTIONARY_ENCODING_THRESHOLD: u64 = 100;
            let cardinality =
                if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
                    cardinality_array.as_primitive::<UInt64Type>().value(0)
                } else {
                    // No cardinality statistic: assume the worst (no dictionary)
                    u64::MAX
                };
            if cardinality <= DICTIONARY_ENCODING_THRESHOLD
                && data_block.num_values() >= 10 * cardinality
            {
                let (indices_data_block, dictionary_data_block) =
                    Self::dictionary_encode(data_block, cardinality);
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdefs,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                )
            } else if Self::is_narrow(&data_block) {
                log::debug!(
                    "Encoding column {} with {} items using mini-block layout",
                    column_idx,
                    num_values
                );
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                    None,
                    num_rows,
                )
            } else {
                log::debug!(
                    "Encoding column {} with {} items using full-zip layout",
                    column_idx,
                    num_values
                );
                Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdefs,
                    row_number,
                )
            }
        }
    })
    .boxed();
    Ok(vec![task])
}
/// Records the validity (null bitmap) of `array` into the repdef builder,
/// using the cheaper no-null path when the array has no null buffer.
fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
    match array.nulls() {
        Some(validity) => repdef.add_validity_bitmap(validity.clone()),
        None => repdef.add_no_null(array.len()),
    }
}
/// Extracts validity information from an array into the repdef builder,
/// handling the special cases of `Null`-typed arrays (every value is null)
/// and dictionaries (which must be unwrapped before reaching this point).
fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
    let data_type = array.data_type();
    if matches!(data_type, DataType::Dictionary(_, _)) {
        // Dictionaries should never reach this code path
        unreachable!()
    } else if *data_type == DataType::Null {
        // Null arrays carry no validity buffer; synthesize an all-null bitmap
        repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
    } else {
        Self::extract_validity_buf(array, repdef);
    }
}
}
impl FieldEncoder for PrimitiveStructuralEncoder {
    /// Accumulates the incoming array, producing encode tasks once the
    /// accumulation queue decides enough data has been gathered.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        Self::extract_validity(array.as_ref(), &mut repdef);
        self.accumulated_repdefs.push(repdef);

        match self.accumulation_queue.insert(array, row_number, num_rows) {
            Some((arrays, row_number, num_rows)) => {
                let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
                self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
            }
            None => Ok(Vec::new()),
        }
    }

    /// Drains whatever data is still sitting in the accumulation queue.
    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        match self.accumulation_queue.flush() {
            Some((arrays, row_number, num_rows)) => {
                let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
                self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
            }
            None => Ok(Vec::new()),
        }
    }

    /// A primitive structural encoder always writes exactly one column.
    fn num_columns(&self) -> u32 {
        1
    }

    /// All data is emitted as pages; the column itself has no trailing data.
    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
use std::{collections::VecDeque, sync::Arc};
use arrow_array::{ArrayRef, Int8Array, StringArray};
use crate::encodings::logical::primitive::{
ChunkDrainInstructions, PrimitiveStructuralEncoder,
};
use super::{ChunkInstructions, DataBlock, DecodeMiniBlockTask, PreambleAction};
#[test]
fn test_is_narrow() {
    // Small fixed-width values are narrow.
    let int8_array = Int8Array::from(vec![1, 2, 3]);
    let int_block = DataBlock::from_array(Arc::new(int8_array) as ArrayRef);
    assert!(PrimitiveStructuralEncoder::is_narrow(&int_block));

    // Short strings are narrow.
    let short_strings = StringArray::from(vec![Some("hello"), Some("world")]);
    let short_block = DataBlock::from_array(short_strings);
    assert!(PrimitiveStructuralEncoder::is_narrow(&short_block));

    // A block dominated by one very long string is not narrow.
    let long_strings = StringArray::from(vec![
        Some("hello world".repeat(100)),
        Some("world".to_string()),
    ]);
    let long_block = DataBlock::from_array(long_strings);
    assert!(!PrimitiveStructuralEncoder::is_narrow(&long_block));
}
#[test]
fn test_map_range() {
    // Verifies DecodeMiniBlockTask::map_range, which maps a range of rows into
    // the corresponding item range and level range within a chunk.  Each
    // scenario below varies the rep/def levels and the preamble handling.

    // Scenario 1: lists with one invisible (null/empty) list in the middle.
    // Rep levels mark list starts; def level 1 at position 5 hides that item.
    let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
    let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
    let max_visible_def = 0;
    let total_items = 8;
    let max_rep = 1;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(1..2, 3..5, 3..5);
    check(2..3, 5..5, 5..6);
    check(3..4, 5..8, 6..9);
    check(0..2, 0..5, 0..5);
    check(1..3, 3..5, 3..6);
    check(2..4, 5..8, 5..9);
    check(0..3, 0..5, 0..6);
    check(1..4, 3..8, 3..9);
    check(0..4, 0..8, 0..9);

    // Scenario 2: the FIRST list is invisible (def level 1 at position 0).
    let rep = Some(vec![1, 1, 0, 1]);
    let def = Some(vec![1, 0, 0, 0]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..0, 0..1);
    check(1..2, 0..2, 1..3);
    check(2..3, 2..3, 3..4);
    check(0..2, 0..2, 0..3);
    check(1..3, 0..3, 1..4);
    check(0..3, 0..3, 0..4);

    // Scenario 3: the LAST list is invisible (def level 1 at position 3).
    let rep = Some(vec![1, 1, 0, 1]);
    let def = Some(vec![0, 0, 0, 1]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..1);
    check(1..2, 1..3, 1..3);
    check(2..3, 3..3, 3..4);
    check(0..2, 0..3, 0..3);
    check(1..3, 1..3, 1..4);
    check(0..3, 0..3, 0..4);

    // Scenario 4: lists with no definition levels (no nulls anywhere).
    let rep = Some(vec![1, 0, 1, 0, 1, 0]);
    let def: Option<&[u16]> = None;
    let max_visible_def = 0;
    let total_items = 6;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..2, 0..2);
    check(1..2, 2..4, 2..4);
    check(2..3, 4..6, 4..6);
    check(0..2, 0..4, 0..4);
    check(1..3, 2..6, 2..6);
    check(0..3, 0..6, 0..6);

    // Scenario 5: definition levels only (no lists); rows map 1:1 to items.
    let rep: Option<&[u16]> = None;
    let def = Some(vec![0, 0, 1, 0]);
    let max_visible_def = 1;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..1);
    check(1..2, 1..2, 1..2);
    check(2..3, 2..3, 2..3);
    check(0..2, 0..2, 0..2);
    check(1..3, 1..3, 1..3);
    check(0..3, 0..3, 0..3);

    // Scenario 6: chunk starts with a preamble (continuation of a prior list)
    // and the last list is invisible.  First with the preamble taken...
    let rep = Some(vec![0, 1, 0, 1]);
    let def = Some(vec![0, 0, 0, 1]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(0..2, 0..3, 0..4);
    // ...then with the preamble skipped.
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..3, 1..3);
    check(1..2, 3..3, 3..4);
    check(0..2, 1..3, 1..4);

    // Scenario 7: preamble present and an invisible list in the middle.
    let rep = Some(vec![0, 1, 1, 0]);
    let def = Some(vec![0, 1, 0, 0]);
    let max_visible_def = 0;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..2);
    check(0..2, 0..3, 0..4);
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..1, 1..2);
    check(1..2, 1..3, 2..4);
    check(0..2, 1..3, 1..4);

    // Scenario 8: preamble present, no definition levels at all.
    let rep = Some(vec![0, 1, 0, 1]);
    let def: Option<Vec<u16>> = None;
    let max_visible_def = 0;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(0..2, 0..4, 0..4);
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..3, 1..3);
    check(1..2, 3..4, 3..4);
    check(0..2, 1..4, 1..4);
}
#[test]
fn test_schedule_instructions() {
    // Verifies ChunkInstructions::schedule_instructions against a 4-chunk
    // repetition index.  Each entry is [rows_in_chunk, trailer_items]: a
    // non-zero second value means a row spills over into the next chunk
    // (that next chunk then starts with a preamble).
    let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
    let check = |user_ranges, expected_instructions| {
        let instructions =
            ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
        assert_eq!(instructions, expected_instructions);
    };

    // Taking every row visits every chunk; chunks 1 and 3 must take the
    // preamble left over from the trailers of chunks 0 and 2.
    let expected_take_all = vec![
        ChunkInstructions {
            chunk_idx: 0,
            preamble: PreambleAction::Absent,
            rows_to_skip: 0,
            rows_to_take: 5,
            take_trailer: true,
        },
        ChunkInstructions {
            chunk_idx: 1,
            preamble: PreambleAction::Take,
            rows_to_skip: 0,
            rows_to_take: 2,
            take_trailer: false,
        },
        ChunkInstructions {
            chunk_idx: 2,
            preamble: PreambleAction::Absent,
            rows_to_skip: 0,
            rows_to_take: 4,
            take_trailer: true,
        },
        ChunkInstructions {
            chunk_idx: 3,
            preamble: PreambleAction::Take,
            rows_to_skip: 0,
            rows_to_take: 1,
            take_trailer: false,
        },
    ];
    // One contiguous range covering all 14 rows...
    check(&[0..14], expected_take_all.clone());
    // ...and the same rows requested as 14 adjacent single-row ranges should
    // coalesce into the identical schedule.
    check(
        &[
            0..1,
            1..2,
            2..3,
            3..4,
            4..5,
            5..6,
            6..7,
            7..8,
            8..9,
            9..10,
            10..11,
            11..12,
            12..13,
            13..14,
        ],
        expected_take_all,
    );

    // Two disjoint single-row reads within chunk 0 produce two instructions
    // for the same chunk with different skips.
    check(
        &[0..1, 3..4],
        vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 1,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 3,
                rows_to_take: 1,
                take_trailer: false,
            },
        ],
    );

    // Row 5 straddles chunks 0 and 1: take chunk 0's trailer and chunk 1's
    // preamble (with zero additional rows from each).
    check(
        &[5..6],
        vec![
            ChunkInstructions {
                chunk_idx: 0,
                preamble: PreambleAction::Absent,
                rows_to_skip: 5,
                rows_to_take: 0,
                take_trailer: true,
            },
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Take,
                rows_to_skip: 0,
                rows_to_take: 0,
                take_trailer: false,
            },
        ],
    );

    // A range starting mid-chunk-1 must skip chunk 1's preamble.
    check(
        &[7..10],
        vec![
            ChunkInstructions {
                chunk_idx: 1,
                preamble: PreambleAction::Skip,
                rows_to_skip: 1,
                rows_to_take: 1,
                take_trailer: false,
            },
            ChunkInstructions {
                chunk_idx: 2,
                preamble: PreambleAction::Absent,
                rows_to_skip: 0,
                rows_to_take: 2,
                take_trailer: false,
            },
        ],
    );
}
#[test]
fn test_drain_instructions() {
    // Verifies drain_from_instruction: draining batches of rows out of a
    // scheduled set of ChunkInstructions, tracking a preamble carry-over and
    // an intra-chunk skip between batches.

    // Helper: drain `rows_desired` rows (plus any pending preamble) from the
    // front of the instruction queue, popping instructions as they are
    // fully consumed.
    fn drain_from_instructions(
        instructions: &mut VecDeque<ChunkInstructions>,
        mut rows_desired: u64,
        need_preamble: &mut bool,
        skip_in_chunk: &mut u64,
    ) -> Vec<ChunkDrainInstructions> {
        let mut drain_instructions = Vec::with_capacity(instructions.len());
        while rows_desired > 0 || *need_preamble {
            let (next_instructions, consumed_chunk) = instructions
                .front()
                .unwrap()
                .drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
            if consumed_chunk {
                instructions.pop_front();
            }
            drain_instructions.push(next_instructions);
        }
        drain_instructions
    }

    // Same 4-chunk repetition index as test_schedule_instructions.
    let repetition_index = vec![vec![5, 2], vec![3, 0], vec![4, 7], vec![2, 0]];
    let user_ranges = vec![1..7, 10..14];
    let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
    let mut to_drain = VecDeque::from(scheduled.clone());

    // Batch 1: 4 rows, all satisfied by the first scheduled instruction.
    let mut need_preamble = false;
    let mut skip_in_chunk = 0;
    let next_batch =
        drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 4);
    assert_eq!(
        next_batch,
        vec![ChunkDrainInstructions {
            chunk_instructions: scheduled[0].clone(),
            rows_to_take: 4,
            rows_to_skip: 0,
            preamble_action: PreambleAction::Absent,
        }]
    );

    // Batch 2: 4 rows spanning the remainder of instruction 0 plus
    // instructions 1 and 2 (instruction 1 brings a preamble with it).
    let next_batch =
        drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 2);
    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 1,
                rows_to_skip: 4,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 2,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            }
        ]
    );

    // Batch 3: the final 2 rows drain instructions 2 and 3.
    let next_batch =
        drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);
    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 1,
                rows_to_skip: 2,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[3].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );

    // Regression case: a chunk boundary falls exactly on a drain boundary so
    // the next batch must still take the following chunk's preamble.
    let repetition_index = vec![vec![5, 2], vec![3, 3], vec![20, 0]];
    let user_ranges = vec![0..28];
    let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
    let mut to_drain = VecDeque::from(scheduled.clone());

    // Batch 1: 7 rows = 6 from chunk 0 (5 rows + trailer) and 1 from chunk 1.
    let mut need_preamble = false;
    let mut skip_in_chunk = 0;
    let next_batch =
        drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[0].clone(),
                rows_to_take: 6,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Absent,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 1,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );
    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 1);

    // Batch 2: 2 more rows finish chunk 1 (skipping its preamble this time)
    // and take chunk 2's preamble with zero extra rows.
    let next_batch =
        drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
    assert_eq!(
        next_batch,
        vec![
            ChunkDrainInstructions {
                chunk_instructions: scheduled[1].clone(),
                rows_to_take: 2,
                rows_to_skip: 1,
                preamble_action: PreambleAction::Skip,
            },
            ChunkDrainInstructions {
                chunk_instructions: scheduled[2].clone(),
                rows_to_take: 0,
                rows_to_skip: 0,
                preamble_action: PreambleAction::Take,
            },
        ]
    );
    assert!(!need_preamble);
    assert_eq!(skip_in_chunk, 0);
}
}