use std::{
any::Any,
collections::{HashMap, VecDeque},
env,
fmt::Debug,
iter,
ops::Range,
sync::Arc,
vec,
};
use crate::{
constants::{
STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, STRUCTURAL_ENCODING_MINIBLOCK,
},
data::DictionaryDataBlock,
encodings::logical::primitive::blob::{BlobDescriptionPageScheduler, BlobPageScheduler},
format::{
ProtobufUtils21,
pb21::{self, CompressiveEncoding, PageLayout, compressive_encoding::Compression},
},
};
use arrow_array::{Array, ArrayRef, PrimitiveArray, cast::AsArray, make_array, types::UInt64Type};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field as ArrowField};
use bytes::Bytes;
use futures::{FutureExt, TryStreamExt, future::BoxFuture, stream::FuturesOrdered};
use itertools::Itertools;
use lance_arrow::DataTypeExt;
use lance_arrow::deepcopy::deep_copy_nulls;
use lance_core::{
cache::{CacheKey, Context, DeepSizeOf},
error::{Error, LanceOptionExt},
utils::bit::pad_bytes,
};
use log::trace;
use crate::{
compression::{
BlockDecompressor, CompressionStrategy, DecompressionStrategy, MiniBlockDecompressor,
},
data::{AllNullDataBlock, DataBlock, VariableWidthBlock},
utils::bytepack::BytepackedIntegerEncoder,
};
use crate::{
compression::{FixedPerValueDecompressor, VariablePerValueDecompressor},
encodings::logical::primitive::fullzip::PerValueDataBlock,
};
use crate::{
encodings::logical::primitive::miniblock::MiniBlockChunk, utils::bytepack::ByteUnpacker,
};
use crate::{
encodings::logical::primitive::miniblock::MiniBlockCompressed,
statistics::{ComputeStat, GetStat, Stat},
};
use crate::{
repdef::{
CompositeRepDefUnraveler, ControlWordIterator, ControlWordParser, DefinitionInterpretation,
RepDefSlicer, build_control_word_iterator,
},
utils::accumulation::AccumulationQueue,
};
use lance_core::{Result, datatypes::Field, utils::tokio::spawn_cpu};
use crate::constants::{
COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_DIVISOR_META_KEY,
DICT_SIZE_RATIO_META_KEY, DICT_VALUES_COMPRESSION_ENV_VAR,
DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
DICT_VALUES_COMPRESSION_META_KEY,
};
use crate::version::LanceFileVersion;
use crate::{
EncodingsIo,
buffer::LanceBuffer,
data::{BlockInfo, DataBlockBuilder, FixedWidthDataBlock},
decoder::{
ColumnInfo, DecodePageTask, DecodedArray, DecodedPage, FilterExpression, LoadedPageShard,
MessageType, PageEncoding, PageInfo, ScheduledScanLine, SchedulerContext,
StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler,
StructuralPageDecoder, StructuralSchedulingJob, UnloadedPageShard,
},
encoder::{
EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
},
repdef::{LevelBuffer, RepDefBuilder, RepDefUnraveler},
};
// Structural encoding submodules for primitive fields.
pub mod blob;
pub mod constant;
pub mod dict;
pub mod fullzip;
pub mod miniblock;
// Fill byte for padding regions (usage not visible in this chunk — presumably
// written into alignment padding so it is recognizable in dumps; TODO confirm).
const FILL_BYTE: u8 = 0xFE;
// Default tuning knobs for dictionary encoding; the DICT_* metadata keys
// imported from `crate::constants` appear to allow per-field overrides.
const DEFAULT_DICT_DIVISOR: u64 = 2;
const DEFAULT_DICT_MAX_CARDINALITY: u64 = 100_000;
const DEFAULT_DICT_SIZE_RATIO: f64 = 0.8;
// Default compression scheme applied to dictionary values.
const DEFAULT_DICT_VALUES_COMPRESSION: &str = "lz4";
/// A pending page decoder plus the number of rows it will produce, returned
/// by [`StructuralPageScheduler::schedule_ranges`].
struct PageLoadTask {
// Resolves once the page's I/O completes and a decoder can be built.
decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
// Total rows the decoder will yield.
num_rows: u64,
}
/// Scheduler for a single page written with a structural encoding.
///
/// Lifecycle: `initialize` is called once to fetch/build page-level state
/// that can be cached, `load` re-installs previously cached state, and
/// `schedule_ranges` issues I/O for a set of row ranges and returns the
/// resulting load tasks.
trait StructuralPageScheduler: std::fmt::Debug + Send {
/// Fetches page-level metadata (e.g. chunk tables, dictionaries) and
/// returns it as cacheable state.
fn initialize<'a>(
&'a mut self,
io: &Arc<dyn EncodingsIo>,
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>>;
/// Installs cached state previously produced by `initialize`.
fn load(&mut self, data: &Arc<dyn CachedPageData>);
/// Schedules reads for `ranges` (row ranges within this page) and returns
/// one or more load tasks covering them.
fn schedule_ranges(
&self,
ranges: &[Range<u64>],
io: &Arc<dyn EncodingsIo>,
) -> Result<Vec<PageLoadTask>>;
}
/// Metadata for one mini-block chunk: how many values it holds and where its
/// bytes live within the page's data buffer.
#[derive(Debug)]
struct ChunkMeta {
num_values: u64,
chunk_size_bytes: u64,
// Byte offset of the chunk from the start of the page data buffer.
offset_bytes: u64,
}
/// Fully decompressed contents of one mini-block chunk: optional
/// repetition/definition levels plus the decoded values.
#[derive(Debug, Clone)]
struct DecodedMiniBlockChunk {
rep: Option<ScalarBuffer<u16>>,
def: Option<ScalarBuffer<u16>>,
values: DataBlock,
}
/// CPU task that decompresses a set of mini-block chunks and assembles the
/// requested row ranges (described by `instructions`) into one decoded page.
#[derive(Debug)]
struct DecodeMiniBlockTask {
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
value_decompressor: Arc<dyn MiniBlockDecompressor>,
// When present the decoded values are indices into this dictionary block.
dictionary_data: Option<Arc<DataBlock>>,
def_meaning: Arc<[DefinitionInterpretation]>,
// Number of value buffers per chunk.
num_buffers: u64,
// Highest definition level that still represents a visible item (sum of
// def levels of the leading non-list layers).
max_visible_level: u16,
// (what to drain, the chunk to drain it from), in output order.
instructions: Vec<(ChunkDrainInstructions, LoadedChunk)>,
// When true, per-chunk buffer sizes are stored as u32 instead of u16.
has_large_chunk: bool,
}
impl DecodeMiniBlockTask {
/// Decompresses a buffer of repetition or definition levels.
///
/// The decompressor must yield a fixed-width block of exactly `num_levels`
/// 16-bit values; this is checked with debug assertions before the bytes are
/// reinterpreted as a `u16` slice.
fn decode_levels(
    rep_decompressor: &dyn BlockDecompressor,
    levels: LanceBuffer,
    num_levels: u16,
) -> Result<ScalarBuffer<u16>> {
    let decompressed = rep_decompressor.decompress(levels, num_levels as u64)?;
    let block = decompressed.as_fixed_width().unwrap();
    debug_assert_eq!(block.num_values, num_levels as u64);
    debug_assert_eq!(block.bits_per_value, 16);
    Ok(block.data.borrow_to_typed_slice::<u16>())
}
/// Appends levels for `range` onto the destination buffer.
///
/// If `level_buf` is present its `[range.start, range.end)` slice is copied;
/// when the destination does not exist yet it is created and front-padded
/// with `dest_offset` zeros so earlier level-less chunks stay aligned. If
/// `level_buf` is absent but a destination already exists, zeros are appended
/// to keep it aligned with the values.
fn extend_levels(
    range: Range<u64>,
    levels: &mut Option<LevelBuffer>,
    level_buf: &Option<impl AsRef<[u16]>>,
    dest_offset: usize,
) {
    let num_values = (range.end - range.start) as usize;
    match level_buf {
        Some(src) => {
            // Materialize the destination on first use, zero-padded for all
            // previously appended positions.
            let dst = levels.get_or_insert_with(|| {
                let mut fresh = LevelBuffer::with_capacity(dest_offset + num_values);
                fresh.extend(iter::repeat_n(0, dest_offset));
                fresh
            });
            dst.extend(
                src.as_ref()[range.start as usize..range.end as usize]
                    .iter()
                    .copied(),
            );
        }
        None => {
            if let Some(dst) = levels.as_mut() {
                dst.extend(iter::repeat_n(0, num_values));
            }
        }
    }
}
/// Maps a range of *rows* within a chunk to a range of *items* and a range of
/// *levels*.
///
/// Rows are delimited by repetition levels equal to `max_rep`; definition
/// levels greater than `max_visible_def` mark entries that carry no item
/// (invisible). Returns `(item_range, level_range)`. When there are no
/// repetition levels each row is exactly one item and the input range is
/// returned unchanged for both.
fn map_range(
range: Range<u64>,
rep: Option<&impl AsRef<[u16]>>,
def: Option<&impl AsRef<[u16]>>,
max_rep: u16,
max_visible_def: u16,
total_items: u64,
preamble_action: PreambleAction,
) -> (Range<u64>, Range<u64>) {
if let Some(rep) = rep {
let mut rep = rep.as_ref();
let mut items_in_preamble = 0_u64;
// Find where the first full row starts (index of the first level equal
// to max_rep), counting the visible items that precede it (preamble).
let first_row_start = match preamble_action {
PreambleAction::Skip | PreambleAction::Take => {
let first_row_start = if let Some(def) = def.as_ref() {
let mut first_row_start = None;
for (idx, (rep, def)) in rep.iter().zip(def.as_ref()).enumerate() {
if *rep == max_rep {
first_row_start = Some(idx as u64);
break;
}
if *def <= max_visible_def {
items_in_preamble += 1;
}
}
first_row_start
} else {
// No def levels: every preamble entry is a visible item.
let first_row_start =
rep.iter().position(|&r| r == max_rep).map(|r| r as u64);
items_in_preamble = first_row_start.unwrap_or(rep.len() as u64);
first_row_start
};
// The whole chunk may be one long preamble; that is only legal if
// we are taking it, in which case everything is returned.
if first_row_start.is_none() {
assert!(preamble_action == PreambleAction::Take);
return (0..total_items, 0..rep.len() as u64);
}
let first_row_start = first_row_start.unwrap();
rep = &rep[first_row_start as usize..];
first_row_start
}
PreambleAction::Absent => {
debug_assert!(rep[0] == max_rep);
0
}
};
// Zero-width range: only meaningful when the request is "preamble only".
if range.start == range.end {
debug_assert!(preamble_action == PreambleAction::Take);
debug_assert!(items_in_preamble <= total_items);
return (0..items_in_preamble, 0..first_row_start);
}
assert!(range.start < range.end);
let mut rows_seen = 0;
let mut new_start = 0;
let mut new_levels_start = 0;
if let Some(def) = def {
// With def levels, item indices and level indices diverge: invisible
// entries occupy a level slot but no item slot, so we track how many
// invisible entries we pass (lead_invis_seen / tail_invis_seen).
let def = &def.as_ref()[first_row_start as usize..];
let mut lead_invis_seen = 0;
if range.start > 0 {
if def[0] > max_visible_def {
lead_invis_seen += 1;
}
for (idx, (rep, def)) in rep.iter().zip(def).skip(1).enumerate() {
if *rep == max_rep {
rows_seen += 1;
if rows_seen == range.start {
new_start = idx as u64 + 1 - lead_invis_seen;
new_levels_start = idx as u64 + 1;
break;
}
}
if *def > max_visible_def {
lead_invis_seen += 1;
}
}
}
rows_seen += 1;
let mut new_end = u64::MAX;
let mut new_levels_end = rep.len() as u64;
let new_start_is_visible = def[new_levels_start as usize] <= max_visible_def;
let mut tail_invis_seen = if new_start_is_visible { 0 } else { 1 };
for (idx, (rep, def)) in rep[(new_levels_start + 1) as usize..]
.iter()
.zip(&def[(new_levels_start + 1) as usize..])
.enumerate()
{
if *rep == max_rep {
rows_seen += 1;
if rows_seen == range.end + 1 {
new_end = idx as u64 + new_start + 1 - tail_invis_seen;
new_levels_end = idx as u64 + new_levels_start + 1;
break;
}
}
if *def > max_visible_def {
tail_invis_seen += 1;
}
}
// Range ran to the end of the chunk.
if new_end == u64::MAX {
new_levels_end = rep.len() as u64;
let total_invis_seen = lead_invis_seen + tail_invis_seen;
new_end = rep.len() as u64 - total_invis_seen;
}
assert_ne!(new_end, u64::MAX);
// Shift by the preamble: skipped preambles push the ranges forward,
// taken preambles extend them backward to position 0.
if preamble_action == PreambleAction::Skip {
new_start += items_in_preamble;
new_end += items_in_preamble;
new_levels_start += first_row_start;
new_levels_end += first_row_start;
} else if preamble_action == PreambleAction::Take {
debug_assert_eq!(new_start, 0);
debug_assert_eq!(new_levels_start, 0);
new_end += items_in_preamble;
new_levels_end += first_row_start;
}
debug_assert!(new_end <= total_items);
(new_start..new_end, new_levels_start..new_levels_end)
} else {
// Rep levels but no def levels: every entry is visible, so item and
// level indices coincide.
if range.start > 0 {
for (idx, rep) in rep.iter().skip(1).enumerate() {
if *rep == max_rep {
rows_seen += 1;
if rows_seen == range.start {
new_start = idx as u64 + 1;
break;
}
}
}
}
let mut new_end = rep.len() as u64;
if range.end < total_items {
for (idx, rep) in rep[(new_start + 1) as usize..].iter().enumerate() {
if *rep == max_rep {
rows_seen += 1;
if rows_seen == range.end {
new_end = idx as u64 + new_start + 1;
break;
}
}
}
}
if preamble_action == PreambleAction::Skip {
new_start += first_row_start;
new_end += first_row_start;
} else if preamble_action == PreambleAction::Take {
debug_assert_eq!(new_start, 0);
new_end += first_row_start;
}
debug_assert!(new_end <= total_items);
(new_start..new_end, new_start..new_end)
}
} else {
// No repetition: rows are items, ranges map through unchanged.
(range.clone(), range)
}
}
/// Reads `num_buffers` little-endian buffer sizes from `buf` starting at
/// `*offset`, advancing the offset past the bytes consumed.
///
/// Sizes are stored as `u16` normally, or as `u32` when `LARGE` is true
/// (pages flagged `has_large_chunk`). Widening to `u32` in the non-large case
/// is lossless.
fn read_buffer_sizes<const LARGE: bool>(
    buf: &[u8],
    offset: &mut usize,
    num_buffers: u64,
) -> Vec<u32> {
    let mut sizes = Vec::with_capacity(num_buffers as usize);
    for _ in 0..num_buffers {
        let bytes = &buf[*offset..];
        if LARGE {
            sizes.push(u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]));
            *offset += 4;
        } else {
            sizes.push(u32::from(u16::from_le_bytes([bytes[0], bytes[1]])));
            *offset += 2;
        }
    }
    sizes
}
/// Parses and fully decompresses a single mini-block chunk.
///
/// Chunk layout as read here (sizes little-endian, each data section padded
/// to `MINIBLOCK_ALIGNMENT`):
///   [num_levels: u16]
///   [rep_size: u16]   -- present only if a rep decompressor is configured
///   [def_size: u16]   -- present only if a def decompressor is configured
///   [buffer sizes: num_buffers x (u32 if has_large_chunk else u16)]
///   [rep bytes] [def bytes] [value buffers ...]
fn decode_miniblock_chunk(
&self,
buf: &LanceBuffer,
items_in_chunk: u64,
) -> Result<DecodedMiniBlockChunk> {
let mut offset = 0;
let num_levels = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
offset += 2;
let rep_size = if self.rep_decompressor.is_some() {
let rep_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
offset += 2;
Some(rep_size)
} else {
None
};
let def_size = if self.def_decompressor.is_some() {
let def_size = u16::from_le_bytes([buf[offset], buf[offset + 1]]);
offset += 2;
Some(def_size)
} else {
None
};
let buffer_sizes = if self.has_large_chunk {
Self::read_buffer_sizes::<true>(buf, &mut offset, self.num_buffers)
} else {
Self::read_buffer_sizes::<false>(buf, &mut offset, self.num_buffers)
};
// Skip alignment padding between the header and the data sections.
offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
// Slice out the (still compressed) rep, def, and value sections, each
// followed by its alignment padding.
let rep = rep_size.map(|rep_size| {
let rep = buf.slice_with_length(offset, rep_size as usize);
offset += rep_size as usize;
offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
rep
});
let def = def_size.map(|def_size| {
let def = buf.slice_with_length(offset, def_size as usize);
offset += def_size as usize;
offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
def
});
let buffers = buffer_sizes
.into_iter()
.map(|buf_size| {
let buf = buf.slice_with_length(offset, buf_size as usize);
offset += buf_size as usize;
offset += pad_bytes::<MINIBLOCK_ALIGNMENT>(offset);
buf
})
.collect::<Vec<_>>();
let values = self
.value_decompressor
.decompress(buffers, items_in_chunk)?;
// Decompress the level sections (decompressors are guaranteed present by
// the rep_size/def_size checks above).
let rep = rep
.map(|rep| {
Self::decode_levels(
self.rep_decompressor.as_ref().unwrap().as_ref(),
rep,
num_levels,
)
})
.transpose()?;
let def = def
.map(|def| {
Self::decode_levels(
self.def_decompressor.as_ref().unwrap().as_ref(),
def,
num_levels,
)
})
.transpose()?;
Ok(DecodedMiniBlockChunk { rep, def, values })
}
}
impl DecodePageTask for DecodeMiniBlockTask {
/// Decompresses each referenced chunk (at most once per consecutive run via
/// a one-element cache), slices out the requested rows, and concatenates the
/// levels and values into a single `DecodedPage`.
fn decode(self: Box<Self>) -> Result<DecodedPage> {
let mut repbuf: Option<LevelBuffer> = None;
let mut defbuf: Option<LevelBuffer> = None;
// One repetition level per list layer.
let max_rep = self.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
// Rough pre-allocation: twice the compressed size of all chunks.
let estimated_size_bytes = self
.instructions
.iter()
.map(|(_, chunk)| chunk.data.len())
.sum::<usize>()
* 2;
let mut data_builder =
DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
let mut level_offset = 0;
// A chunk is worth caching if the *next* instruction targets it again.
let needs_caching: Vec<bool> = self
.instructions
.windows(2)
.map(|w| w[0].1.chunk_idx == w[1].1.chunk_idx)
.chain(std::iter::once(false)) .collect();
let mut chunk_cache: Option<(usize, DecodedMiniBlockChunk)> = None;
for (idx, (instructions, chunk)) in self.instructions.iter().enumerate() {
let should_cache_this_chunk = needs_caching[idx];
let decoded_chunk = match &chunk_cache {
Some((cached_chunk_idx, cached_chunk)) if *cached_chunk_idx == chunk.chunk_idx => {
cached_chunk.clone()
}
_ => {
let decoded = self.decode_miniblock_chunk(&chunk.data, chunk.items_in_chunk)?;
if should_cache_this_chunk {
chunk_cache = Some((chunk.chunk_idx, decoded.clone()));
}
decoded
}
};
let DecodedMiniBlockChunk { rep, def, values } = decoded_chunk;
// Combine the scheduled skip with the drain-time skip, then translate
// the row range into item/level ranges for this chunk.
let row_range_start =
instructions.rows_to_skip + instructions.chunk_instructions.rows_to_skip;
let row_range_end = row_range_start + instructions.rows_to_take;
let (item_range, level_range) = Self::map_range(
row_range_start..row_range_end,
rep.as_ref(),
def.as_ref(),
max_rep,
self.max_visible_level,
chunk.items_in_chunk,
instructions.preamble_action,
);
if item_range.end - item_range.start > chunk.items_in_chunk {
return Err(lance_core::Error::internal(format!(
"Item range {:?} is greater than chunk items in chunk {:?}",
item_range, chunk.items_in_chunk
)));
}
Self::extend_levels(level_range.clone(), &mut repbuf, &rep, level_offset);
Self::extend_levels(level_range.clone(), &mut defbuf, &def, level_offset);
level_offset += (level_range.end - level_range.start) as usize;
data_builder.append(&values, item_range);
}
let mut data = data_builder.finish();
let unraveler =
RepDefUnraveler::new(repbuf, defbuf, self.def_meaning.clone(), data.num_values());
// Re-attach the dictionary: decoded values are indices into it.
if let Some(dictionary) = &self.dictionary_data {
let DataBlock::FixedWidth(indices) = data else {
return Err(lance_core::Error::internal(format!(
"Expected FixedWidth DataBlock for dictionary indices, got {:?}",
data
)));
};
data = DataBlock::Dictionary(DictionaryDataBlock::from_parts(
indices,
dictionary.as_ref().clone(),
));
}
Ok(DecodedPage {
data,
repdef: unraveler,
})
}
}
/// A chunk's bytes (once loaded) together with its location and item count.
#[derive(Debug)]
struct LoadedChunk {
// Empty until the scheduled I/O completes.
data: LanceBuffer,
items_in_chunk: u64,
byte_range: Range<u64>,
chunk_idx: usize,
}
// Manual `Clone`, equivalent to what `#[derive(Clone)]` would generate.
impl Clone for LoadedChunk {
    fn clone(&self) -> Self {
        let Self {
            data,
            items_in_chunk,
            byte_range,
            chunk_idx,
        } = self;
        Self {
            data: data.clone(),
            items_in_chunk: *items_in_chunk,
            byte_range: byte_range.clone(),
            chunk_idx: *chunk_idx,
        }
    }
}
/// Streaming decoder over a mini-block page: holds the loaded chunks and the
/// remaining scheduling instructions, handing out decode tasks via `drain`.
#[derive(Debug)]
struct MiniBlockDecoder {
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
value_decompressor: Arc<dyn MiniBlockDecompressor>,
def_meaning: Arc<[DefinitionInterpretation]>,
// Chunks still to be consumed, in order; popped as instructions advance.
loaded_chunks: VecDeque<LoadedChunk>,
instructions: VecDeque<ChunkInstructions>,
// Rows already drained from the chunk at the front of the queue.
offset_in_current_chunk: u64,
num_rows: u64,
num_buffers: u64,
dictionary: Option<Arc<DataBlock>>,
has_large_chunk: bool,
}
impl StructuralPageDecoder for MiniBlockDecoder {
/// Consumes up to `num_rows` rows of pending instructions (plus any trailing
/// preamble continuation) and packages them as a `DecodeMiniBlockTask`.
fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
let mut items_desired = num_rows;
let mut need_preamble = false;
let mut skip_in_chunk = self.offset_in_current_chunk;
let mut drain_instructions = Vec::new();
// Keep draining instructions until the row budget is exhausted and no
// preamble continuation is still owed.
while items_desired > 0 || need_preamble {
let (instructions, consumed) = self
.instructions
.front()
.unwrap()
.drain_from_instruction(&mut items_desired, &mut need_preamble, &mut skip_in_chunk);
// Drop chunks that precede the one this instruction targets.
while self.loaded_chunks.front().unwrap().chunk_idx
!= instructions.chunk_instructions.chunk_idx
{
self.loaded_chunks.pop_front();
}
drain_instructions.push((instructions, self.loaded_chunks.front().unwrap().clone()));
if consumed {
self.instructions.pop_front();
}
}
// Remember how far into the current chunk we have drained.
self.offset_in_current_chunk = skip_in_chunk;
// Definition levels of the leading non-list layers determine visibility.
let max_visible_level = self
.def_meaning
.iter()
.take_while(|l| !l.is_list())
.map(|l| l.num_def_levels())
.sum::<u16>();
Ok(Box::new(DecodeMiniBlockTask {
instructions: drain_instructions,
def_decompressor: self.def_decompressor.clone(),
rep_decompressor: self.rep_decompressor.clone(),
value_decompressor: self.value_decompressor.clone(),
dictionary_data: self.dictionary.clone(),
def_meaning: self.def_meaning.clone(),
num_buffers: self.num_buffers,
max_visible_level,
has_large_chunk: self.has_large_chunk,
}))
}
fn num_rows(&self) -> u64 {
self.num_rows
}
}
/// Cached repetition/definition buffers for an all-null page that still
/// carries structural (list/struct) information.
#[derive(Debug)]
struct CachedComplexAllNullState {
rep: Option<ScalarBuffer<u16>>,
def: Option<ScalarBuffer<u16>>,
}
impl DeepSizeOf for CachedComplexAllNullState {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        // Each level is a u16, hence two bytes per entry.
        let rep_bytes = self.rep.as_ref().map_or(0, |levels| levels.len() * 2);
        let def_bytes = self.def.as_ref().map_or(0, |levels| levels.len() * 2);
        rep_bytes + def_bytes
    }
}
impl CachedPageData for CachedComplexAllNullState {
// Upcast to `Any` so `load` can downcast the cached state back.
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
self
}
}
/// Scheduler for pages where every value is null but repetition/definition
/// levels must still be decoded (e.g. lists or structs of nulls).
#[derive(Debug)]
pub struct ComplexAllNullScheduler {
// [0] = rep buffer (position, size), [1] = def buffer; size 0 means absent.
buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
def_meaning: Arc<[DefinitionInterpretation]>,
// Populated by `initialize` / `load`.
repdef: Option<Arc<CachedComplexAllNullState>>,
max_visible_level: u16,
// When absent the corresponding level buffer is stored as raw u16 values.
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
num_rep_values: u64,
num_def_values: u64,
}
impl ComplexAllNullScheduler {
    /// Creates a new scheduler for an all-null page with rep/def structure.
    ///
    /// `max_visible_level` is precomputed as the sum of the definition levels
    /// of the leading non-list layers; definition values at or below it are
    /// considered visible.
    pub fn new(
        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
        def_meaning: Arc<[DefinitionInterpretation]>,
        rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
        def_decompressor: Option<Arc<dyn BlockDecompressor>>,
        num_rep_values: u64,
        num_def_values: u64,
    ) -> Self {
        // Accumulate definition levels until the first list layer.
        let max_visible_level = def_meaning
            .iter()
            .take_while(|layer| !layer.is_list())
            .fold(0u16, |acc, layer| acc + layer.num_def_levels());
        Self {
            buffer_offsets_and_sizes,
            def_meaning,
            repdef: None,
            max_visible_level,
            rep_decompressor,
            def_decompressor,
            num_rep_values,
            num_def_values,
        }
    }
}
impl StructuralPageScheduler for ComplexAllNullScheduler {
/// Reads the page's (optional) rep and def buffers, decompresses them when a
/// decompressor is configured (otherwise they are raw little-endian u16
/// levels), and caches the result for reuse.
fn initialize<'a>(
&'a mut self,
io: &Arc<dyn EncodingsIo>,
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
let (rep_pos, rep_size) = self.buffer_offsets_and_sizes[0];
let (def_pos, def_size) = self.buffer_offsets_and_sizes[1];
// A zero-sized buffer means that level stream is absent.
let has_rep = rep_size > 0;
let has_def = def_size > 0;
let mut reads = Vec::with_capacity(2);
if has_rep {
reads.push(rep_pos..rep_pos + rep_size);
}
if has_def {
reads.push(def_pos..def_pos + def_size);
}
let data = io.submit_request(reads, 0);
let rep_decompressor = self.rep_decompressor.clone();
let def_decompressor = self.def_decompressor.clone();
let num_rep_values = self.num_rep_values;
let num_def_values = self.num_def_values;
async move {
let data = data.await?;
// Results come back in request order: rep first (if any), then def.
let mut data_iter = data.into_iter();
// Shared helper: decompress one level buffer and validate that it is a
// fixed-width block of exactly `num_values` 16-bit levels.
let decompress_levels = |compressed_bytes: Bytes,
decompressor: &Arc<dyn BlockDecompressor>,
num_values: u64,
level_type: &str|
-> Result<ScalarBuffer<u16>> {
let compressed_buffer = LanceBuffer::from_bytes(compressed_bytes, 1);
let decompressed = decompressor.decompress(compressed_buffer, num_values)?;
match decompressed {
DataBlock::FixedWidth(block) => {
if block.num_values != num_values {
return Err(Error::invalid_input_source(format!(
"Unexpected {} level count after decompression: expected {}, got {}",
level_type, num_values, block.num_values
)
.into()));
}
if block.bits_per_value != 16 {
return Err(Error::invalid_input_source(format!(
"Unexpected {} level bit width after decompression: expected 16, got {}",
level_type, block.bits_per_value
)
.into()));
}
Ok(block.data.borrow_to_typed_slice::<u16>())
}
_ => Err(Error::invalid_input_source(format!(
"Expected fixed-width data block for {} levels",
level_type
)
.into())),
}
};
let rep = if has_rep {
let rep = data_iter.next().unwrap();
if let Some(rep_decompressor) = rep_decompressor.as_ref() {
Some(decompress_levels(
rep,
rep_decompressor,
num_rep_values,
"repetition",
)?)
} else {
// Uncompressed: the buffer is raw u16 levels.
let rep = LanceBuffer::from_bytes(rep, 2);
let rep = rep.borrow_to_typed_slice::<u16>();
Some(rep)
}
} else {
None
};
let def = if has_def {
let def = data_iter.next().unwrap();
if let Some(def_decompressor) = def_decompressor.as_ref() {
Some(decompress_levels(
def,
def_decompressor,
num_def_values,
"definition",
)?)
} else {
// Uncompressed: the buffer is raw u16 levels.
let def = LanceBuffer::from_bytes(def, 2);
let def = def.borrow_to_typed_slice::<u16>();
Some(def)
}
} else {
None
};
let repdef = Arc::new(CachedComplexAllNullState { rep, def });
self.repdef = Some(repdef.clone());
Ok(repdef as Arc<dyn CachedPageData>)
}
.boxed()
}
/// Re-installs rep/def state previously cached by `initialize`.
fn load(&mut self, data: &Arc<dyn CachedPageData>) {
    let cached = data
        .clone()
        .as_arc_any()
        .downcast::<CachedComplexAllNullState>()
        .unwrap();
    self.repdef = Some(cached);
}
/// No data I/O is needed (all values are null); immediately returns a decoder
/// that shares the cached rep/def buffers.
fn schedule_ranges(
    &self,
    ranges: &[Range<u64>],
    _io: &Arc<dyn EncodingsIo>,
) -> Result<Vec<PageLoadTask>> {
    let queued: VecDeque<Range<u64>> = ranges.iter().cloned().collect();
    let num_rows: u64 = queued.iter().map(|r| r.end - r.start).sum();
    let cached = self.repdef.as_ref().unwrap();
    let decoder: Box<dyn StructuralPageDecoder> = Box::new(ComplexAllNullPageDecoder {
        ranges: queued,
        rep: cached.rep.clone(),
        def: cached.def.clone(),
        num_rows,
        def_meaning: self.def_meaning.clone(),
        max_visible_level: self.max_visible_level,
    });
    Ok(vec![PageLoadTask {
        decoder_fut: std::future::ready(Ok(decoder)).boxed(),
        num_rows,
    }])
}
}
/// Decoder for an all-null page with rep/def structure; drains row ranges
/// against the shared level buffers.
#[derive(Debug)]
pub struct ComplexAllNullPageDecoder {
// Row ranges still to be drained, in order.
ranges: VecDeque<Range<u64>>,
rep: Option<ScalarBuffer<u16>>,
def: Option<ScalarBuffer<u16>>,
num_rows: u64,
def_meaning: Arc<[DefinitionInterpretation]>,
max_visible_level: u16,
}
impl ComplexAllNullPageDecoder {
    /// Removes up to `num_rows` rows worth of ranges from the front of the
    /// pending queue, splitting the last range if it only partially fits.
    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
        let mut drained = Vec::with_capacity(self.ranges.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            let front = self.ranges.front_mut().unwrap();
            let front_len = front.end - front.start;
            if front_len > remaining {
                // Take only part of this range; the remainder stays queued.
                let split_at = front.start + remaining;
                drained.push(front.start..split_at);
                front.start = split_at;
                remaining = 0;
            } else {
                remaining -= front_len;
                drained.push(self.ranges.pop_front().unwrap());
            }
        }
        drained
    }
}
impl StructuralPageDecoder for ComplexAllNullPageDecoder {
    /// Splits off `num_rows` rows and packages them as a decode task sharing
    /// the page's rep/def buffers.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let ranges = self.drain_ranges(num_rows);
        let task = DecodeComplexAllNullTask {
            ranges,
            rep: self.rep.clone(),
            def: self.def.clone(),
            def_meaning: self.def_meaning.clone(),
            max_visible_level: self.max_visible_level,
        };
        Ok(Box::new(task))
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// CPU task producing an all-null block (plus rep/def levels) for a set of
/// drained row ranges.
#[derive(Debug)]
pub struct DecodeComplexAllNullTask {
ranges: Vec<Range<u64>>,
rep: Option<ScalarBuffer<u16>>,
def: Option<ScalarBuffer<u16>>,
def_meaning: Arc<[DefinitionInterpretation]>,
max_visible_level: u16,
}
impl DecodeComplexAllNullTask {
    /// Gathers the level values covered by `self.ranges` into one contiguous
    /// vector, or `None` when the page stored no levels of this kind.
    fn decode_level(
        &self,
        levels: &Option<ScalarBuffer<u16>>,
        num_values: u64,
    ) -> Option<Vec<u16>> {
        levels.as_ref().map(|all_levels| {
            let mut gathered = Vec::with_capacity(num_values as usize);
            gathered.extend(self.ranges.iter().flat_map(|range| {
                all_levels[range.start as usize..range.end as usize]
                    .iter()
                    .copied()
            }));
            gathered
        })
    }
}
impl DecodePageTask for DecodeComplexAllNullTask {
    /// Produces an all-null data block for the drained ranges.
    ///
    /// When definition levels are present, only entries at or below
    /// `max_visible_level` count as visible items; invisible entries (e.g.
    /// items inside null structs) are excluded from the block's value count.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let total_values: u64 = self.ranges.iter().map(|r| r.end - r.start).sum();
        let rep = self.decode_level(&self.rep, total_values);
        let def = self.decode_level(&self.def, total_values);
        let num_values = match &def {
            Some(def_levels) => def_levels
                .iter()
                .filter(|&&level| level <= self.max_visible_level)
                .count() as u64,
            None => total_values,
        };
        let data = DataBlock::AllNull(AllNullDataBlock { num_values });
        let unraveler = RepDefUnraveler::new(rep, def, self.def_meaning, num_values);
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
/// Scheduler for pages that are entirely null with no rep/def structure;
/// requires no I/O at all.
#[derive(Debug, Default)]
pub struct SimpleAllNullScheduler {}
impl StructuralPageScheduler for SimpleAllNullScheduler {
    /// Nothing to fetch; simple all-null pages have no backing data.
    fn initialize<'a>(
        &'a mut self,
        _io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        let cache: Arc<dyn CachedPageData> = Arc::new(NoCachedPageData);
        std::future::ready(Ok(cache)).boxed()
    }
    fn load(&mut self, _cache: &Arc<dyn CachedPageData>) {}
    /// No I/O required; immediately returns a decoder covering `ranges`.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        _io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        let num_rows: u64 = ranges.iter().map(|range| range.end - range.start).sum();
        let decoder: Box<dyn StructuralPageDecoder> =
            Box::new(SimpleAllNullPageDecoder { num_rows });
        Ok(vec![PageLoadTask {
            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
            num_rows,
        }])
    }
}
/// CPU task that emits `num_values` plain nulls.
#[derive(Debug)]
struct SimpleAllNullDecodePageTask {
num_values: u64,
}
impl DecodePageTask for SimpleAllNullDecodePageTask {
    /// Emits `num_values` nulls under a single nullable-item definition layer
    /// (definition level 1 == null).
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        let count = self.num_values;
        // Every row is null, so every definition level is 1.
        let def_levels = vec![1; count as usize];
        let unraveler = RepDefUnraveler::new(
            None,
            Some(def_levels),
            Arc::new([DefinitionInterpretation::NullableItem]),
            count,
        );
        let data = DataBlock::AllNull(AllNullDataBlock { num_values: count });
        Ok(DecodedPage {
            data,
            repdef: unraveler,
        })
    }
}
/// Decoder for a simple all-null page; stateless apart from the row count.
#[derive(Debug)]
pub struct SimpleAllNullPageDecoder {
num_rows: u64,
}
impl StructuralPageDecoder for SimpleAllNullPageDecoder {
    /// Every drained batch is simply "`num_rows` nulls"; no state is consumed.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let task = SimpleAllNullDecodePageTask {
            num_values: num_rows,
        };
        Ok(Box::new(task))
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// Where a mini-block page's dictionary lives and how to decompress it.
#[derive(Debug, Clone)]
struct MiniBlockSchedulerDictionary {
dictionary_decompressor: Arc<dyn BlockDecompressor>,
// (byte position, byte size) of the dictionary buffer within the file.
dictionary_buf_position_and_size: (u64, u64),
// Alignment required by the dictionary's compression scheme.
dictionary_data_alignment: u64,
num_dictionary_items: u64,
}
/// One entry of the decoded repetition index: row-level layout of a chunk.
#[derive(Debug)]
struct MiniBlockRepIndexBlock {
// Absolute index of the first row that starts in this chunk.
first_row: u64,
// Number of rows starting in this chunk, counting a trailing partial row.
starts_including_trailer: u64,
// True if the chunk begins with the continuation of the previous chunk's row.
has_preamble: bool,
// True if the chunk's last row spills into the next chunk.
has_trailer: bool,
}
impl DeepSizeOf for MiniBlockRepIndexBlock {
// All fields are inline scalars; there are no heap children.
fn deep_size_of_children(&self, _context: &mut Context) -> usize {
0
}
}
/// Decoded repetition index for a mini-block page, one block per chunk.
#[derive(Debug)]
struct MiniBlockRepIndex {
blocks: Vec<MiniBlockRepIndexBlock>,
}
impl DeepSizeOf for MiniBlockRepIndex {
// Heap usage is dominated by the blocks vector.
fn deep_size_of_children(&self, context: &mut Context) -> usize {
self.blocks.deep_size_of_children(context)
}
}
impl MiniBlockRepIndex {
    /// Builds a trivial repetition index for pages without one: each chunk
    /// starts exactly `num_values` whole rows with no preamble or trailer.
    pub fn default_from_chunks(chunks: &[ChunkMeta]) -> Self {
        let mut first_row = 0u64;
        let blocks = chunks
            .iter()
            .map(|chunk| {
                let block = MiniBlockRepIndexBlock {
                    first_row,
                    starts_including_trailer: chunk.num_values,
                    has_preamble: false,
                    has_trailer: false,
                };
                first_row += chunk.num_values;
                block
            })
            .collect();
        Self { blocks }
    }
    /// Decodes an on-disk repetition index stored as groups of `stride` u64
    /// values per chunk, where the first word is the number of row ends and
    /// the second is a "partial row" flag/count.
    ///
    /// A non-zero partial value means the chunk's last row continues into the
    /// next chunk (trailer), which in turn means the next chunk begins with a
    /// preamble.
    pub fn decode_from_bytes(rep_bytes: &[u8], stride: usize) -> Self {
        let buffer = crate::buffer::LanceBuffer::from(rep_bytes.to_vec());
        let words = buffer.borrow_to_typed_slice::<u64>();
        let mut blocks = Vec::with_capacity(words.len() / stride);
        let mut prev_had_trailer = false;
        let mut first_row = 0u64;
        for entry in words.chunks_exact(stride) {
            let ends = entry[0];
            let has_trailer = entry[1] > 0;
            // A trailing partial row counts as a start here, while a preamble
            // row was already counted in the previous chunk.
            let starts_including_trailer =
                ends + (has_trailer as u64) - (prev_had_trailer as u64);
            blocks.push(MiniBlockRepIndexBlock {
                first_row,
                starts_including_trailer,
                has_preamble: prev_had_trailer,
                has_trailer,
            });
            prev_had_trailer = has_trailer;
            first_row += starts_including_trailer;
        }
        Self { blocks }
    }
}
/// Page-level state for a mini-block page that can be cached across reads:
/// the chunk metadata table, the decoded repetition index, and (optionally)
/// the decoded dictionary.
#[derive(Debug)]
struct MiniBlockCacheableState {
chunk_meta: Vec<ChunkMeta>,
rep_index: MiniBlockRepIndex,
dictionary: Option<Arc<DataBlock>>,
}
impl DeepSizeOf for MiniBlockCacheableState {
    fn deep_size_of_children(&self, context: &mut Context) -> usize {
        // The dictionary's footprint is approximated by its data size.
        let dict_size = self
            .dictionary
            .as_ref()
            .map_or(0, |dict| dict.data_size() as usize);
        self.rep_index.deep_size_of_children(context) + dict_size
    }
}
impl CachedPageData for MiniBlockCacheableState {
// Upcast to `Any` so `load` can downcast the cached state back.
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
self
}
}
/// Scheduler for a page laid out with the mini-block structural encoding.
#[derive(Debug)]
pub struct MiniBlockScheduler {
buffer_offsets_and_sizes: Vec<(u64, u64)>,
priority: u64,
items_in_page: u64,
// Depth of the on-disk repetition index (0 means absent).
repetition_index_depth: u16,
// Number of value buffers per chunk.
num_buffers: u64,
rep_decompressor: Option<Arc<dyn BlockDecompressor>>,
def_decompressor: Option<Arc<dyn BlockDecompressor>>,
value_decompressor: Arc<dyn MiniBlockDecompressor>,
def_meaning: Arc<[DefinitionInterpretation]>,
dictionary: Option<MiniBlockSchedulerDictionary>,
// Populated by `initialize` / `load`.
page_meta: Option<Arc<MiniBlockCacheableState>>,
// When true, per-chunk buffer sizes are stored as u32 instead of u16.
has_large_chunk: bool,
}
impl MiniBlockScheduler {
/// Builds a scheduler from the page's mini-block layout protobuf.
///
/// Creates decompressors for rep/def levels and values, translates the
/// repdef layers, and — if a dictionary is present — records where its
/// buffer lives along with the alignment its encoding requires.
fn try_new(
buffer_offsets_and_sizes: &[(u64, u64)],
priority: u64,
items_in_page: u64,
layout: &pb21::MiniBlockLayout,
decompressors: &dyn DecompressionStrategy,
) -> Result<Self> {
let rep_decompressor = layout
.rep_compression
.as_ref()
.map(|rep_compression| {
decompressors
.create_block_decompressor(rep_compression)
.map(Arc::from)
})
.transpose()?;
let def_decompressor = layout
.def_compression
.as_ref()
.map(|def_compression| {
decompressors
.create_block_decompressor(def_compression)
.map(Arc::from)
})
.transpose()?;
let def_meaning = layout
.layers
.iter()
.map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
.collect::<Vec<_>>();
let value_decompressor = decompressors.create_miniblock_decompressor(
layout.value_compression.as_ref().unwrap(),
decompressors,
)?;
let dictionary = if let Some(dictionary_encoding) = layout.dictionary.as_ref() {
let num_dictionary_items = layout.num_dictionary_items;
let dictionary_decompressor = decompressors
.create_block_decompressor(dictionary_encoding)?
.into();
// Alignment the decoded dictionary buffer must satisfy depends on the
// compression scheme used for the dictionary values.
let dictionary_data_alignment = match dictionary_encoding.compression.as_ref().unwrap()
{
Compression::Variable(_) => 4,
Compression::Flat(_) => 16,
Compression::General(_) => 1,
Compression::InlineBitpacking(_) | Compression::OutOfLineBitpacking(_) => {
crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
}
_ => {
return Err(Error::invalid_input_source(
format!(
"Unsupported mini-block dictionary encoding: {:?}",
dictionary_encoding.compression.as_ref().unwrap()
)
.into(),
));
}
};
Some(MiniBlockSchedulerDictionary {
dictionary_decompressor,
// By convention the dictionary is the third page buffer.
dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
dictionary_data_alignment,
num_dictionary_items,
})
} else {
None
};
Ok(Self {
buffer_offsets_and_sizes: buffer_offsets_and_sizes.to_vec(),
rep_decompressor,
def_decompressor,
value_decompressor: value_decompressor.into(),
repetition_index_depth: layout.repetition_index_depth as u16,
num_buffers: layout.num_buffers,
priority,
items_in_page,
dictionary,
def_meaning: def_meaning.into(),
page_meta: None,
has_large_chunk: layout.has_large_chunk,
})
}
/// Resolves chunk indices into `LoadedChunk` placeholders: byte ranges and
/// item counts are filled in; `data` stays empty until the I/O completes.
fn lookup_chunks(&self, chunk_indices: &[usize]) -> Vec<LoadedChunk> {
    let page_meta = self.page_meta.as_ref().unwrap();
    let mut chunks = Vec::with_capacity(chunk_indices.len());
    for &chunk_idx in chunk_indices {
        let meta = &page_meta.chunk_meta[chunk_idx];
        let start = meta.offset_bytes;
        chunks.push(LoadedChunk {
            byte_range: start..start + meta.chunk_size_bytes,
            items_in_chunk: meta.num_values,
            chunk_idx,
            // Populated once the scheduled read finishes.
            data: LanceBuffer::empty(),
        });
    }
    chunks
}
}
/// What to do with a chunk's preamble (the continuation of a row that began
/// in the previous chunk).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum PreambleAction {
// The preamble belongs to a row we are taking.
Take,
// The preamble belongs to a row we are skipping.
Skip,
// The chunk has no preamble.
Absent,
}
/// Per-chunk scheduling instructions: which chunk to read and which rows
/// within it to skip and take.
#[derive(Clone, Debug, PartialEq, Eq)]
struct ChunkInstructions {
chunk_idx: usize,
preamble: PreambleAction,
// Rows to skip after the preamble (if any), then rows to take.
rows_to_skip: u64,
rows_to_take: u64,
// True if the trailing partial row (continuing into the next chunk) is taken.
take_trailer: bool,
}
/// Instructions for draining part of a scheduled chunk into one decode task
/// (a scheduled chunk may be split across several drained batches).
#[derive(Debug, PartialEq, Eq)]
struct ChunkDrainInstructions {
chunk_instructions: ChunkInstructions,
// Additional rows to skip/take *within* the rows scheduled above.
rows_to_skip: u64,
rows_to_take: u64,
preamble_action: PreambleAction,
}
impl ChunkInstructions {
/// Converts user row ranges into per-chunk scheduling instructions using the
/// repetition index.
///
/// Walks the chunk blocks covering each range while tracking how many rows
/// are still needed and whether the next chunk's preamble (continuation of
/// the current row) must also be taken. When multiple user ranges touch the
/// same chunk contiguously, their instructions are merged afterwards.
fn schedule_instructions(
rep_index: &MiniBlockRepIndex,
user_ranges: &[Range<u64>],
) -> Vec<Self> {
let mut chunk_instructions = Vec::with_capacity(user_ranges.len());
for user_range in user_ranges {
let mut rows_needed = user_range.end - user_range.start;
let mut need_preamble = false;
// Find the first chunk whose first_row is <= user_range.start; on an
// exact hit, back up over empty chunks sharing the same first_row.
let mut block_index = match rep_index
.blocks
.binary_search_by_key(&user_range.start, |block| block.first_row)
{
Ok(idx) => {
let mut idx = idx;
while idx > 0 && rep_index.blocks[idx - 1].first_row == user_range.start {
idx -= 1;
}
idx
}
Err(idx) => idx - 1,
};
let mut to_skip = user_range.start - rep_index.blocks[block_index].first_row;
while rows_needed > 0 || need_preamble {
if block_index >= rep_index.blocks.len() {
log::warn!(
"schedule_instructions inconsistency: block_index >= rep_index.blocks.len(), exiting early"
);
break;
}
let chunk = &rep_index.blocks[block_index];
let rows_avail = chunk.starts_including_trailer.saturating_sub(to_skip);
// Chunk with no row starts: it may still carry a preamble we owe.
if rows_avail == 0 && to_skip == 0 {
if chunk.has_preamble && need_preamble {
chunk_instructions.push(Self {
chunk_idx: block_index,
preamble: PreambleAction::Take,
rows_to_skip: 0,
rows_to_take: 0,
take_trailer: chunk.has_trailer,
});
if chunk.starts_including_trailer > 0
|| block_index == rep_index.blocks.len() - 1
{
need_preamble = false;
}
}
block_index += 1;
continue;
}
// Still skipping past whole chunks.
if rows_avail == 0 && to_skip > 0 {
to_skip -= chunk.starts_including_trailer;
block_index += 1;
continue;
}
let rows_to_take = rows_avail.min(rows_needed);
rows_needed -= rows_to_take;
let mut take_trailer = false;
let preamble = if chunk.has_preamble {
if need_preamble {
PreambleAction::Take
} else {
PreambleAction::Skip
}
} else {
PreambleAction::Absent
};
// Taking the last row of a chunk with a trailer means the row
// continues into the next chunk's preamble.
if rows_to_take == rows_avail && chunk.has_trailer {
take_trailer = true;
need_preamble = true;
} else {
need_preamble = false;
};
chunk_instructions.push(Self {
preamble,
chunk_idx: block_index,
rows_to_skip: to_skip,
rows_to_take,
take_trailer,
});
to_skip = 0;
block_index += 1;
}
}
// Merge adjacent instructions from different user ranges that continue
// within the same chunk.
if user_ranges.len() > 1 {
let mut merged_instructions = Vec::with_capacity(chunk_instructions.len());
let mut instructions_iter = chunk_instructions.into_iter();
merged_instructions.push(instructions_iter.next().unwrap());
for instruction in instructions_iter {
let last = merged_instructions.last_mut().unwrap();
if last.chunk_idx == instruction.chunk_idx
&& last.rows_to_take + last.rows_to_skip == instruction.rows_to_skip
{
last.rows_to_take += instruction.rows_to_take;
last.take_trailer |= instruction.take_trailer;
} else {
merged_instructions.push(instruction);
}
}
merged_instructions
} else {
chunk_instructions
}
}
fn drain_from_instruction(
&self,
rows_desired: &mut u64,
need_preamble: &mut bool,
skip_in_chunk: &mut u64,
) -> (ChunkDrainInstructions, bool) {
debug_assert!(!*need_preamble || *skip_in_chunk == 0);
let rows_avail = self.rows_to_take - *skip_in_chunk;
let has_preamble = self.preamble != PreambleAction::Absent;
let preamble_action = match (*need_preamble, has_preamble) {
(true, true) => PreambleAction::Take,
(true, false) => panic!("Need preamble but there isn't one"),
(false, true) => PreambleAction::Skip,
(false, false) => PreambleAction::Absent,
};
let rows_taking = if *rows_desired >= rows_avail {
*need_preamble = self.take_trailer;
rows_avail
} else {
*need_preamble = false;
*rows_desired
};
let rows_skipped = *skip_in_chunk;
let consumed_chunk = if *rows_desired >= rows_avail {
*rows_desired -= rows_avail;
*skip_in_chunk = 0;
true
} else {
*skip_in_chunk += *rows_desired;
*rows_desired = 0;
false
};
(
ChunkDrainInstructions {
chunk_instructions: self.clone(),
rows_to_skip: rows_skipped,
rows_to_take: rows_taking,
preamble_action,
},
consumed_chunk,
)
}
}
/// The miniblock chunk-metadata words, stored as 2-byte words normally or
/// 4-byte words when the page contains a large chunk.
enum Words {
    U16(ScalarBuffer<u16>),
    U32(ScalarBuffer<u32>),
}
/// Iterator over `Words` that widens every word to `u32` so callers need not
/// care which physical width is stored.
struct WordsIter<'a> {
    iter: Box<dyn Iterator<Item = u32> + 'a>,
}
impl Words {
pub fn len(&self) -> usize {
match self {
Self::U16(b) => b.len(),
Self::U32(b) => b.len(),
}
}
pub fn iter(&self) -> WordsIter<'_> {
match self {
Self::U16(buf) => WordsIter {
iter: Box::new(buf.iter().map(|&x| x as u32)),
},
Self::U32(buf) => WordsIter {
iter: Box::new(buf.iter().copied()),
},
}
}
pub fn from_bytes(bytes: Bytes, has_large_chunk: bool) -> Result<Self> {
let bytes_per_value = if has_large_chunk { 4 } else { 2 };
assert_eq!(bytes.len() % bytes_per_value, 0);
let buffer = LanceBuffer::from_bytes(bytes, bytes_per_value as u64);
if has_large_chunk {
Ok(Self::U32(buffer.borrow_to_typed_slice::<u32>()))
} else {
Ok(Self::U16(buffer.borrow_to_typed_slice::<u16>()))
}
}
}
// Plain delegation to the boxed inner iterator
impl<'a> Iterator for WordsIter<'a> {
    type Item = u32;
    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }
}
impl StructuralPageScheduler for MiniBlockScheduler {
/// Fetch and decode the page-level metadata (chunk layout, optional
/// dictionary, optional repetition index) and cache it on the scheduler.
fn initialize<'a>(
    &'a mut self,
    io: &Arc<dyn EncodingsIo>,
) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
    // Buffer 0 holds chunk metadata, buffer 1 the chunk values; the optional
    // dictionary and repetition index buffers follow
    let (meta_buf_position, meta_buf_size) = self.buffer_offsets_and_sizes[0];
    let value_buf_position = self.buffer_offsets_and_sizes[1].0;
    let mut bufs_needed = 1;
    if self.dictionary.is_some() {
        bufs_needed += 1;
    }
    if self.repetition_index_depth > 0 {
        bufs_needed += 1;
    }
    let mut required_ranges = Vec::with_capacity(bufs_needed);
    required_ranges.push(meta_buf_position..meta_buf_position + meta_buf_size);
    if let Some(ref dictionary) = self.dictionary {
        required_ranges.push(
            dictionary.dictionary_buf_position_and_size.0
                ..dictionary.dictionary_buf_position_and_size.0
                    + dictionary.dictionary_buf_position_and_size.1,
        );
    }
    if self.repetition_index_depth > 0 {
        let (rep_index_pos, rep_index_size) = self.buffer_offsets_and_sizes.last().unwrap();
        required_ranges.push(*rep_index_pos..*rep_index_pos + *rep_index_size);
    }
    let io_req = io.submit_request(required_ranges, 0);
    async move {
        // `fuse` makes the optional trailing `next()` calls safe when fewer
        // buffers were requested
        let mut buffers = io_req.await?.into_iter().fuse();
        let meta_bytes = buffers.next().unwrap();
        let dictionary_bytes = self.dictionary.as_ref().and_then(|_| buffers.next());
        let rep_index_bytes = buffers.next();
        // Each word packs log2(values in chunk) in the low nibble and
        // (chunk bytes / MINIBLOCK_ALIGNMENT - 1) in the remaining bits
        let words = Words::from_bytes(meta_bytes, self.has_large_chunk)?;
        let mut chunk_meta = Vec::with_capacity(words.len());
        let mut rows_counter = 0;
        let mut offset_bytes = value_buf_position;
        for (word_idx, word) in words.iter().enumerate() {
            let log_num_values = word & 0x0F;
            let divided_bytes = word >> 4;
            let num_bytes = (divided_bytes as usize + 1) * MINIBLOCK_ALIGNMENT;
            debug_assert!(num_bytes > 0);
            let num_values = if word_idx < words.len() - 1 {
                // Every chunk except the last holds a power-of-two value count
                debug_assert!(log_num_values > 0);
                1 << log_num_values
            } else {
                // The final chunk holds whatever values remain in the page
                debug_assert!(
                    log_num_values == 0
                        || (1 << log_num_values) == (self.items_in_page - rows_counter)
                );
                self.items_in_page - rows_counter
            };
            rows_counter += num_values;
            chunk_meta.push(ChunkMeta {
                num_values,
                chunk_size_bytes: num_bytes as u64,
                offset_bytes,
            });
            offset_bytes += num_bytes as u64;
        }
        // Without a stored repetition index, synthesize a trivial one from the
        // chunk layout
        let rep_index = if let Some(rep_index_data) = rep_index_bytes {
            assert!(rep_index_data.len() % 8 == 0);
            let stride = self.repetition_index_depth as usize + 1;
            MiniBlockRepIndex::decode_from_bytes(&rep_index_data, stride)
        } else {
            MiniBlockRepIndex::default_from_chunks(&chunk_meta)
        };
        let mut page_meta = MiniBlockCacheableState {
            chunk_meta,
            rep_index,
            dictionary: None,
        };
        // Eagerly decompress the dictionary so schedule/decode need not repeat it
        if let Some(ref mut dictionary) = self.dictionary {
            let dictionary_data = dictionary_bytes.unwrap();
            page_meta.dictionary =
                Some(Arc::new(dictionary.dictionary_decompressor.decompress(
                    LanceBuffer::from_bytes(
                        dictionary_data,
                        dictionary.dictionary_data_alignment,
                    ),
                    dictionary.num_dictionary_items,
                )?));
        };
        let page_meta = Arc::new(page_meta);
        // Keep one copy locally and hand one back for the cache
        self.page_meta = Some(page_meta.clone());
        Ok(page_meta as Arc<dyn CachedPageData>)
    }
    .boxed()
}
/// Restore previously-initialized page metadata from the cache.
fn load(&mut self, data: &Arc<dyn CachedPageData>) {
    // Cached state for a miniblock page is always MiniBlockCacheableState
    let state = data
        .clone()
        .as_arc_any()
        .downcast::<MiniBlockCacheableState>()
        .unwrap();
    self.page_meta = Some(state);
}
/// Plan the IO for the requested row ranges and return a task that resolves
/// to a `MiniBlockDecoder` once the chunk data arrives.
///
/// Requires `initialize`/`load` to have populated `self.page_meta`.
fn schedule_ranges(
    &self,
    ranges: &[Range<u64>],
    io: &Arc<dyn EncodingsIo>,
) -> Result<Vec<PageLoadTask>> {
    let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
    let page_meta = self.page_meta.as_ref().unwrap();
    // Translate the row ranges into per-chunk read instructions
    let chunk_instructions =
        ChunkInstructions::schedule_instructions(&page_meta.rep_index, ranges);
    debug_assert_eq!(
        num_rows,
        chunk_instructions
            .iter()
            .map(|ci| ci.rows_to_take)
            .sum::<u64>()
    );
    // A chunk may appear in several instructions; fetch each chunk only once
    let chunks_needed = chunk_instructions
        .iter()
        .map(|ci| ci.chunk_idx)
        .unique()
        .collect::<Vec<_>>();
    let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
    let chunk_ranges = loaded_chunks
        .iter()
        .map(|c| c.byte_range.clone())
        .collect::<Vec<_>>();
    let loaded_chunk_data = io.submit_request(chunk_ranges, self.priority);
    // Clone everything the async block captures so it is 'static
    let rep_decompressor = self.rep_decompressor.clone();
    let def_decompressor = self.def_decompressor.clone();
    let value_decompressor = self.value_decompressor.clone();
    let num_buffers = self.num_buffers;
    let has_large_chunk = self.has_large_chunk;
    // Cloning an `Option<Arc<_>>` just bumps the refcount; no need for
    // `.as_ref().map(|d| d.clone())` (clippy::map_clone)
    let dictionary = page_meta.dictionary.clone();
    let def_meaning = self.def_meaning.clone();
    let res = async move {
        let loaded_chunk_data = loaded_chunk_data.await?;
        for (loaded_chunk, chunk_data) in loaded_chunks.iter_mut().zip(loaded_chunk_data) {
            loaded_chunk.data = LanceBuffer::from_bytes(chunk_data, 1);
        }
        Ok(Box::new(MiniBlockDecoder {
            rep_decompressor,
            def_decompressor,
            value_decompressor,
            def_meaning,
            loaded_chunks: VecDeque::from_iter(loaded_chunks),
            instructions: VecDeque::from(chunk_instructions),
            offset_in_current_chunk: 0,
            dictionary,
            num_rows,
            num_buffers,
            has_large_chunk,
        }) as Box<dyn StructuralPageDecoder>)
    }
    .boxed();
    let page_load_task = PageLoadTask {
        decoder_fut: res,
        num_rows,
    };
    Ok(vec![page_load_task])
}
}
/// Location and entry width of a full-zip page's repetition index buffer.
#[derive(Debug, Clone, Copy)]
struct FullZipRepIndexDetails {
    // Absolute file position of the first index entry
    buf_position: u64,
    // Width of each index entry in bytes (1, 2, 4, or 8)
    bytes_per_value: u64,
}
/// Decompressor for a full-zip page: fixed-width values or variable-width
/// (length-prefixed) values.
#[derive(Debug)]
enum PerValueDecompressor {
    Fixed(Arc<dyn FixedPerValueDecompressor>),
    Variable(Arc<dyn VariablePerValueDecompressor>),
}
/// Everything a full-zip decoder needs that is independent of the scheduled
/// ranges.
#[derive(Debug)]
struct FullZipDecodeDetails {
    value_decompressor: PerValueDecompressor,
    def_meaning: Arc<[DefinitionInterpretation]>,
    // Parses the control word (rep/def levels) zipped in front of each value
    ctrl_word_parser: ControlWordParser,
    // Number of repetition (list) layers
    max_rep: u16,
    // Sum of definition levels contributed by the non-list layers
    max_visible_def: u16,
}
/// Where full-zip page bytes come from: scheduled IO, or a page that was
/// already read in full and held in memory.
#[derive(Debug, Clone)]
enum FullZipReadSource {
    Remote(Arc<dyn EncodingsIo>),
    PrefetchedPage { base_offset: u64, data: LanceBuffer },
}
impl FullZipReadSource {
    /// Fetch the given absolute byte ranges, either by submitting an IO
    /// request or by slicing the prefetched page buffer.
    ///
    /// For a prefetched page, ranges that are inverted or fall outside the
    /// page bounds produce an internal error rather than a panic.
    fn fetch(
        &self,
        ranges: &[Range<u64>],
        priority: u64,
    ) -> BoxFuture<'static, Result<VecDeque<LanceBuffer>>> {
        match self {
            Self::Remote(io) => {
                let io = io.clone();
                let ranges = ranges.to_vec();
                async move {
                    let data = io.submit_request(ranges, priority).await?;
                    Ok(data
                        .into_iter()
                        .map(|bytes| LanceBuffer::from_bytes(bytes, 1))
                        .collect::<VecDeque<_>>())
                }
                .boxed()
            }
            Self::PrefetchedPage { base_offset, data } => {
                let base_offset = *base_offset;
                let data = data.clone();
                let page_end = base_offset + data.len() as u64;
                // No IO needed; resolve immediately
                std::future::ready(
                    ranges
                        .iter()
                        .map(|range| {
                            if range.start > range.end
                                || range.start < base_offset
                                || range.end > page_end
                            {
                                return Err(Error::internal(format!(
                                    "Requested range {:?} is outside page range {}..{}",
                                    range, base_offset, page_end
                                )));
                            }
                            // Convert absolute file offsets to page-relative
                            let start = (range.start - base_offset) as usize;
                            let len = (range.end - range.start) as usize;
                            Ok(data.slice_with_length(start, len))
                        })
                        .collect::<Result<VecDeque<_>>>(),
                )
                .boxed()
            }
        }
    }
}
/// Page scheduler for the full-zip layout, where rep/def control words are
/// zipped together with the values.
#[derive(Debug)]
pub struct FullZipScheduler {
    // Position and size of the zipped value buffer
    data_buf_position: u64,
    data_buf_size: u64,
    // Present when the page has repetition (needed to seek to row boundaries)
    rep_index: Option<FullZipRepIndexDetails>,
    priority: u64,
    rows_in_page: u64,
    // Offset width for variable-width values (32 for fixed-width pages)
    bits_per_offset: u8,
    details: Arc<FullZipDecodeDetails>,
    // Repetition index kept in memory when caching is enabled
    cached_state: Option<Arc<FullZipCacheableState>>,
    enable_cache: bool,
}
impl FullZipScheduler {
/// Build a scheduler from the page's buffer table and protobuf layout.
///
/// Buffer 0 is the zipped data; an optional buffer 1 is the repetition
/// index with `rows_in_page + 1` fixed-width entries.
fn try_new(
    buffer_offsets_and_sizes: &[(u64, u64)],
    priority: u64,
    rows_in_page: u64,
    layout: &pb21::FullZipLayout,
    decompressors: &dyn DecompressionStrategy,
) -> Result<Self> {
    let (data_buf_position, data_buf_size) = buffer_offsets_and_sizes[0];
    let rep_index = buffer_offsets_and_sizes.get(1).map(|(pos, len)| {
        // Entry width is inferred from the buffer size; must divide evenly
        let num_reps = rows_in_page + 1;
        let bytes_per_rep = len / num_reps;
        debug_assert_eq!(len % num_reps, 0);
        debug_assert!(
            bytes_per_rep == 1
                || bytes_per_rep == 2
                || bytes_per_rep == 4
                || bytes_per_rep == 8
        );
        FullZipRepIndexDetails {
            buf_position: *pos,
            bytes_per_value: bytes_per_rep,
        }
    });
    // BitsPerValue => fixed-width values, BitsPerOffset => variable-width
    let value_decompressor = match layout.details {
        Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => {
            let decompressor = decompressors.create_fixed_per_value_decompressor(
                layout.value_compression.as_ref().unwrap(),
            )?;
            PerValueDecompressor::Fixed(decompressor.into())
        }
        Some(pb21::full_zip_layout::Details::BitsPerOffset(_)) => {
            let decompressor = decompressors.create_variable_per_value_decompressor(
                layout.value_compression.as_ref().unwrap(),
            )?;
            PerValueDecompressor::Variable(decompressor.into())
        }
        None => {
            panic!("Full-zip layout must have a `details` field");
        }
    };
    let ctrl_word_parser = ControlWordParser::new(
        layout.bits_rep.try_into().unwrap(),
        layout.bits_def.try_into().unwrap(),
    );
    let def_meaning = layout
        .layers
        .iter()
        .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
        .collect::<Vec<_>>();
    // Rep levels come from list layers; visible def levels from the rest
    let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
    let max_visible_def = def_meaning
        .iter()
        .filter(|d| !d.is_list())
        .map(|d| d.num_def_levels())
        .sum();
    let bits_per_offset = match layout.details {
        // Fixed-width pages have no offsets; 32 is a placeholder default
        Some(pb21::full_zip_layout::Details::BitsPerValue(_)) => 32,
        Some(pb21::full_zip_layout::Details::BitsPerOffset(bits_per_offset)) => {
            bits_per_offset as u8
        }
        None => panic!("Full-zip layout must have a `details` field"),
    };
    let details = Arc::new(FullZipDecodeDetails {
        value_decompressor,
        def_meaning: def_meaning.into(),
        ctrl_word_parser,
        max_rep,
        max_visible_def,
    });
    Ok(Self {
        data_buf_position,
        data_buf_size,
        rep_index,
        details,
        priority,
        rows_in_page,
        bits_per_offset,
        cached_state: None,
        enable_cache: false,
    })
}
/// Returns true iff `ranges` is non-empty, each range is well formed, they
/// are contiguous starting at row 0, and together they cover exactly
/// `rows_in_page` rows (i.e. the whole page was requested).
fn covers_entire_page(ranges: &[Range<u64>], rows_in_page: u64) -> bool {
    let mut next_expected = 0;
    for range in ranges {
        let contiguous = range.start == next_expected;
        let well_formed = range.start <= range.end && range.end <= rows_in_page;
        if !(contiguous && well_formed) {
            return false;
        }
        next_expected = range.end;
    }
    !ranges.is_empty() && next_expected == rows_in_page
}
/// Wrap a fetch + decoder construction into a `PageLoadTask`.
fn create_page_load_task(
    read_source: FullZipReadSource,
    byte_ranges: Vec<Range<u64>>,
    priority: u64,
    num_rows: u64,
    details: Arc<FullZipDecodeDetails>,
    bits_per_offset: u8,
) -> PageLoadTask {
    let load_task = async move {
        let data = read_source.fetch(&byte_ranges, priority).await?;
        Self::create_decoder(details, data, num_rows, bits_per_offset)
    }
    .boxed();
    PageLoadTask {
        decoder_fut: load_task,
        num_rows,
    }
}
/// Build the appropriate page decoder (fixed- or variable-width) over the
/// fetched zipped buffers.
fn create_decoder(
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    num_rows: u64,
    bits_per_offset: u8,
) -> Result<Box<dyn StructuralPageDecoder>> {
    match &details.value_decompressor {
        PerValueDecompressor::Fixed(decompressor) => {
            let bits_per_value = decompressor.bits_per_value();
            // Only byte-aligned fixed widths are supported so far
            if bits_per_value % 8 != 0 {
                return Err(lance_core::Error::not_supported_source("Bit-packed full-zip encoding (non-byte-aligned values) is not yet implemented".into()));
            }
            let bytes_per_value = bits_per_value / 8;
            // Each row occupies value bytes plus its control word
            let total_bytes_per_value =
                bytes_per_value as usize + details.ctrl_word_parser.bytes_per_word();
            if total_bytes_per_value == 0 {
                return Err(lance_core::Error::internal(
                    "Invalid encoding: per-row byte width must be greater than 0",
                ));
            }
            Ok(Box::new(FixedFullZipDecoder {
                details,
                data,
                num_rows,
                offset_in_current: 0,
                bytes_per_value: bytes_per_value as usize,
                total_bytes_per_value,
            }) as Box<dyn StructuralPageDecoder>)
        }
        PerValueDecompressor::Variable(_decompressor) => {
            // In-file length width and in-memory offset width are currently
            // the same value
            Ok(Box::new(VariableFullZipDecoder::new(
                details,
                data,
                num_rows,
                bits_per_offset,
                bits_per_offset,
            )))
        }
    }
}
/// Convert a buffer of (start, end) repetition-index value pairs into
/// absolute byte ranges within the data buffer.
///
/// `buffer` must contain an even number of `bytes_per_value`-wide values,
/// laid out as start/end pairs (as produced by `compute_rep_index_ranges`).
fn extract_byte_ranges_from_pairs(
    buffer: LanceBuffer,
    bytes_per_value: u64,
    data_buf_position: u64,
) -> Vec<Range<u64>> {
    ByteUnpacker::new(buffer, bytes_per_value as usize)
        .chunks(2)
        .into_iter()
        .map(|mut c| {
            // Offsets in the index are relative to the data buffer start
            let start = c.next().unwrap() + data_buf_position;
            let end = c.next().unwrap() + data_buf_position;
            start..end
        })
        .collect::<Vec<_>>()
}
/// Like `extract_byte_ranges_from_pairs` but reads the start/end values
/// directly out of the fully-cached repetition index buffer, indexed by row.
fn extract_byte_ranges_from_cached(
    buffer: &LanceBuffer,
    ranges: &[Range<u64>],
    bytes_per_value: u64,
    data_buf_position: u64,
) -> Vec<Range<u64>> {
    ranges
        .iter()
        .map(|r| {
            // Index entry i holds the byte offset where row i begins
            let start_offset = (r.start * bytes_per_value) as usize;
            let end_offset = (r.end * bytes_per_value) as usize;
            let start_slice = &buffer[start_offset..start_offset + bytes_per_value as usize];
            let start_val =
                ByteUnpacker::new(start_slice.iter().copied(), bytes_per_value as usize)
                    .next()
                    .unwrap();
            let end_slice = &buffer[end_offset..end_offset + bytes_per_value as usize];
            let end_val =
                ByteUnpacker::new(end_slice.iter().copied(), bytes_per_value as usize)
                    .next()
                    .unwrap();
            (data_buf_position + start_val)..(data_buf_position + end_val)
        })
        .collect()
}
/// For each requested row range, compute the two repetition-index entries to
/// read: the entry for the first row and the entry one past the last row.
/// The resulting pairs feed `extract_byte_ranges_from_pairs`.
fn compute_rep_index_ranges(
    ranges: &[Range<u64>],
    rep_index: &FullZipRepIndexDetails,
) -> Vec<Range<u64>> {
    let entry_width = rep_index.bytes_per_value;
    let base = rep_index.buf_position;
    let mut byte_ranges = Vec::with_capacity(ranges.len() * 2);
    for r in ranges {
        let first_val_start = base + (r.start * entry_width);
        let last_val_start = base + (r.end * entry_width);
        byte_ranges.push(first_val_start..first_val_start + entry_width);
        byte_ranges.push(last_val_start..last_val_start + entry_width);
    }
    byte_ranges
}
/// Schedule ranges for a page that has a repetition index.
///
/// Three strategies, in order of preference:
/// 1. whole page requested -> one read of the entire data buffer
/// 2. rep index cached -> compute byte ranges locally, one data fetch
/// 3. otherwise -> fetch the needed rep index entries first, then the data
fn schedule_ranges_rep(
    &self,
    ranges: &[Range<u64>],
    io: &Arc<dyn EncodingsIo>,
    rep_index: FullZipRepIndexDetails,
) -> Result<Vec<PageLoadTask>> {
    let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
    let data_buf_position = self.data_buf_position;
    let priority = self.priority;
    let details = self.details.clone();
    let bits_per_offset = self.bits_per_offset;
    if Self::covers_entire_page(ranges, self.rows_in_page) {
        // Full-page read: skip the rep index entirely
        let full_range = self.data_buf_position..(self.data_buf_position + self.data_buf_size);
        let page_data = io.submit_single(full_range.clone(), priority);
        let load_task = async move {
            let page_data = page_data.await?;
            let source = FullZipReadSource::PrefetchedPage {
                base_offset: full_range.start,
                data: LanceBuffer::from_bytes(page_data, 1),
            };
            let read_ranges = vec![full_range];
            let data = source.fetch(&read_ranges, priority).await?;
            Self::create_decoder(details, data, num_rows, bits_per_offset)
        }
        .boxed();
        let page_load_task = PageLoadTask {
            decoder_fut: load_task,
            num_rows,
        };
        return Ok(vec![page_load_task]);
    }
    if let Some(cached_state) = &self.cached_state {
        // Cached rep index: translate rows to byte ranges without extra IO
        let byte_ranges = Self::extract_byte_ranges_from_cached(
            &cached_state.rep_index_buffer,
            ranges,
            rep_index.bytes_per_value,
            data_buf_position,
        );
        let page_load_task = Self::create_page_load_task(
            FullZipReadSource::Remote(io.clone()),
            byte_ranges,
            priority,
            num_rows,
            details,
            bits_per_offset,
        );
        return Ok(vec![page_load_task]);
    }
    // Two-phase: read index entries first, then the data they point at
    let rep_ranges = Self::compute_rep_index_ranges(ranges, &rep_index);
    let rep_data = io.submit_request(rep_ranges, priority);
    let io_clone = io.clone();
    let load_task = async move {
        let rep_data = rep_data.await?;
        let rep_buffer = LanceBuffer::concat(
            &rep_data
                .into_iter()
                .map(|d| LanceBuffer::from_bytes(d, 1))
                .collect::<Vec<_>>(),
        );
        let byte_ranges = Self::extract_byte_ranges_from_pairs(
            rep_buffer,
            rep_index.bytes_per_value,
            data_buf_position,
        );
        let source = FullZipReadSource::Remote(io_clone);
        let data = source.fetch(&byte_ranges, priority).await?;
        Self::create_decoder(details, data, num_rows, bits_per_offset)
    }
    .boxed();
    let page_load_task = PageLoadTask {
        decoder_fut: load_task,
        num_rows,
    };
    Ok(vec![page_load_task])
}
/// Schedule ranges for a page without repetition: every row has a fixed
/// width, so row ranges map directly to byte ranges.
fn schedule_ranges_simple(
    &self,
    ranges: &[Range<u64>],
    io: &Arc<dyn EncodingsIo>,
) -> Result<Vec<PageLoadTask>> {
    let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
    // No rep index implies fixed-width values
    let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor else {
        unreachable!()
    };
    let bits_per_value = decompressor.bits_per_value();
    assert_eq!(bits_per_value % 8, 0);
    let bytes_per_value = bits_per_value / 8;
    let bytes_per_cw = self.details.ctrl_word_parser.bytes_per_word();
    // Each row = value bytes + control word bytes
    let total_bytes_per_value = bytes_per_value + bytes_per_cw as u64;
    let byte_ranges = ranges
        .iter()
        .map(|r| {
            debug_assert!(r.end <= self.rows_in_page);
            let start = self.data_buf_position + r.start * total_bytes_per_value;
            let end = self.data_buf_position + r.end * total_bytes_per_value;
            start..end
        })
        .collect::<Vec<_>>();
    let page_load_task = Self::create_page_load_task(
        FullZipReadSource::Remote(io.clone()),
        byte_ranges,
        self.priority,
        num_rows,
        self.details.clone(),
        self.bits_per_offset,
    );
    Ok(vec![page_load_task])
}
}
/// Cached page state for full-zip pages: the entire repetition index buffer.
#[derive(Debug)]
struct FullZipCacheableState {
    rep_index_buffer: LanceBuffer,
}
impl DeepSizeOf for FullZipCacheableState {
    // The buffer bytes are the only heap contents worth accounting for
    fn deep_size_of_children(&self, _context: &mut Context) -> usize {
        self.rep_index_buffer.len()
    }
}
impl CachedPageData for FullZipCacheableState {
    // Enables downcasting from the type-erased cache entry
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
impl StructuralPageScheduler for FullZipScheduler {
    /// When caching is enabled and the page has a repetition index, read the
    /// whole index ((rows + 1) entries) up front so later scheduling needs no
    /// per-range index IO.  Otherwise there is nothing to cache.
    fn initialize<'a>(
        &'a mut self,
        io: &Arc<dyn EncodingsIo>,
    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
        if self.enable_cache
            && let Some(rep_index) = self.rep_index
        {
            let total_size = (self.rows_in_page + 1) * rep_index.bytes_per_value;
            let rep_index_range = rep_index.buf_position..(rep_index.buf_position + total_size);
            let io_clone = io.clone();
            return async move {
                let rep_index_data = io_clone.submit_request(vec![rep_index_range], 0).await?;
                let state = Arc::new(FullZipCacheableState {
                    rep_index_buffer: LanceBuffer::from_bytes(rep_index_data[0].clone(), 1),
                });
                self.cached_state = Some(state.clone());
                Ok(state as Arc<dyn CachedPageData>)
            }
            .boxed();
        }
        std::future::ready(Ok(Arc::new(NoCachedPageData) as Arc<dyn CachedPageData>)).boxed()
    }
    /// Restore cached state; silently ignores the `NoCachedPageData` marker
    /// (the downcast simply fails).
    fn load(&mut self, cache: &Arc<dyn CachedPageData>) {
        if let Ok(cached_state) = cache
            .clone()
            .as_arc_any()
            .downcast::<FullZipCacheableState>()
        {
            self.cached_state = Some(cached_state);
        }
    }
    /// Dispatch to the rep-index-aware or fixed-stride scheduling path.
    fn schedule_ranges(
        &self,
        ranges: &[Range<u64>],
        io: &Arc<dyn EncodingsIo>,
    ) -> Result<Vec<PageLoadTask>> {
        if let Some(rep_index) = self.rep_index {
            self.schedule_ranges_rep(ranges, io, rep_index)
        } else {
            self.schedule_ranges_simple(ranges, io)
        }
    }
}
/// Decoder for full-zip pages with fixed-width values; consumes the fetched
/// buffers front-to-back as rows are drained.
#[derive(Debug)]
struct FixedFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    // Fetched data buffers, consumed in order
    data: VecDeque<LanceBuffer>,
    // Byte offset into the front buffer
    offset_in_current: usize,
    // Width of the value alone
    bytes_per_value: usize,
    // Width of value + control word
    total_bytes_per_value: usize,
    num_rows: u64,
}
impl FixedFullZipDecoder {
    /// Slice up to `num_rows` rows out of the front buffer, advancing the
    /// internal cursor.  May return fewer rows than requested when the front
    /// buffer runs out; callers loop until satisfied.
    fn slice_next_task(&mut self, num_rows: u64) -> FullZipDecodeTaskItem {
        debug_assert!(num_rows > 0);
        let cur_buf = self.data.front_mut().unwrap();
        let start = self.offset_in_current;
        if self.details.ctrl_word_parser.has_rep() {
            // With repetition, rows have variable item counts; walk the
            // control words to find row boundaries
            let mut rows_started = 0;
            let mut num_items = 0;
            while self.offset_in_current < cur_buf.len() {
                let control = self.details.ctrl_word_parser.parse_desc(
                    &cur_buf[self.offset_in_current..],
                    self.details.max_rep,
                    self.details.max_visible_def,
                );
                if control.is_new_row {
                    // Stop before starting a row beyond the request
                    if rows_started == num_rows {
                        break;
                    }
                    rows_started += 1;
                }
                num_items += 1;
                // Invisible (e.g. null-struct) items carry no value bytes
                if control.is_visible {
                    self.offset_in_current += self.total_bytes_per_value;
                } else {
                    self.offset_in_current += self.details.ctrl_word_parser.bytes_per_word();
                }
            }
            let task_slice = cur_buf.slice_with_length(start, self.offset_in_current - start);
            if self.offset_in_current == cur_buf.len() {
                self.data.pop_front();
                self.offset_in_current = 0;
            }
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: num_items,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_started,
            }
        } else {
            // No repetition: one item per row, so slicing is pure arithmetic
            let cur_buf = self.data.front_mut().unwrap();
            let bytes_avail = cur_buf.len() - self.offset_in_current;
            let offset_in_cur = self.offset_in_current;
            let bytes_needed = num_rows as usize * self.total_bytes_per_value;
            let mut rows_taken = num_rows;
            let task_slice = if bytes_needed >= bytes_avail {
                // Front buffer exhausted; take what it has
                self.offset_in_current = 0;
                rows_taken = bytes_avail as u64 / self.total_bytes_per_value as u64;
                self.data
                    .pop_front()
                    .unwrap()
                    .slice_with_length(offset_in_cur, bytes_avail)
            } else {
                self.offset_in_current += bytes_needed;
                cur_buf.slice_with_length(offset_in_cur, bytes_needed)
            };
            FullZipDecodeTaskItem {
                data: PerValueDataBlock::Fixed(FixedWidthDataBlock {
                    data: task_slice,
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    num_values: rows_taken,
                    block_info: BlockInfo::new(),
                }),
                rows_in_buf: rows_taken,
            }
        }
    }
}
impl StructuralPageDecoder for FixedFullZipDecoder {
    /// Collect enough task items from the buffered data to cover `num_rows`.
    // NOTE(review): assumes slice_next_task always makes progress
    // (rows_in_buf > 0); a zero-row item would loop forever — relies on the
    // scheduler only fetching buffers that contain requested rows
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let mut task_data = Vec::with_capacity(self.data.len());
        let mut remaining = num_rows;
        while remaining > 0 {
            let task_item = self.slice_next_task(remaining);
            remaining -= task_item.rows_in_buf;
            task_data.push(task_item);
        }
        Ok(Box::new(FixedFullZipDecodeTask {
            details: self.details.clone(),
            data: task_data,
            bytes_per_value: self.bytes_per_value,
            num_rows: num_rows as usize,
        }))
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// Decoder for full-zip pages with variable-width values.  The zipped input
/// is eagerly unzipped into separate value/offset/rep/def buffers at
/// construction; `drain` then just slices those buffers by row index.
#[derive(Debug)]
struct VariableFullZipDecoder {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    // Unzipped value bytes
    data: LanceBuffer,
    // Little-endian offsets into `data` (width = bits_per_offset)
    offsets: LanceBuffer,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
    // Per-row start positions into the buffers above (len = rows + 1)
    repdef_starts: Vec<usize>,
    data_starts: Vec<usize>,
    offset_starts: Vec<usize>,
    // Running count of visible items at each row boundary
    visible_item_counts: Vec<u64>,
    bits_per_offset: u8,
    // Next row to drain
    current_idx: usize,
    num_rows: u64,
}
impl VariableFullZipDecoder {
/// Build the decoder and immediately unzip all fetched buffers.
///
/// `in_bits_per_length` is the width of the length prefix stored in the
/// file; `out_bits_per_offset` (32 or 64) is the offset width produced for
/// the in-memory representation.
fn new(
    details: Arc<FullZipDecodeDetails>,
    data: VecDeque<LanceBuffer>,
    num_rows: u64,
    in_bits_per_length: u8,
    out_bits_per_offset: u8,
) -> Self {
    let decompressor = match details.value_decompressor {
        PerValueDecompressor::Variable(ref d) => d.clone(),
        _ => unreachable!(),
    };
    assert_eq!(in_bits_per_length % 8, 0);
    assert!(out_bits_per_offset == 32 || out_bits_per_offset == 64);
    let mut decoder = Self {
        details,
        decompressor,
        data: LanceBuffer::empty(),
        offsets: LanceBuffer::empty(),
        rep: LanceBuffer::empty().borrow_to_typed_slice(),
        def: LanceBuffer::empty().borrow_to_typed_slice(),
        bits_per_offset: out_bits_per_offset,
        // +1 for the trailing end-sentinel entries pushed by unzip
        repdef_starts: Vec::with_capacity(num_rows as usize + 1),
        data_starts: Vec::with_capacity(num_rows as usize + 1),
        offset_starts: Vec::with_capacity(num_rows as usize + 1),
        visible_item_counts: Vec::with_capacity(num_rows as usize + 1),
        current_idx: 0,
        num_rows,
    };
    decoder.unzip(data, in_bits_per_length, out_bits_per_offset, num_rows);
    decoder
}
/// Slice the value bytes spanned by `offsets` and rebase the offsets so the
/// first one is zero, validating monotonic bounds along the way.
///
/// Returns `(sliced_data, rebased_offsets)`; errors (rather than panics) on
/// empty, inverted, or out-of-range offsets.
fn slice_batch_data_and_rebase_offsets_typed<T>(
    data: &LanceBuffer,
    offsets: &LanceBuffer,
) -> Result<(LanceBuffer, LanceBuffer)>
where
    T: arrow_buffer::ArrowNativeType
        + Copy
        + PartialOrd
        + std::ops::Sub<Output = T>
        + std::fmt::Display
        + TryInto<usize>,
{
    let offsets_slice = offsets.borrow_to_typed_slice::<T>();
    let offsets_slice = offsets_slice.as_ref();
    if offsets_slice.is_empty() {
        return Err(Error::internal(
            "Variable offsets cannot be empty".to_string(),
        ));
    }
    let base = offsets_slice[0];
    let end = *offsets_slice.last().unwrap();
    if end < base {
        return Err(Error::internal(format!(
            "Invalid variable offsets: end ({end}) is less than base ({base})"
        )));
    }
    let data_start = base.try_into().map_err(|_| {
        Error::internal(format!("Variable offset ({base}) does not fit into usize"))
    })?;
    let data_end = end.try_into().map_err(|_| {
        Error::internal(format!("Variable offset ({end}) does not fit into usize"))
    })?;
    if data_end > data.len() {
        return Err(Error::internal(format!(
            "Invalid variable offsets: end ({data_end}) exceeds data len ({})",
            data.len()
        )));
    }
    let mut rebased_offsets = Vec::with_capacity(offsets_slice.len());
    for &offset in offsets_slice {
        if offset < base {
            return Err(Error::internal(format!(
                "Invalid variable offsets: offset ({offset}) is less than base ({base})"
            )));
        }
        rebased_offsets.push(offset - base);
    }
    // Copy so the returned block owns its bytes independent of the page buffer
    let sliced_data = data.slice_with_length(data_start, data_end - data_start);
    let sliced_data = LanceBuffer::copy_slice(&sliced_data);
    let rebased_offsets = LanceBuffer::reinterpret_vec(rebased_offsets);
    Ok((sliced_data, rebased_offsets))
}
/// Width-dispatching wrapper over the typed rebase helper (32- or 64-bit
/// offsets only).
fn slice_batch_data_and_rebase_offsets(
    data: &LanceBuffer,
    offsets: &LanceBuffer,
    bits_per_offset: u8,
) -> Result<(LanceBuffer, LanceBuffer)> {
    match bits_per_offset {
        32 => Self::slice_batch_data_and_rebase_offsets_typed::<u32>(data, offsets),
        64 => Self::slice_batch_data_and_rebase_offsets_typed::<u64>(data, offsets),
        _ => Err(Error::internal(format!(
            "Unsupported bits_per_offset={bits_per_offset}"
        ))),
    }
}
/// Parse a little-endian unsigned length of `bits_per_offset` bits (8, 16,
/// 32, or 64) from the front of `data`.
///
/// The signature stays `unsafe` for compatibility with existing callers, but
/// the body now uses safe, bounds-checked slicing: a too-short `data` panics
/// instead of causing undefined behavior as the previous `get_unchecked`
/// implementation did.
///
/// # Safety
///
/// No longer relies on caller-upheld invariants; callers should still pass at
/// least `bits_per_offset / 8` bytes or this will panic.
///
/// # Panics
///
/// Panics if `data` is shorter than the requested width, or if
/// `bits_per_offset` is not one of 8/16/32/64.
unsafe fn parse_length(data: &[u8], bits_per_offset: u8) -> u64 {
    match bits_per_offset {
        8 => data[0] as u64,
        16 => u16::from_le_bytes(data[..2].try_into().unwrap()) as u64,
        32 => u32::from_le_bytes(data[..4].try_into().unwrap()) as u64,
        64 => u64::from_le_bytes(data[..8].try_into().unwrap()),
        _ => unreachable!(),
    }
}
/// Separate the zipped stream (control word + length + bytes per item) into
/// the decoder's value/offset/rep/def buffers and per-row start tables.
///
/// `in_bits_per_length` is the width of the length prefix in the file;
/// `out_bits_per_offset` (32 or 64) is the width of the offsets produced.
///
/// Bug fix: after parsing a length field the cursor previously advanced by
/// `bytes_per_offset` (derived from `out_bits_per_offset`) instead of
/// `bytes_per_length` (derived from `in_bits_per_length`, the actual width
/// of the field just read).  The two are equal for all current callers, but
/// the old code would mis-parse if the widths ever diverged.
fn unzip(
    &mut self,
    data: VecDeque<LanceBuffer>,
    in_bits_per_length: u8,
    out_bits_per_offset: u8,
    num_rows: u64,
) {
    let mut rep = Vec::with_capacity(num_rows as usize);
    let mut def = Vec::with_capacity(num_rows as usize);
    // Capacity estimates: control words, offsets (rows + 1), length prefixes
    let bytes_cw = self.details.ctrl_word_parser.bytes_per_word() * num_rows as usize;
    let bytes_per_offset = out_bits_per_offset as usize / 8;
    let bytes_offsets = bytes_per_offset * (num_rows as usize + 1);
    let mut offsets_data = Vec::with_capacity(bytes_offsets);
    let bytes_per_length = in_bits_per_length as usize / 8;
    let bytes_lengths = bytes_per_length * num_rows as usize;
    let bytes_data = data.iter().map(|d| d.len()).sum::<usize>();
    let mut unzipped_data =
        Vec::with_capacity((bytes_data - bytes_cw).saturating_sub(bytes_lengths));
    let mut current_offset = 0_u64;
    let mut visible_item_count = 0_u64;
    for databuf in data.into_iter() {
        let mut databuf = databuf.as_ref();
        while !databuf.is_empty() {
            let data_start = unzipped_data.len();
            let offset_start = offsets_data.len();
            let repdef_start = rep.len().max(def.len());
            let ctrl_desc = self.details.ctrl_word_parser.parse_desc(
                databuf,
                self.details.max_rep,
                self.details.max_visible_def,
            );
            self.details
                .ctrl_word_parser
                .parse(databuf, &mut rep, &mut def);
            databuf = &databuf[self.details.ctrl_word_parser.bytes_per_word()..];
            // Record row boundaries so drain can slice by row index
            if ctrl_desc.is_new_row {
                self.repdef_starts.push(repdef_start);
                self.data_starts.push(data_start);
                self.offset_starts.push(offset_start);
                self.visible_item_counts.push(visible_item_count);
            }
            if ctrl_desc.is_visible {
                visible_item_count += 1;
                if ctrl_desc.is_valid_item {
                    debug_assert!(databuf.len() >= bytes_per_length);
                    let length = unsafe { Self::parse_length(databuf, in_bits_per_length) };
                    match out_bits_per_offset {
                        32 => offsets_data
                            .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                        64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                        _ => unreachable!(),
                    };
                    // Skip the length prefix we just read (its width is
                    // bytes_per_length, not bytes_per_offset)
                    databuf = &databuf[bytes_per_length..];
                    unzipped_data.extend_from_slice(&databuf[..length as usize]);
                    databuf = &databuf[length as usize..];
                    current_offset += length;
                } else {
                    // Visible null: emit the offset but no value bytes
                    match out_bits_per_offset {
                        32 => offsets_data
                            .extend_from_slice(&(current_offset as u32).to_le_bytes()),
                        64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
                        _ => unreachable!(),
                    }
                }
            }
        }
    }
    // End sentinels so slicing row i uses entries [i, i + 1]
    self.repdef_starts.push(rep.len().max(def.len()));
    self.data_starts.push(unzipped_data.len());
    self.offset_starts.push(offsets_data.len());
    self.visible_item_counts.push(visible_item_count);
    match out_bits_per_offset {
        32 => offsets_data.extend_from_slice(&(current_offset as u32).to_le_bytes()),
        64 => offsets_data.extend_from_slice(&current_offset.to_le_bytes()),
        _ => unreachable!(),
    };
    self.rep = ScalarBuffer::from(rep);
    self.def = ScalarBuffer::from(def);
    self.data = LanceBuffer::from(unzipped_data);
    self.offsets = LanceBuffer::from(offsets_data);
}
}
impl StructuralPageDecoder for VariableFullZipDecoder {
    /// Slice the next `num_rows` rows out of the pre-unzipped buffers using
    /// the per-row start tables built by `unzip`.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
        let start = self.current_idx;
        let end = start + num_rows as usize;
        // Include the end offset entry so the slice carries rows' end bound
        let offset_start = self.offset_starts[start];
        let offset_end = self.offset_starts[end] + (self.bits_per_offset as usize / 8);
        let offsets = self
            .offsets
            .slice_with_length(offset_start, offset_end - offset_start);
        let (data, offsets) =
            Self::slice_batch_data_and_rebase_offsets(&self.data, &offsets, self.bits_per_offset)?;
        let repdef_start = self.repdef_starts[start];
        let repdef_end = self.repdef_starts[end];
        // Empty rep/def buffers mean the page has no rep/def levels at all
        let rep = if self.rep.is_empty() {
            self.rep.clone()
        } else {
            self.rep.slice(repdef_start, repdef_end - repdef_start)
        };
        let def = if self.def.is_empty() {
            self.def.clone()
        } else {
            self.def.slice(repdef_start, repdef_end - repdef_start)
        };
        let visible_item_counts_start = self.visible_item_counts[start];
        let visible_item_counts_end = self.visible_item_counts[end];
        let num_visible_items = visible_item_counts_end - visible_item_counts_start;
        self.current_idx += num_rows as usize;
        Ok(Box::new(VariableFullZipDecodeTask {
            details: self.details.clone(),
            decompressor: self.decompressor.clone(),
            data,
            offsets,
            bits_per_offset: self.bits_per_offset,
            num_visible_items,
            rep,
            def,
        }))
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
/// A drained batch of variable-width full-zip data, ready to decompress on a
/// worker.
#[derive(Debug)]
struct VariableFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    decompressor: Arc<dyn VariablePerValueDecompressor>,
    data: LanceBuffer,
    // Offsets already rebased to start at zero
    offsets: LanceBuffer,
    bits_per_offset: u8,
    num_visible_items: u64,
    rep: ScalarBuffer<u16>,
    def: ScalarBuffer<u16>,
}
impl DecodePageTask for VariableFullZipDecodeTask {
    /// Decompresses the variable-width values and pairs them with an
    /// unraveler for the repetition/definition levels.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Re-wrap the sliced buffers as a variable-width block for the
        // decompressor.
        let compressed = VariableWidthBlock {
            data: self.data,
            offsets: self.offsets,
            bits_per_offset: self.bits_per_offset,
            num_values: self.num_visible_items,
            block_info: BlockInfo::new(),
        };
        let decompressed = self.decompressor.decompress(compressed)?;
        // Empty level buffers mean the level stream is absent entirely.
        let rep = match self.rep.is_empty() {
            true => None,
            false => Some(self.rep.to_vec()),
        };
        let def = match self.def.is_empty() {
            true => None,
            false => Some(self.def.to_vec()),
        };
        let repdef = RepDefUnraveler::new(
            rep,
            def,
            self.details.def_meaning.clone(),
            self.num_visible_items,
        );
        Ok(DecodedPage {
            data: decompressed,
            repdef,
        })
    }
}
/// One compressed input buffer plus the number of rows it holds.
#[derive(Debug)]
struct FullZipDecodeTaskItem {
    data: PerValueDataBlock,
    rows_in_buf: u64,
}
/// Task that decodes a batch of fixed-width full-zip data.
#[derive(Debug)]
struct FixedFullZipDecodeTask {
    details: Arc<FullZipDecodeDetails>,
    data: Vec<FullZipDecodeTaskItem>,
    num_rows: usize,
    // Size in bytes of each value in the zipped buffers
    bytes_per_value: usize,
}
impl DecodePageTask for FixedFullZipDecodeTask {
    /// Unzips control words from values (when present) and decompresses the
    /// visible values into a single data block.
    fn decode(self: Box<Self>) -> Result<DecodedPage> {
        // Estimate the output size as twice the compressed size.
        let estimated_size_bytes = self
            .data
            .iter()
            .map(|task_item| task_item.data.data_size() as usize)
            .sum::<usize>()
            * 2;
        let mut data_builder =
            DataBlockBuilder::with_capacity_estimate(estimated_size_bytes as u64);
        if self.details.ctrl_word_parser.bytes_per_word() == 0 {
            // Fast path: no rep/def control words are interleaved, so each
            // buffer is pure values and can be decompressed wholesale.
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                debug_assert_eq!(fixed_data.num_values, task_item.rows_in_buf);
                let decompressed = decompressor.decompress(fixed_data, task_item.rows_in_buf)?;
                data_builder.append(&decompressed, 0..task_item.rows_in_buf);
            }
            let unraveler = RepDefUnraveler::new(
                None,
                None,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );
            Ok(DecodedPage {
                data: data_builder.finish(),
                repdef: unraveler,
            })
        } else {
            // Slow path: each value is preceded by a control word carrying
            // rep/def levels; invisible items contribute no value bytes.
            let mut rep = Vec::with_capacity(self.num_rows);
            let mut def = Vec::with_capacity(self.num_rows);
            for task_item in self.data.into_iter() {
                let PerValueDataBlock::Fixed(fixed_data) = task_item.data else {
                    unreachable!()
                };
                let mut buf_slice = fixed_data.data.as_ref();
                let num_values = fixed_data.num_values as usize;
                // Capacity after stripping the control words.
                let mut values = Vec::with_capacity(
                    fixed_data.data.len()
                        - (self.details.ctrl_word_parser.bytes_per_word() * num_values),
                );
                let mut visible_items = 0;
                for _ in 0..num_values {
                    // Parse rep/def from the control word, then advance past it.
                    self.details
                        .ctrl_word_parser
                        .parse(buf_slice, &mut rep, &mut def);
                    buf_slice = &buf_slice[self.details.ctrl_word_parser.bytes_per_word()..];
                    // Value bytes are only present for visible items
                    // (def level at or below max_visible_def).
                    let is_visible = def
                        .last()
                        .map(|d| *d <= self.details.max_visible_def)
                        .unwrap_or(true);
                    if is_visible {
                        values.extend_from_slice(buf_slice[..self.bytes_per_value].as_ref());
                        buf_slice = &buf_slice[self.bytes_per_value..];
                        visible_items += 1;
                    }
                }
                let values_buf = LanceBuffer::from(values);
                let fixed_data = FixedWidthDataBlock {
                    bits_per_value: self.bytes_per_value as u64 * 8,
                    block_info: BlockInfo::new(),
                    data: values_buf,
                    num_values: visible_items,
                };
                let PerValueDecompressor::Fixed(decompressor) = &self.details.value_decompressor
                else {
                    unreachable!()
                };
                let decompressed = decompressor.decompress(fixed_data, visible_items)?;
                data_builder.append(&decompressed, 0..visible_items);
            }
            let repetition = if rep.is_empty() { None } else { Some(rep) };
            let definition = if def.is_empty() { None } else { Some(def) };
            let unraveler = RepDefUnraveler::new(
                repetition,
                definition,
                self.details.def_meaning.clone(),
                self.num_rows as u64,
            );
            let data = data_builder.finish();
            Ok(DecodedPage {
                data,
                repdef: unraveler,
            })
        }
    }
}
/// Scheduling job that maps user-requested row ranges onto the pages of a
/// single primitive column.
#[derive(Debug)]
struct StructuralPrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a StructuralPrimitiveFieldScheduler,
    ranges: Vec<Range<u64>>,
    // Next page to schedule from
    page_idx: usize,
    // Next range to satisfy
    range_idx: usize,
    // Row offset of the current page within the column
    global_row_offset: u64,
}
impl<'a> StructuralPrimitiveFieldSchedulingJob<'a> {
    /// Creates a job that walks `ranges` (in order) across the scheduler's
    /// pages, starting from the first page of the column.
    pub fn new(scheduler: &'a StructuralPrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
        // All cursors begin at the start of the column.
        Self {
            ranges,
            scheduler,
            range_idx: 0,
            page_idx: 0,
            global_row_offset: 0,
        }
    }
}
impl StructuralSchedulingJob for StructuralPrimitiveFieldSchedulingJob<'_> {
    /// Schedules I/O for the next page that intersects the remaining ranges.
    ///
    /// Returns one scan line per page load task, or an empty vec when every
    /// range has been scheduled.
    fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result<Vec<ScheduledScanLine>> {
        if self.range_idx >= self.ranges.len() {
            return Ok(Vec::new());
        }
        let mut range = self.ranges[self.range_idx].clone();
        let priority = range.start;
        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip pages that end before the current range begins.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }
        // Collect every (page-relative) range that falls within this page.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // Range fully satisfied; the next range may also start in
                // this page, so keep looping.
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // Range spills into the next page; finish this page first.
                break;
            }
        }
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            ranges_in_page.iter().map(|r| r.end - r.start).sum::<u64>(),
            ranges_in_page.len(),
            cur_page.num_rows,
            priority,
            self.scheduler.column_index,
            cur_page.page_index,
        );
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;
        let page_decoders = cur_page
            .scheduler
            .schedule_ranges(&ranges_in_page, context.io())?;
        let cur_path = context.current_path();
        // Wrap each page load future into a scan line message.
        page_decoders
            .into_iter()
            .map(|page_load_task| {
                let cur_path = cur_path.clone();
                let page_decoder = page_load_task.decoder_fut;
                let unloaded_page = async move {
                    let page_decoder = page_decoder.await?;
                    Ok(LoadedPageShard {
                        decoder: page_decoder,
                        path: cur_path,
                    })
                }
                .boxed();
                Ok(ScheduledScanLine {
                    decoders: vec![MessageType::UnloadedPage(UnloadedPageShard(unloaded_page))],
                    rows_scheduled: page_load_task.num_rows,
                })
            })
            .collect::<Result<Vec<_>>>()
    }
}
/// A page scheduler paired with its page index and row count.
#[derive(Debug)]
struct PageInfoAndScheduler {
    page_index: usize,
    num_rows: u64,
    scheduler: Box<dyn StructuralPageScheduler>,
}
/// Field scheduler for primitive (leaf) columns using structural encoding;
/// holds one scheduler per page.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldScheduler {
    page_schedulers: Vec<PageInfoAndScheduler>,
    column_index: u32,
}
impl StructuralPrimitiveFieldScheduler {
    /// Builds a scheduler by creating a page scheduler for every page in the
    /// column.
    pub fn try_new(
        column_info: &ColumnInfo,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Self> {
        let page_schedulers = column_info
            .page_infos
            .iter()
            .enumerate()
            .map(|(page_index, page_info)| {
                Self::page_info_to_scheduler(
                    page_info,
                    page_index,
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            page_schedulers,
            column_index: column_info.index,
        })
    }
    /// Maps a page's serialized layout description to a concrete page
    /// scheduler (mini-block, full-zip, constant/all-null, or blob).
    fn page_layout_to_scheduler(
        page_info: &PageInfo,
        page_layout: &PageLayout,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<Box<dyn StructuralPageScheduler>> {
        use pb21::page_layout::Layout;
        Ok(match page_layout.layout.as_ref().expect_ok()? {
            Layout::MiniBlockLayout(mini_block) => Box::new(MiniBlockScheduler::try_new(
                &page_info.buffer_offsets_and_sizes,
                page_info.priority,
                mini_block.num_items,
                mini_block,
                decompressors,
            )?),
            Layout::FullZipLayout(full_zip) => {
                let mut scheduler = FullZipScheduler::try_new(
                    &page_info.buffer_offsets_and_sizes,
                    page_info.priority,
                    page_info.num_rows,
                    full_zip,
                    decompressors,
                )?;
                scheduler.enable_cache = cache_repetition_index;
                Box::new(scheduler)
            }
            Layout::ConstantLayout(constant_layout) => {
                let def_meaning = constant_layout
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // A constant page either carries a scalar value (inline, or
                // in 1 or 3 page buffers) or represents all-null data.
                let has_scalar_value = constant_layout.inline_value.is_some()
                    || page_info.buffer_offsets_and_sizes.len() == 1
                    || page_info.buffer_offsets_and_sizes.len() == 3;
                if has_scalar_value {
                    Box::new(constant::ConstantPageScheduler::try_new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        constant_layout.inline_value.clone(),
                        target_field.data_type(),
                        def_meaning.into(),
                    )?) as Box<dyn StructuralPageScheduler>
                } else if def_meaning.len() == 1
                    && def_meaning[0] == DefinitionInterpretation::NullableItem
                {
                    // Flat all-null page: no buffers need to be read.
                    Box::new(SimpleAllNullScheduler::default()) as Box<dyn StructuralPageScheduler>
                } else {
                    // Nested all-null page: rep/def buffers must be read,
                    // decompressing them when an encoding is recorded.
                    let rep_decompressor = constant_layout
                        .rep_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);
                    let def_decompressor = constant_layout
                        .def_compression
                        .as_ref()
                        .map(|encoding| decompressors.create_block_decompressor(encoding))
                        .transpose()?
                        .map(Arc::from);
                    Box::new(ComplexAllNullScheduler::new(
                        page_info.buffer_offsets_and_sizes.clone(),
                        def_meaning.into(),
                        rep_decompressor,
                        def_decompressor,
                        constant_layout.num_rep_values,
                        constant_layout.num_def_values,
                    )) as Box<dyn StructuralPageScheduler>
                }
            }
            Layout::BlobLayout(blob) => {
                // Blob pages wrap an inner layout; build its scheduler first.
                let inner_scheduler = Self::page_layout_to_scheduler(
                    page_info,
                    blob.inner_layout.as_ref().expect_ok()?.as_ref(),
                    decompressors,
                    cache_repetition_index,
                    target_field,
                )?;
                let def_meaning = blob
                    .layers
                    .iter()
                    .map(|l| ProtobufUtils21::repdef_layer_to_def_interp(*l))
                    .collect::<Vec<_>>();
                // NOTE(review): a struct target appears to request the blob
                // descriptions rather than the blob data — confirm.
                if matches!(target_field.data_type(), DataType::Struct(_)) {
                    Box::new(BlobDescriptionPageScheduler::new(
                        inner_scheduler,
                        def_meaning.into(),
                    ))
                } else {
                    Box::new(BlobPageScheduler::new(
                        inner_scheduler,
                        page_info.priority,
                        page_info.num_rows,
                        def_meaning.into(),
                    ))
                }
            }
        })
    }
    /// Wraps `page_layout_to_scheduler`, pairing the scheduler with the
    /// page's index and row count.
    fn page_info_to_scheduler(
        page_info: &PageInfo,
        page_index: usize,
        decompressors: &dyn DecompressionStrategy,
        cache_repetition_index: bool,
        target_field: &Field,
    ) -> Result<PageInfoAndScheduler> {
        let page_layout = page_info.encoding.as_structural();
        let scheduler = Self::page_layout_to_scheduler(
            page_info,
            page_layout,
            decompressors,
            cache_repetition_index,
            target_field,
        )?;
        Ok(PageInfoAndScheduler {
            page_index,
            num_rows: page_info.num_rows,
            scheduler,
        })
    }
}
/// Per-page state that can be cached between reads; `as_arc_any` allows
/// downcasting the cached value back to its concrete type.
pub trait CachedPageData: Any + Send + Sync + DeepSizeOf + 'static {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}
/// Placeholder cache entry for pages that have nothing to cache.
pub struct NoCachedPageData;
impl DeepSizeOf for NoCachedPageData {
    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
        // Zero-sized type; no heap data.
        0
    }
}
impl CachedPageData for NoCachedPageData {
    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}
/// Cached data for all pages of one column, in page order.
pub struct CachedFieldData {
    pages: Vec<Arc<dyn CachedPageData>>,
}
impl DeepSizeOf for CachedFieldData {
    fn deep_size_of_children(&self, ctx: &mut Context) -> usize {
        self.pages.deep_size_of_children(ctx)
    }
}
/// Cache key for [`CachedFieldData`], keyed by column index.
#[derive(Debug, Clone)]
pub struct FieldDataCacheKey {
    pub column_index: u32,
}
impl CacheKey for FieldDataCacheKey {
    type ValueType = CachedFieldData;
    fn key(&self) -> std::borrow::Cow<'_, str> {
        self.column_index.to_string().into()
    }
}
impl StructuralFieldScheduler for StructuralPrimitiveFieldScheduler {
    /// Initializes the page schedulers, reusing cached per-page state when a
    /// prior read of this column populated the cache.
    fn initialize<'a>(
        &'a mut self,
        _filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let cache_key = FieldDataCacheKey {
            column_index: self.column_index,
        };
        let cache = context.cache().clone();
        async move {
            if let Some(cached_data) = cache.get_with_key(&cache_key).await {
                // Cache hit: load each page scheduler from its cached state
                // (pages are stored in scheduler order).
                self.page_schedulers
                    .iter_mut()
                    .zip(cached_data.pages.iter())
                    .for_each(|(page_scheduler, cached_data)| {
                        page_scheduler.scheduler.load(cached_data);
                    });
                return Ok(());
            }
            // Cache miss: initialize all pages (order preserved via
            // FuturesOrdered) and store the resulting state for later reads.
            let page_data = self
                .page_schedulers
                .iter_mut()
                .map(|s| s.scheduler.initialize(context.io()))
                .collect::<FuturesOrdered<_>>();
            let page_data = page_data.try_collect::<Vec<_>>().await?;
            let cached_data = Arc::new(CachedFieldData { pages: page_data });
            cache.insert_with_key(&cache_key, cached_data).await;
            Ok(())
        }
        .boxed()
    }
    /// Creates a scheduling job over `ranges`; the filter is unused for
    /// primitive columns.
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        _filter: &FilterExpression,
    ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
        let ranges = ranges.to_vec();
        Ok(Box::new(StructuralPrimitiveFieldSchedulingJob::new(
            self, ranges,
        )))
    }
}
/// Composite task that decodes several page tasks and concatenates their
/// results into a single array.
#[derive(Debug)]
pub struct StructuralCompositeDecodeArrayTask {
    tasks: Vec<Box<dyn DecodePageTask>>,
    should_validate: bool,
    data_type: DataType,
}
impl StructuralCompositeDecodeArrayTask {
    /// Applies validity recovered from the rep/def levels to `array`.
    ///
    /// Returns the array unchanged when the levels carry no validity or
    /// when the array is of the null type (which has no validity buffer).
    fn restore_validity(
        array: Arc<dyn Array>,
        unraveler: &mut CompositeRepDefUnraveler,
    ) -> Arc<dyn Array> {
        let validity = unraveler.unravel_validity(array.len());
        let Some(validity) = validity else {
            return array;
        };
        if array.data_type() == &DataType::Null {
            // Null arrays have no validity buffer to attach.
            return array;
        }
        assert_eq!(validity.len(), array.len());
        // SAFETY: only the null buffer is replaced and its length matches the
        // array (asserted above); the value buffers are reused unchanged from
        // an already-constructed ArrayData.
        make_array(unsafe {
            array
                .to_data()
                .into_builder()
                .nulls(Some(validity))
                .build_unchecked()
        })
    }
}
impl StructuralDecodeArrayTask for StructuralCompositeDecodeArrayTask {
    /// Decodes every per-page task, concatenates the results into a single
    /// array, and restores validity from the combined rep/def levels.
    fn decode(self: Box<Self>) -> Result<DecodedArray> {
        let task_count = self.tasks.len();
        let mut unravelers = Vec::with_capacity(task_count);
        let mut arrays = Vec::with_capacity(task_count);
        for task in self.tasks {
            let page = task.decode()?;
            let arrow_data = page
                .data
                .into_arrow(self.data_type.clone(), self.should_validate)?;
            arrays.push(make_array(arrow_data));
            unravelers.push(page.repdef);
        }
        // Concatenate all page arrays into one.
        let slices = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        let concatenated = arrow_select::concat::concat(&slices)?;
        let mut repdef = CompositeRepDefUnraveler::new(unravelers);
        let array = Self::restore_validity(concatenated, &mut repdef);
        Ok(DecodedArray { array, repdef })
    }
}
/// Field decoder for primitive columns; drains rows from a queue of page
/// decoders in arrival order.
#[derive(Debug)]
pub struct StructuralPrimitiveFieldDecoder {
    field: Arc<ArrowField>,
    page_decoders: VecDeque<Box<dyn StructuralPageDecoder>>,
    should_validate: bool,
    // Rows already drained from the front page decoder
    rows_drained_in_current: u64,
}
impl StructuralPrimitiveFieldDecoder {
    /// Creates a decoder for `field` with no pages accepted yet.
    ///
    /// `should_validate` controls whether decoded buffers are validated when
    /// they are converted into Arrow data.
    pub fn new(field: &Arc<ArrowField>, should_validate: bool) -> Self {
        Self {
            field: Arc::clone(field),
            should_validate,
            page_decoders: VecDeque::default(),
            rows_drained_in_current: 0,
        }
    }
}
impl StructuralFieldDecoder for StructuralPrimitiveFieldDecoder {
    fn accept_page(&mut self, child: LoadedPageShard) -> Result<()> {
        // Primitive fields are leaves, so the path must already be empty.
        assert!(child.path.is_empty());
        self.page_decoders.push_back(child.decoder);
        Ok(())
    }
    /// Drains `num_rows` rows, crossing page boundaries as needed.
    ///
    /// Panics if fewer than `num_rows` rows remain in the accepted pages.
    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
        let mut remaining = num_rows;
        let mut tasks = Vec::new();
        while remaining > 0 {
            let cur_page = self.page_decoders.front_mut().unwrap();
            let num_in_page = cur_page.num_rows() - self.rows_drained_in_current;
            let to_take = num_in_page.min(remaining);
            let task = cur_page.drain(to_take)?;
            tasks.push(task);
            if to_take == num_in_page {
                // Page exhausted; drop it and reset the intra-page cursor.
                self.page_decoders.pop_front();
                self.rows_drained_in_current = 0;
            } else {
                self.rows_drained_in_current += to_take;
            }
            remaining -= to_take;
        }
        Ok(Box::new(StructuralCompositeDecodeArrayTask {
            tasks,
            should_validate: self.should_validate,
            data_type: self.field.data_type().clone(),
        }))
    }
    fn data_type(&self) -> &DataType {
        self.field.data_type()
    }
}
/// Serialized full-zip page: the zipped value buffer plus an optional
/// repetition index buffer.
struct SerializedFullZip {
    values: LanceBuffer,
    repetition_index: Option<LanceBuffer>,
}
// Sections within a mini-block page are padded to this many bytes.
const MINIBLOCK_ALIGNMENT: usize = 8;
/// Structural encoder for primitive (leaf) columns.
///
/// Arrays accumulate in a queue until enough bytes are buffered; pages are
/// then flushed using a mini-block, full-zip, constant, or all-null layout.
pub struct PrimitiveStructuralEncoder {
    // Accumulates arrays until a page's worth of data has been gathered
    accumulation_queue: AccumulationQueue,
    keep_original_array: bool,
    support_large_chunk: bool,
    // Rep/def builders accumulated alongside the arrays
    accumulated_repdefs: Vec<RepDefBuilder>,
    compression_strategy: Arc<dyn CompressionStrategy>,
    column_index: u32,
    field: Field,
    // User-provided encoding hints (e.g. structural encoding choice)
    encoding_metadata: Arc<HashMap<String, String>>,
    version: LanceFileVersion,
}
/// Compressed rep or def levels for one mini-block chunk.
struct CompressedLevelsChunk {
    data: LanceBuffer,
    // Number of u16 levels in this chunk
    num_levels: u16,
}
/// All compressed level chunks for a page plus their encoding description.
struct CompressedLevels {
    data: Vec<CompressedLevelsChunk>,
    compression: CompressiveEncoding,
    // Optional repetition index (row / leftover counts per chunk)
    rep_index: Option<LanceBuffer>,
}
/// A serialized mini-block page: chunk data plus per-chunk metadata words.
struct SerializedMiniBlockPage {
    num_buffers: u64,
    data: LanceBuffer,
    metadata: LanceBuffer,
}
/// Size limits used when considering dictionary encoding.
#[derive(Debug, Clone, Copy)]
struct DictEncodingBudget {
    max_dict_entries: u32,
    max_encoded_size: usize,
}
impl PrimitiveStructuralEncoder {
pub fn try_new(
options: &EncodingOptions,
compression_strategy: Arc<dyn CompressionStrategy>,
column_index: u32,
field: Field,
encoding_metadata: Arc<HashMap<String, String>>,
) -> Result<Self> {
Ok(Self {
accumulation_queue: AccumulationQueue::new(
options.cache_bytes_per_column,
column_index,
options.keep_original_array,
),
support_large_chunk: options.support_large_chunk(),
keep_original_array: options.keep_original_array,
accumulated_repdefs: Vec::new(),
column_index,
compression_strategy,
field,
encoding_metadata,
version: options.version,
})
}
fn is_narrow(data_block: &DataBlock) -> bool {
const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;
if let Some(max_len_array) = data_block.get_stat(Stat::MaxLength) {
let max_len_array = max_len_array
.as_any()
.downcast_ref::<PrimitiveArray<UInt64Type>>()
.unwrap();
if max_len_array.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
return true;
}
}
false
}
fn prefers_miniblock(
data_block: &DataBlock,
encoding_metadata: &HashMap<String, String>,
) -> bool {
if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
return user_requested.to_lowercase() == STRUCTURAL_ENCODING_MINIBLOCK;
}
Self::is_narrow(data_block)
}
    /// Returns true when the rep/def levels are too numerous, per value, for
    /// a mini-block chunk's level budget.
    ///
    /// Estimates the bits needed per level (from the maximum rep and def
    /// values) and the expected number of levels in a full-size chunk; if
    /// those levels would not fit in the 16 KiB budget, mini-block should
    /// not be used.
    fn repdef_too_sparse_for_miniblock(
        repdef: &crate::repdef::SerializedRepDefs,
        num_values: u64,
    ) -> bool {
        if num_values == 0 {
            return false;
        }
        let num_levels = repdef
            .repetition_levels
            .as_ref()
            .map(|r| r.len() as u64)
            .max(repdef.definition_levels.as_ref().map(|d| d.len() as u64))
            .unwrap_or(0);
        if num_levels == 0 {
            return false;
        }
        // Bits required to represent the largest repetition level.
        let bits_per_rep = repdef
            .repetition_levels
            .as_ref()
            .and_then(|r| r.iter().max().copied())
            .map(|max_val| u16::BITS - max_val.leading_zeros())
            .unwrap_or(0) as u64;
        // Bits required to represent the largest definition level.
        let bits_per_def = repdef
            .definition_levels
            .as_ref()
            .and_then(|d| d.iter().max().copied())
            .map(|max_val| u16::BITS - max_val.leading_zeros())
            .unwrap_or(0) as u64;
        let bits_per_level = bits_per_rep + bits_per_def;
        if bits_per_level == 0 {
            return false;
        }
        // 16 KiB budget for levels within a single chunk.
        const REPDEF_BUDGET_BITS: u64 = 16 * 1024 * 8;
        let max_levels_per_chunk = REPDEF_BUDGET_BITS / bits_per_level;
        // Average levels per value, scaled to a full-size chunk.
        let levels_per_chunk =
            (num_levels as f64 / num_values as f64) * miniblock::MAX_MINIBLOCK_VALUES as f64;
        levels_per_chunk > max_levels_per_chunk as f64
    }
fn prefers_fullzip(encoding_metadata: &HashMap<String, String>) -> bool {
if let Some(user_requested) = encoding_metadata.get(STRUCTURAL_ENCODING_META_KEY) {
return user_requested.to_lowercase() == STRUCTURAL_ENCODING_FULLZIP;
}
true
}
    /// Serializes compressed mini-block chunks (plus optional compressed
    /// rep/def levels) into one page data buffer and a per-chunk metadata
    /// buffer.
    ///
    /// Each chunk is laid out as: `num_levels` (u16), optional rep byte
    /// length (u16), optional def byte length (u16), per-buffer byte sizes
    /// (native width when `support_large_chunk`, else u16), then the rep,
    /// def, and value buffers.  Every section is padded to
    /// `MINIBLOCK_ALIGNMENT`.
    fn serialize_miniblocks(
        miniblocks: MiniBlockCompressed,
        rep: Option<Vec<CompressedLevelsChunk>>,
        def: Option<Vec<CompressedLevelsChunk>>,
        support_large_chunk: bool,
    ) -> Result<SerializedMiniBlockPage> {
        let bytes_rep = rep
            .as_ref()
            .map(|rep| rep.iter().map(|r| r.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_def = def
            .as_ref()
            .map(|def| def.iter().map(|d| d.data.len()).sum::<usize>())
            .unwrap_or(0);
        let bytes_data = miniblocks.data.iter().map(|d| d.len()).sum::<usize>();
        let mut num_buffers = miniblocks.data.len();
        if rep.is_some() {
            num_buffers += 1;
        }
        if def.is_some() {
            num_buffers += 1;
        }
        // Extra capacity for per-buffer size headers and padding.
        let max_extra = 9 * num_buffers;
        let mut data_buffer = Vec::with_capacity(bytes_rep + bytes_def + bytes_data + max_extra);
        let chunk_size_bytes = if support_large_chunk { 4 } else { 2 };
        let mut meta_buffer = Vec::with_capacity(miniblocks.chunks.len() * chunk_size_bytes);
        let mut rep_iter = rep.map(|r| r.into_iter());
        let mut def_iter = def.map(|d| d.into_iter());
        // Per-buffer read cursors into the concatenated compressed data.
        let mut buffer_offsets = vec![0; miniblocks.data.len()];
        for chunk in miniblocks.chunks {
            let start_pos = data_buffer.len();
            debug_assert_eq!(start_pos % MINIBLOCK_ALIGNMENT, 0);
            let rep = rep_iter.as_mut().map(|r| r.next().unwrap());
            let def = def_iter.as_mut().map(|d| d.next().unwrap());
            // Number of levels in this chunk (from rep when present, else def).
            let num_levels = rep
                .as_ref()
                .map(|r| r.num_levels)
                .unwrap_or(def.as_ref().map(|d| d.num_levels).unwrap_or(0));
            data_buffer.extend_from_slice(&num_levels.to_le_bytes());
            if let Some(rep) = rep.as_ref() {
                // Level buffer lengths are always stored as u16.
                let bytes_rep = u16::try_from(rep.data.len()).map_err(|_| {
                    Error::internal(format!(
                        "Repetition buffer size ({} bytes) too large",
                        rep.data.len()
                    ))
                })?;
                data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
            }
            if let Some(def) = def.as_ref() {
                let bytes_def = u16::try_from(def.data.len()).map_err(|_| {
                    Error::internal(format!(
                        "Definition buffer size ({} bytes) too large",
                        def.data.len()
                    ))
                })?;
                data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
            }
            if support_large_chunk {
                for &buffer_size in &chunk.buffer_sizes {
                    data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
                }
            } else {
                for &buffer_size in &chunk.buffer_sizes {
                    data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
                }
            }
            // Pad the current position up to the mini-block alignment.
            let add_padding = |data_buffer: &mut Vec<u8>| {
                let pad = pad_bytes::<MINIBLOCK_ALIGNMENT>(data_buffer.len());
                data_buffer.extend(iter::repeat_n(FILL_BYTE, pad));
            };
            add_padding(&mut data_buffer);
            if let Some(rep) = rep.as_ref() {
                data_buffer.extend_from_slice(&rep.data);
                add_padding(&mut data_buffer);
            }
            if let Some(def) = def.as_ref() {
                data_buffer.extend_from_slice(&def.data);
                add_padding(&mut data_buffer);
            }
            // Copy this chunk's slice out of every value buffer.
            for (buffer_size, (buffer, buffer_offset)) in chunk
                .buffer_sizes
                .iter()
                .zip(miniblocks.data.iter().zip(buffer_offsets.iter_mut()))
            {
                let start = *buffer_offset;
                let end = start + *buffer_size as usize;
                *buffer_offset += *buffer_size as usize;
                data_buffer.extend_from_slice(&buffer[start..end]);
                add_padding(&mut data_buffer);
            }
            let chunk_bytes = data_buffer.len() - start_pos;
            let max_chunk_size = if support_large_chunk {
                4 * 1024 * 1024 * 1024 } else {
                32 * 1024 };
            assert!(chunk_bytes <= max_chunk_size);
            assert!(chunk_bytes > 0);
            assert_eq!(chunk_bytes % 8, 0);
            assert!(chunk.log_num_values <= 12);
            // Metadata word layout: ((size_in_words - 1) << 4) | log2(values).
            let divided_bytes = chunk_bytes / MINIBLOCK_ALIGNMENT;
            let divided_bytes_minus_one = (divided_bytes - 1) as u64;
            let metadata = (divided_bytes_minus_one << 4) | chunk.log_num_values as u64;
            if support_large_chunk {
                meta_buffer.extend_from_slice(&(metadata as u32).to_le_bytes());
            } else {
                meta_buffer.extend_from_slice(&(metadata as u16).to_le_bytes());
            }
        }
        let data_buffer = LanceBuffer::from(data_buffer);
        let metadata_buffer = LanceBuffer::from(meta_buffer);
        Ok(SerializedMiniBlockPage {
            num_buffers: miniblocks.data.len() as u64,
            data: data_buffer,
            metadata: metadata_buffer,
        })
    }
    /// Compresses rep/def levels chunk-by-chunk using a single block
    /// compressor.
    ///
    /// When repetition is present (`max_rep > 0`) this also builds a
    /// repetition index holding, per chunk, a row count and the number of
    /// trailing levels belonging to a row that continues into the next
    /// chunk.
    fn compress_levels(
        mut levels: RepDefSlicer<'_>,
        num_elements: u64,
        compression_strategy: &dyn CompressionStrategy,
        chunks: &[MiniBlockChunk],
        max_rep: u16,
    ) -> Result<CompressedLevels> {
        let mut rep_index = if max_rep > 0 {
            Vec::with_capacity(chunks.len())
        } else {
            vec![]
        };
        let num_levels = levels.num_levels() as u64;
        let levels_buf = levels.all_levels().clone();
        // Levels are always compressed as u16 fixed-width data.
        let mut fixed_width_block = FixedWidthDataBlock {
            data: levels_buf,
            bits_per_value: 16,
            num_values: num_levels,
            block_info: BlockInfo::new(),
        };
        fixed_width_block.compute_stat();
        let levels_block = DataBlock::FixedWidth(fixed_width_block);
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, compressor_desc) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let mut level_chunks = Vec::with_capacity(chunks.len());
        let mut values_counter = 0;
        for (chunk_idx, chunk) in chunks.iter().enumerate() {
            let chunk_num_values = chunk.num_values(values_counter, num_elements);
            debug_assert!(chunk_num_values > 0);
            values_counter += chunk_num_values;
            // The final chunk takes whatever levels remain.
            let chunk_levels = if chunk_idx < chunks.len() - 1 {
                levels.slice_next(chunk_num_values as usize)
            } else {
                levels.slice_rest()
            };
            let num_chunk_levels = (chunk_levels.len() / 2) as u64;
            if max_rep > 0 {
                let rep_values = chunk_levels.borrow_to_typed_slice::<u16>();
                let rep_values = rep_values.as_ref();
                // A rep level equal to max_rep marks a row boundary; the
                // first level is excluded from this count.
                let mut num_rows = rep_values.iter().skip(1).filter(|v| **v == max_rep).count();
                // Levels after the last row boundary spill into the next
                // chunk (never for the final chunk).
                let num_leftovers = if chunk_idx < chunks.len() - 1 {
                    rep_values
                        .iter()
                        .rev()
                        .position(|v| *v == max_rep)
                        .map(|pos| pos + 1)
                        .unwrap_or(rep_values.len())
                } else {
                    0
                };
                if chunk_idx != 0 && rep_values.first() == Some(&max_rep) {
                    // This chunk begins on a row boundary, so the previous
                    // chunk's leftovers actually completed a row; fix up its
                    // (rows, leftovers) entry.
                    let rep_len = rep_index.len();
                    if rep_index[rep_len - 1] != 0 {
                        rep_index[rep_len - 2] += 1;
                        rep_index[rep_len - 1] = 0;
                    }
                }
                if chunk_idx == chunks.len() - 1 {
                    // The final chunk also counts the trailing row.
                    num_rows += 1;
                }
                rep_index.push(num_rows as u64);
                rep_index.push(num_leftovers as u64);
            }
            let mut chunk_fixed_width = FixedWidthDataBlock {
                data: chunk_levels,
                bits_per_value: 16,
                num_values: num_chunk_levels,
                block_info: BlockInfo::new(),
            };
            chunk_fixed_width.compute_stat();
            let chunk_levels_block = DataBlock::FixedWidth(chunk_fixed_width);
            let compressed_levels = compressor.compress(chunk_levels_block)?;
            level_chunks.push(CompressedLevelsChunk {
                data: compressed_levels,
                num_levels: num_chunk_levels as u16,
            });
        }
        debug_assert_eq!(levels.num_levels_remaining(), 0);
        let rep_index = if rep_index.is_empty() {
            None
        } else {
            Some(LanceBuffer::reinterpret_vec(rep_index))
        };
        Ok(CompressedLevels {
            data: level_chunks,
            compression: compressor_desc,
            rep_index,
        })
    }
fn encode_simple_all_null(
column_idx: u32,
num_rows: u64,
row_number: u64,
) -> Result<EncodedPage> {
let description =
ProtobufUtils21::constant_layout(&[DefinitionInterpretation::NullableItem], None);
Ok(EncodedPage {
column_idx,
data: vec![],
description: PageEncoding::Structural(description),
num_rows,
row_number,
})
}
    /// Compresses a single rep or def level buffer (u16 values) using the
    /// block compressor chosen by `compression_strategy`.
    ///
    /// Returns the compressed bytes with their encoding description.
    fn encode_complex_all_null_vals(
        data: &Arc<[u16]>,
        compression_strategy: &dyn CompressionStrategy,
    ) -> Result<(LanceBuffer, pb21::CompressiveEncoding)> {
        let buffer = LanceBuffer::reinterpret_slice(data.clone());
        let mut fixed_width_block = FixedWidthDataBlock {
            data: buffer,
            bits_per_value: 16,
            num_values: data.len() as u64,
            block_info: BlockInfo::new(),
        };
        fixed_width_block.compute_stat();
        let levels_block = DataBlock::FixedWidth(fixed_width_block);
        let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
        let (compressor, encoding) =
            compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
        let compressed_buffer = compressor.compress(levels_block)?;
        Ok((compressed_buffer, encoding))
    }
    /// Encodes an all-null page that still carries rep/def structure.
    ///
    /// Before file version 2.2 the level buffers are written raw; from 2.2
    /// onward each buffer is block-compressed and its encoding recorded in
    /// the layout.
    fn encode_complex_all_null(
        column_idx: u32,
        repdef: crate::repdef::SerializedRepDefs,
        row_number: u64,
        num_rows: u64,
        version: LanceFileVersion,
        compression_strategy: &dyn CompressionStrategy,
    ) -> Result<EncodedPage> {
        if version.resolve() < LanceFileVersion::V2_2 {
            // Legacy path: raw (uncompressed) level buffers.
            let rep_bytes = if let Some(rep) = repdef.repetition_levels.as_ref() {
                LanceBuffer::reinterpret_slice(rep.clone())
            } else {
                LanceBuffer::empty()
            };
            let def_bytes = if let Some(def) = repdef.definition_levels.as_ref() {
                LanceBuffer::reinterpret_slice(def.clone())
            } else {
                LanceBuffer::empty()
            };
            let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, None);
            return Ok(EncodedPage {
                column_idx,
                data: vec![rep_bytes, def_bytes],
                description: PageEncoding::Structural(description),
                num_rows,
                row_number,
            });
        }
        // v2.2+: compress each present level stream independently.
        let (rep_bytes, rep_encoding, num_rep_values) = if let Some(rep) =
            repdef.repetition_levels.as_ref()
        {
            let num_values = rep.len() as u64;
            let (buffer, encoding) = Self::encode_complex_all_null_vals(rep, compression_strategy)?;
            (buffer, Some(encoding), num_values)
        } else {
            (LanceBuffer::empty(), None, 0)
        };
        let (def_bytes, def_encoding, num_def_values) = if let Some(def) =
            repdef.definition_levels.as_ref()
        {
            let num_values = def.len() as u64;
            let (buffer, encoding) = Self::encode_complex_all_null_vals(def, compression_strategy)?;
            (buffer, Some(encoding), num_values)
        } else {
            (LanceBuffer::empty(), None, 0)
        };
        let description = ProtobufUtils21::compressed_all_null_constant_layout(
            &repdef.def_meaning,
            rep_encoding,
            def_encoding,
            num_rep_values,
            num_def_values,
        );
        Ok(EncodedPage {
            column_idx,
            data: vec![rep_bytes, def_bytes],
            description: PageEncoding::Structural(description),
            num_rows,
            row_number,
        })
    }
    /// Computes per-value validity for the leaf level from the serialized
    /// rep/def levels.
    ///
    /// Returns `Ok(None)` when every value is valid.
    fn leaf_validity(
        repdef: &crate::repdef::SerializedRepDefs,
        num_values: usize,
    ) -> Result<Option<BooleanBuffer>> {
        let rep = repdef
            .repetition_levels
            .as_ref()
            .map(|rep| rep.as_ref().to_vec());
        let def = repdef
            .definition_levels
            .as_ref()
            .map(|def| def.as_ref().to_vec());
        let mut unraveler = RepDefUnraveler::new(
            rep,
            def,
            repdef.def_meaning.clone().into(),
            num_values as u64,
        );
        if unraveler.is_all_valid() {
            // No invalid values; skip building a validity buffer.
            return Ok(None);
        }
        let mut validity = BooleanBufferBuilder::new(num_values);
        unraveler.unravel_validity(&mut validity);
        Ok(Some(validity.finish()))
    }
    /// Returns true if every visible value in `arrays` equals `scalar`.
    ///
    /// `validity` (when present) flags, in global order across all arrays,
    /// which positions participate in the comparison; invalid positions are
    /// skipped.  Types that cannot be compared here return `Ok(false)`.
    fn is_constant_values(
        arrays: &[ArrayRef],
        scalar: &ArrayRef,
        validity: Option<&BooleanBuffer>,
    ) -> Result<bool> {
        debug_assert_eq!(scalar.len(), 1);
        debug_assert_eq!(scalar.null_count(), 0);
        match scalar.data_type() {
            DataType::Boolean => {
                let mut global_idx = 0usize;
                let scalar_val = scalar.as_boolean().value(0);
                for arr in arrays {
                    let bool_arr = arr.as_boolean();
                    for i in 0..arr.len() {
                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
                        global_idx += 1;
                        if !is_valid {
                            continue;
                        }
                        if bool_arr.value(i) != scalar_val {
                            return Ok(false);
                        }
                    }
                }
                Ok(true)
            }
            DataType::Utf8 => Self::is_constant_utf8::<i32>(arrays, scalar, validity),
            DataType::LargeUtf8 => Self::is_constant_utf8::<i64>(arrays, scalar, validity),
            DataType::Binary => Self::is_constant_binary::<i32>(arrays, scalar, validity),
            DataType::LargeBinary => Self::is_constant_binary::<i64>(arrays, scalar, validity),
            data_type => {
                // Fallback: byte-compare fixed-width values backed by a
                // single buffer with no child data.
                let mut global_idx = 0usize;
                let Some(byte_width) = data_type.byte_width_opt() else {
                    return Ok(false);
                };
                let scalar_data = scalar.to_data();
                if scalar_data.buffers().len() != 1 || !scalar_data.child_data().is_empty() {
                    return Ok(false);
                }
                let scalar_bytes = scalar_data.buffers()[0].as_slice();
                if scalar_bytes.len() != byte_width {
                    return Ok(false);
                }
                for arr in arrays {
                    let data = arr.to_data();
                    if data.buffers().is_empty() {
                        return Ok(false);
                    }
                    let buf = data.buffers()[0].as_slice();
                    // Honor the array's offset into its backing buffer.
                    let base = data.offset();
                    for i in 0..arr.len() {
                        let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
                        global_idx += 1;
                        if !is_valid {
                            continue;
                        }
                        let start = (base + i) * byte_width;
                        if buf[start..start + byte_width] != scalar_bytes[..] {
                            return Ok(false);
                        }
                    }
                }
                Ok(true)
            }
        }
    }
fn is_constant_utf8<O: arrow_array::OffsetSizeTrait>(
arrays: &[ArrayRef],
scalar: &ArrayRef,
validity: Option<&BooleanBuffer>,
) -> Result<bool> {
debug_assert_eq!(scalar.len(), 1);
let scalar_val = scalar.as_string::<O>().value(0).as_bytes();
let mut global_idx = 0usize;
for arr in arrays {
let str_arr = arr.as_string::<O>();
for i in 0..arr.len() {
let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
global_idx += 1;
if !is_valid {
continue;
}
if str_arr.value(i).as_bytes() != scalar_val {
return Ok(false);
}
}
}
Ok(true)
}
fn is_constant_binary<O: arrow_array::OffsetSizeTrait>(
arrays: &[ArrayRef],
scalar: &ArrayRef,
validity: Option<&BooleanBuffer>,
) -> Result<bool> {
debug_assert_eq!(scalar.len(), 1);
let scalar_val = scalar.as_binary::<O>().value(0);
let mut global_idx = 0usize;
for arr in arrays {
let bin_arr = arr.as_binary::<O>();
for i in 0..arr.len() {
let is_valid = validity.map(|v| v.value(global_idx)).unwrap_or(true);
global_idx += 1;
if !is_valid {
continue;
}
if bin_arr.value(i) != scalar_val {
return Ok(false);
}
}
}
Ok(true)
}
    /// Looks for a single value shared by every visible position in
    /// `arrays`.
    ///
    /// Returns `Ok(None)` when the arrays are empty, fully invalid, or
    /// contain more than one distinct visible value.
    fn find_constant_scalar(
        arrays: &[ArrayRef],
        validity: Option<&BooleanBuffer>,
    ) -> Result<Option<ArrayRef>> {
        if arrays.is_empty() {
            return Ok(None);
        }
        // Pick the first visible position as the candidate scalar.
        let global_scalar_idx = if let Some(validity) = validity {
            let Some(idx) = (0..validity.len()).find(|&i| validity.value(i)) else {
                return Ok(None);
            };
            idx
        } else {
            0
        };
        // Translate the global index into (array index, local index).
        let mut idx_remaining = global_scalar_idx;
        let mut scalar_arr_idx = 0usize;
        while scalar_arr_idx < arrays.len() {
            let len = arrays[scalar_arr_idx].len();
            if idx_remaining < len {
                break;
            }
            idx_remaining -= len;
            scalar_arr_idx += 1;
        }
        if scalar_arr_idx >= arrays.len() {
            return Ok(None);
        }
        let scalar =
            lance_arrow::scalar::extract_scalar_value(&arrays[scalar_arr_idx], idx_remaining)?;
        if scalar.null_count() != 0 {
            return Ok(None);
        }
        // Confirm every visible value matches the candidate.
        if !Self::is_constant_values(arrays, &scalar, validity)? {
            return Ok(None);
        }
        Ok(Some(scalar))
    }
/// Builds the metadata map that configures compression of dictionary values.
///
/// Precedence for each setting: field metadata, then the environment value
/// passed in, then (for the compression scheme only) the built-in default.
fn resolve_dict_values_compression_metadata(
    field_metadata: &HashMap<String, String>,
    env_compression: Option<String>,
    env_compression_level: Option<String>,
) -> HashMap<String, String> {
    let compression = field_metadata
        .get(DICT_VALUES_COMPRESSION_META_KEY)
        .cloned()
        .or(env_compression)
        .unwrap_or_else(|| DEFAULT_DICT_VALUES_COMPRESSION.to_string());
    let mut resolved = HashMap::new();
    resolved.insert(COMPRESSION_META_KEY.to_string(), compression);
    // The compression level is optional and has no default.
    let level = field_metadata
        .get(DICT_VALUES_COMPRESSION_LEVEL_META_KEY)
        .cloned()
        .or(env_compression_level);
    if let Some(level) = level {
        resolved.insert(COMPRESSION_LEVEL_META_KEY.to_string(), level);
    }
    resolved
}
/// Builds the synthetic (non-nullable UInt16) field used to select the block
/// compressor for dictionary values, carrying the resolved compression
/// metadata from `field` and the environment.
fn build_dict_values_compressor_field(field: &Field) -> Result<Field> {
    let metadata = Self::resolve_dict_values_compression_metadata(
        &field.metadata,
        env::var(DICT_VALUES_COMPRESSION_ENV_VAR).ok(),
        env::var(DICT_VALUES_COMPRESSION_LEVEL_ENV_VAR).ok(),
    );
    let mut values_field = Field::new_arrow("", DataType::UInt16, false)?;
    values_field.metadata = metadata;
    Ok(values_field)
}
#[allow(clippy::too_many_arguments)]
/// Compresses `data` into the mini-block page layout.
///
/// The page's buffers are, in order: serialized chunk metadata, chunk data,
/// the compressed dictionary (if `dictionary_data` is provided), and the
/// repetition index (if repetition levels produced one).
///
/// Previously the dictionary and non-dictionary paths duplicated the layout
/// description and `EncodedPage` construction; they now share a single tail.
fn encode_miniblock(
    column_idx: u32,
    field: &Field,
    compression_strategy: &dyn CompressionStrategy,
    data: DataBlock,
    repdef: crate::repdef::SerializedRepDefs,
    row_number: u64,
    dictionary_data: Option<DataBlock>,
    num_rows: u64,
    support_large_chunk: bool,
) -> Result<EncodedPage> {
    // All-null pages are routed to dedicated layouts before reaching here.
    if let DataBlock::AllNull(_null_block) = data {
        unreachable!()
    }
    let num_items = data.num_values();
    let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
    let (compressed_data, value_encoding) = compressor.compress(data)?;
    // One repetition layer per list level in the definition meaning.
    let max_rep = repdef.def_meaning.iter().filter(|l| l.is_list()).count() as u16;
    let mut compressed_rep = repdef
        .rep_slicer()
        .map(|rep_slicer| {
            Self::compress_levels(
                rep_slicer,
                num_items,
                compression_strategy,
                &compressed_data.chunks,
                max_rep,
            )
        })
        .transpose()?;
    let (rep_index, rep_index_depth) =
        match compressed_rep.as_mut().and_then(|cr| cr.rep_index.as_mut()) {
            Some(rep_index) => (Some(rep_index.clone()), 1),
            None => (None, 0),
        };
    let mut compressed_def = repdef
        .def_slicer()
        .map(|def_slicer| {
            Self::compress_levels(
                def_slicer,
                num_items,
                compression_strategy,
                &compressed_data.chunks,
                0,
            )
        })
        .transpose()?;
    // Move the level buffers out; the remaining structs still describe the
    // compression used, which the layout description needs below.
    let rep_data = compressed_rep
        .as_mut()
        .map(|cr| std::mem::take(&mut cr.data));
    let def_data = compressed_def
        .as_mut()
        .map(|cd| std::mem::take(&mut cd.data));
    let serialized =
        Self::serialize_miniblocks(compressed_data, rep_data, def_data, support_large_chunk)?;
    let mut data = Vec::with_capacity(4);
    data.push(serialized.metadata);
    data.push(serialized.data);
    // Compress the dictionary (if any) into the next buffer and remember its
    // encoding + entry count for the layout description.
    let dictionary_info = if let Some(dictionary_data) = dictionary_data {
        let num_dictionary_items = dictionary_data.num_values();
        let dict_values_field = Self::build_dict_values_compressor_field(field)?;
        let (compressor, dictionary_encoding) = compression_strategy
            .create_block_compressor(&dict_values_field, &dictionary_data)?;
        data.push(compressor.compress(dictionary_data)?);
        Some((dictionary_encoding, num_dictionary_items))
    } else {
        None
    };
    if let Some(rep_index) = rep_index {
        if dictionary_info.is_none() {
            // Sanity check (non-dictionary path only, matching prior
            // behavior): row counts recorded in the repetition index must
            // add up to the page's row count.
            let view = rep_index.borrow_to_typed_slice::<u64>();
            let total = view.chunks_exact(2).map(|c| c[0]).sum::<u64>();
            debug_assert_eq!(total, num_rows);
        }
        data.push(rep_index);
    }
    let description = ProtobufUtils21::miniblock_layout(
        compressed_rep.map(|cr| cr.compression),
        compressed_def.map(|cd| cd.compression),
        value_encoding,
        rep_index_depth,
        serialized.num_buffers,
        dictionary_info,
        &repdef.def_meaning,
        num_items,
        support_large_chunk,
    );
    Ok(EncodedPage {
        num_rows,
        column_idx,
        data,
        description: PageEncoding::Structural(description),
        row_number,
    })
}
/// Zips fixed-width values together with their rep/def control words into a
/// single contiguous buffer, building a byte-packed repetition index of row
/// start offsets along the way.
fn serialize_full_zip_fixed(
    fixed: FixedWidthDataBlock,
    mut repdef: ControlWordIterator,
    num_values: u64,
) -> SerializedFullZip {
    // Exact output size: raw value bytes plus one control word per value.
    let len = fixed.data.len() + repdef.bytes_per_word() * num_values as usize;
    let mut zipped_data = Vec::with_capacity(len);
    // Repetition index entries are byte offsets into the zipped buffer, so
    // `len` bounds every entry. Without repetition no row boundaries are
    // recorded, so the bound does not matter.
    let max_rep_index_val = if repdef.has_repetition() {
        len as u64
    } else {
        0
    };
    let mut rep_index_builder =
        BytepackedIntegerEncoder::with_capacity(num_values as usize + 1, max_rep_index_val);
    assert_eq!(
        fixed.bits_per_value % 8,
        0,
        "Non-byte aligned full-zip compression not yet supported"
    );
    let bytes_per_value = fixed.bits_per_value as usize / 8;
    // `offset` lags one iteration behind `zipped_data.len()`: when a control
    // word marks a new row, `offset` is the byte position where the current
    // row begins (before that row's control word was appended).
    let mut offset = 0;
    if bytes_per_value == 0 {
        // Zero-width values: only control words are appended; there are no
        // value bytes to copy.
        while let Some(control) = repdef.append_next(&mut zipped_data) {
            if control.is_new_row {
                debug_assert!(offset <= len);
                // SAFETY(review): presumably `append` requires the value to
                // fit the encoder's configured maximum; `offset <= len ==
                // max_rep_index_val` holds here. Confirm against the
                // BytepackedIntegerEncoder contract.
                unsafe { rep_index_builder.append(offset as u64) };
            }
            offset = zipped_data.len();
        }
    } else {
        let mut data_iter = fixed.data.chunks_exact(bytes_per_value);
        while let Some(control) = repdef.append_next(&mut zipped_data) {
            if control.is_new_row {
                debug_assert!(offset <= len);
                unsafe { rep_index_builder.append(offset as u64) };
            }
            // Invisible items contribute a control word but no value bytes.
            if control.is_visible {
                let value = data_iter.next().unwrap();
                zipped_data.extend_from_slice(value);
            }
            offset = zipped_data.len();
        }
    }
    debug_assert_eq!(zipped_data.len(), len);
    // Trailing entry marks the end of the buffer so each row's extent can be
    // derived from consecutive index entries.
    unsafe {
        rep_index_builder.append(zipped_data.len() as u64);
    }
    let zipped_data = LanceBuffer::from(zipped_data);
    let rep_index = rep_index_builder.into_data();
    // An empty index means there was no repetition to track.
    let rep_index = if rep_index.is_empty() {
        None
    } else {
        Some(LanceBuffer::from(rep_index))
    };
    SerializedFullZip {
        values: zipped_data,
        repetition_index: rep_index,
    }
}
/// Zips variable-width values with their control words. Each visible, valid
/// item is written as a little-endian length prefix (same width as the
/// offsets) followed by its bytes. A repetition index of row start offsets
/// is always produced.
fn serialize_full_zip_variable(
    variable: VariableWidthBlock,
    mut repdef: ControlWordIterator,
    num_items: u64,
) -> SerializedFullZip {
    let bytes_per_offset = variable.bits_per_offset as usize / 8;
    assert_eq!(
        variable.bits_per_offset % 8,
        0,
        "Only byte-aligned offsets supported"
    );
    // Upper bound on output size: value bytes + control words + one length
    // prefix per value. Invalid items write no bytes, so the real size may
    // be smaller.
    let len = variable.data.len()
        + repdef.bytes_per_word() * num_items as usize
        + bytes_per_offset * variable.num_values as usize;
    let mut buf = Vec::with_capacity(len);
    let max_rep_index_val = len as u64;
    let mut rep_index_builder =
        BytepackedIntegerEncoder::with_capacity(num_items as usize + 1, max_rep_index_val);
    // The two arms are identical except for the offset width (u32 vs u64),
    // which also determines the width of the written length prefix.
    match bytes_per_offset {
        4 => {
            let offs = variable.offsets.borrow_to_typed_slice::<u32>();
            // Byte position where the current row starts in `buf`.
            let mut rep_offset = 0;
            let mut windows_iter = offs.as_ref().windows(2);
            while let Some(control) = repdef.append_next(&mut buf) {
                if control.is_new_row {
                    debug_assert!(rep_offset <= len);
                    unsafe { rep_index_builder.append(rep_offset as u64) };
                }
                if control.is_visible {
                    // Visible items consume an offset window even when the
                    // value is null; nothing is written for nulls.
                    let window = windows_iter.next().unwrap();
                    if control.is_valid_item {
                        buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                        buf.extend_from_slice(
                            &variable.data[window[0] as usize..window[1] as usize],
                        );
                    }
                }
                rep_offset = buf.len();
            }
        }
        8 => {
            let offs = variable.offsets.borrow_to_typed_slice::<u64>();
            // Byte position where the current row starts in `buf`.
            let mut rep_offset = 0;
            let mut windows_iter = offs.as_ref().windows(2);
            while let Some(control) = repdef.append_next(&mut buf) {
                if control.is_new_row {
                    debug_assert!(rep_offset <= len);
                    unsafe { rep_index_builder.append(rep_offset as u64) };
                }
                if control.is_visible {
                    let window = windows_iter.next().unwrap();
                    if control.is_valid_item {
                        buf.extend_from_slice(&(window[1] - window[0]).to_le_bytes());
                        buf.extend_from_slice(
                            &variable.data[window[0] as usize..window[1] as usize],
                        );
                    }
                }
                rep_offset = buf.len();
            }
        }
        _ => panic!("Unsupported offset size"),
    }
    // Nulls may have been skipped, so the buffer can be smaller than `len`.
    debug_assert!(buf.len() <= len);
    // Trailing entry marks the end of the final row.
    unsafe {
        rep_index_builder.append(buf.len() as u64);
    }
    let zipped_data = LanceBuffer::from(buf);
    let rep_index = rep_index_builder.into_data();
    debug_assert!(!rep_index.is_empty());
    let rep_index = Some(LanceBuffer::from(rep_index));
    SerializedFullZip {
        values: zipped_data,
        repetition_index: rep_index,
    }
}
/// Serializes compressed per-value data into the full-zip wire format,
/// dispatching on whether each value occupies a fixed or variable number of
/// bytes.
fn serialize_full_zip(
    compressed_data: PerValueDataBlock,
    repdef: ControlWordIterator,
    num_items: u64,
) -> SerializedFullZip {
    match compressed_data {
        PerValueDataBlock::Fixed(fixed_block) => {
            Self::serialize_full_zip_fixed(fixed_block, repdef, num_items)
        }
        PerValueDataBlock::Variable(variable_block) => {
            Self::serialize_full_zip_variable(variable_block, repdef, num_items)
        }
    }
}
/// Encodes a page using the full-zip layout, where each value is stored
/// inline with its rep/def control word instead of in mini-block chunks.
fn encode_full_zip(
    column_idx: u32,
    field: &Field,
    compression_strategy: &dyn CompressionStrategy,
    data: DataBlock,
    repdef: crate::repdef::SerializedRepDefs,
    row_number: u64,
    num_lists: u64,
) -> Result<EncodedPage> {
    // The widest rep/def level values determine how many bits each control
    // word needs for repetition and definition.
    let max_rep = repdef
        .repetition_levels
        .as_ref()
        .map_or(0, |r| r.iter().max().copied().unwrap_or(0));
    let max_def = repdef
        .definition_levels
        .as_ref()
        .map_or(0, |d| d.iter().max().copied().unwrap_or(0));
    // With repetition the level count is the item count (it can exceed the
    // number of values in the data block, which holds the visible values).
    let (num_items, num_visible_items) =
        if let Some(rep_levels) = repdef.repetition_levels.as_ref() {
            (rep_levels.len() as u64, data.num_values())
        } else {
            (data.num_values(), data.num_values())
        };
    // When no visibility cutoff is recorded, treat every level as visible.
    let max_visible_def = repdef.max_visible_level.unwrap_or(u16::MAX);
    let repdef_iter = build_control_word_iterator(
        repdef.repetition_levels.as_deref(),
        max_rep,
        repdef.definition_levels.as_deref(),
        max_def,
        max_visible_def,
        num_items as usize,
    );
    let bits_rep = repdef_iter.bits_rep();
    let bits_def = repdef_iter.bits_def();
    let compressor = compression_strategy.create_per_value(field, &data)?;
    let (compressed_data, value_encoding) = compressor.compress(data)?;
    // The layout description depends on whether compression produced
    // fixed-width or variable-width per-value data.
    let description = match &compressed_data {
        PerValueDataBlock::Fixed(fixed) => ProtobufUtils21::fixed_full_zip_layout(
            bits_rep,
            bits_def,
            fixed.bits_per_value as u32,
            value_encoding,
            &repdef.def_meaning,
            num_items as u32,
            num_visible_items as u32,
        ),
        PerValueDataBlock::Variable(variable) => ProtobufUtils21::variable_full_zip_layout(
            bits_rep,
            bits_def,
            variable.bits_per_offset as u32,
            value_encoding,
            &repdef.def_meaning,
            num_items as u32,
            num_visible_items as u32,
        ),
    };
    let zipped = Self::serialize_full_zip(compressed_data, repdef_iter, num_items);
    // The repetition index, if present, becomes the page's second buffer.
    let data = if let Some(repindex) = zipped.repetition_index {
        vec![zipped.values, repindex]
    } else {
        vec![zipped.values]
    };
    Ok(EncodedPage {
        num_rows: num_lists,
        column_idx,
        data,
        description: PageEncoding::Structural(description),
        row_number,
    })
}
/// Decides whether `data_block` is worth dictionary encoding and, if so,
/// returns the budget (maximum dictionary entries and maximum encoded size)
/// that the dictionary encoder must respect.
///
/// Returns `None` when the block's physical layout is unsupported, the block
/// is too small, the configured thresholds rule it out, or a sample of the
/// data looks nearly unique (a dictionary would not pay off).
///
/// # Panics
///
/// Panics if the configured dict-size-ratio is outside the range (0, 1].
fn should_dictionary_encode(
    data_block: &DataBlock,
    field: &Field,
    version: LanceFileVersion,
) -> Option<DictEncodingBudget> {
    const DEFAULT_SAMPLE_SIZE: usize = 4096;
    const DEFAULT_SAMPLE_UNIQUE_RATIO: f64 = 0.98;
    // Only 64/128-bit fixed-width values and 32/64-bit-offset variable-width
    // values are supported. (The old `% 8` check was unreachable given these
    // constraints and has been removed.)
    match data_block {
        DataBlock::FixedWidth(fixed) => {
            // 64-bit dictionary encoding requires file version >= 2.2.
            if fixed.bits_per_value == 64 && version < LanceFileVersion::V2_2 {
                return None;
            }
            if fixed.bits_per_value != 64 && fixed.bits_per_value != 128 {
                return None;
            }
        }
        DataBlock::VariableWidth(var) => {
            if var.bits_per_offset != 32 && var.bits_per_offset != 64 {
                return None;
            }
        }
        _ => return None,
    }
    // Tiny blocks are not worth the overhead of a dictionary.
    let too_small = env::var("LANCE_ENCODING_DICT_TOO_SMALL")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(100);
    if data_block.num_values() < too_small {
        return None;
    }
    let num_values = data_block.num_values();
    // Cardinality budget = num_values / divisor, capped at max_cardinality.
    // Both knobs can come from field metadata or environment variables.
    let divisor: u64 = field
        .metadata
        .get(DICT_DIVISOR_META_KEY)
        .and_then(|val| val.parse().ok())
        .or_else(|| {
            env::var("LANCE_ENCODING_DICT_DIVISOR")
                .ok()
                .and_then(|val| val.parse().ok())
        })
        .unwrap_or(DEFAULT_DICT_DIVISOR);
    let max_cardinality: u64 = env::var("LANCE_ENCODING_DICT_MAX_CARDINALITY")
        .ok()
        .and_then(|val| val.parse().ok())
        .unwrap_or(DEFAULT_DICT_MAX_CARDINALITY);
    // `divisor.max(1)` guards against a configured divisor of zero, so plain
    // division is safe (the previous `checked_div` could never fail).
    let threshold_cardinality = (num_values / divisor.max(1)).min(max_cardinality);
    if threshold_cardinality == 0 {
        return None;
    }
    let threshold_ratio = field
        .metadata
        .get(DICT_SIZE_RATIO_META_KEY)
        .and_then(|val| val.parse::<f64>().ok())
        .or_else(|| {
            env::var("LANCE_ENCODING_DICT_SIZE_RATIO")
                .ok()
                .and_then(|val| val.parse().ok())
        })
        .unwrap_or(DEFAULT_DICT_SIZE_RATIO);
    if threshold_ratio <= 0.0 || threshold_ratio > 1.0 {
        panic!(
            "Invalid parameter: dict-size-ratio is {} which is not in the range (0, 1].",
            threshold_ratio
        );
    }
    let data_size = data_block.data_size();
    if data_size == 0 {
        return None;
    }
    // The dictionary + indices output must stay under this size for the
    // encoding to count as a win.
    let max_encoded_size = (data_size as f64 * threshold_ratio) as u64;
    let max_encoded_size = usize::try_from(max_encoded_size).ok()?;
    // If a sample of the data looks nearly unique, a dictionary would be
    // almost as large as the data itself; skip it.
    if Self::sample_is_near_unique(
        data_block,
        DEFAULT_SAMPLE_SIZE,
        DEFAULT_SAMPLE_UNIQUE_RATIO,
    )? {
        return None;
    }
    // Clamp so the entry count fits in both u32 and i32 ranges.
    let max_dict_entries = u32::try_from(threshold_cardinality.min(i32::MAX as u64)).ok()?;
    Some(DictEncodingBudget {
        max_dict_entries,
        max_encoded_size,
    })
}
/// Samples up to `max_samples` evenly-spaced values from `data_block` and
/// reports whether the sample's unique ratio meets `unique_ratio_threshold`.
///
/// Returns `Some(true)` only when enough samples were taken for the estimate
/// to be meaningful; `Some(false)` for small blocks, empty blocks, or
/// unsupported layouts; `None` on invalid thresholds or malformed data.
///
/// The duplicated verdict expression (previously copy-pasted in three arms
/// with a magic `1024`) is now a named constant plus a shared closure.
fn sample_is_near_unique(
    data_block: &DataBlock,
    max_samples: usize,
    unique_ratio_threshold: f64,
) -> Option<bool> {
    use std::collections::HashSet;
    // Below this many samples the uniqueness estimate is too noisy to act
    // on, so "near unique" is never reported for small blocks.
    const MIN_CONFIDENT_SAMPLES: usize = 1024;
    if unique_ratio_threshold <= 0.0 || unique_ratio_threshold > 1.0 {
        return None;
    }
    let num_values = usize::try_from(data_block.num_values()).ok()?;
    if num_values == 0 {
        return Some(false);
    }
    let sample_count = num_values.min(max_samples).max(1);
    // Stride so the samples are spread evenly across the block.
    let step = (num_values / sample_count).max(1);
    // Shared verdict: near-unique only when we have enough samples AND the
    // observed unique ratio clears the threshold.
    let verdict = |unique_len: usize| {
        let ratio = unique_len as f64 / sample_count as f64;
        Some(sample_count >= MIN_CONFIDENT_SAMPLES && ratio >= unique_ratio_threshold)
    };
    match data_block {
        DataBlock::FixedWidth(fixed) => match fixed.bits_per_value {
            64 => {
                let values = fixed.data.borrow_to_typed_slice::<u64>();
                let values = values.as_ref();
                let mut unique: HashSet<u64> =
                    HashSet::with_capacity(sample_count.min(MIN_CONFIDENT_SAMPLES));
                for idx in (0..num_values).step_by(step).take(sample_count) {
                    unique.insert(values.get(idx).copied()?);
                }
                verdict(unique.len())
            }
            128 => {
                let values = fixed.data.borrow_to_typed_slice::<u128>();
                let values = values.as_ref();
                let mut unique: HashSet<u128> =
                    HashSet::with_capacity(sample_count.min(MIN_CONFIDENT_SAMPLES));
                for idx in (0..num_values).step_by(step).take(sample_count) {
                    unique.insert(values.get(idx).copied()?);
                }
                verdict(unique.len())
            }
            _ => Some(false),
        },
        DataBlock::VariableWidth(var) => {
            use xxhash_rust::xxh3::xxh3_64;
            // Variable-width values are compared by 64-bit hash of their
            // bytes rather than by the bytes themselves.
            let mut unique: HashSet<u64> =
                HashSet::with_capacity(sample_count.min(MIN_CONFIDENT_SAMPLES));
            match var.bits_per_offset {
                32 => {
                    let offsets_ref = var.offsets.borrow_to_typed_slice::<u32>();
                    let offsets: &[u32] = offsets_ref.as_ref();
                    for i in (0..num_values).step_by(step).take(sample_count) {
                        let start = usize::try_from(*offsets.get(i)?).ok()?;
                        let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
                        // Bail on malformed offsets rather than panic.
                        if start > end || end > var.data.len() {
                            return None;
                        }
                        unique.insert(xxh3_64(&var.data[start..end]));
                    }
                }
                64 => {
                    let offsets_ref = var.offsets.borrow_to_typed_slice::<u64>();
                    let offsets: &[u64] = offsets_ref.as_ref();
                    for i in (0..num_values).step_by(step).take(sample_count) {
                        let start = usize::try_from(*offsets.get(i)?).ok()?;
                        let end = usize::try_from(*offsets.get(i + 1)?).ok()?;
                        // Bail on malformed offsets rather than panic.
                        if start > end || end > var.data.len() {
                            return None;
                        }
                        unique.insert(xxh3_64(&var.data[start..end]));
                    }
                }
                _ => return Some(false),
            }
            verdict(unique.len())
        }
        _ => Some(false),
    }
}
/// Encodes the accumulated arrays into a page on a CPU worker task.
///
/// Chooses among the available page layouts (simple/complex all-null,
/// constant, mini-block with or without dictionary, full-zip) based on the
/// data's validity, rep/def complexity, block type, and file version.
fn do_flush(
    &mut self,
    arrays: Vec<ArrayRef>,
    repdefs: Vec<RepDefBuilder>,
    row_number: u64,
    num_rows: u64,
) -> Result<Vec<EncodeTask>> {
    // Copy out everything the spawned closure needs; the encode runs on a
    // CPU pool and must own its inputs.
    let column_idx = self.column_index;
    let compression_strategy = self.compression_strategy.clone();
    let field = self.field.clone();
    let encoding_metadata = self.encoding_metadata.clone();
    let support_large_chunk = self.support_large_chunk;
    let version = self.version;
    let task = spawn_cpu(move || {
        let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
        let is_simple_validity = repdefs.iter().all(|rd| rd.is_simple_validity());
        let has_repdef_info = repdefs.iter().any(|rd| !rd.is_empty());
        let repdef = RepDefBuilder::serialize(repdefs);
        // No leaf values at all: the complex-null layout still records the
        // rep/def structure for the rows.
        if num_values == 0 {
            log::debug!("Encoding column {} with {} items ({} rows) using complex-null layout", column_idx, num_values, num_rows);
            return Self::encode_complex_all_null(
                column_idx,
                repdef,
                row_number,
                num_rows,
                version,
                compression_strategy.as_ref(),
            );
        }
        let leaf_validity = Self::leaf_validity(&repdef, num_values as usize)?;
        let all_null = leaf_validity
            .as_ref()
            .map(|validity| validity.count_set_bits() == 0)
            .unwrap_or(false);
        // Every leaf is null: use the cheap simple-null layout when validity
        // is the only rep/def information; otherwise the complex-null layout
        // preserves the full rep/def.
        if all_null {
            return if is_simple_validity {
                log::debug!(
                    "Encoding column {} with {} items ({} rows) using simple-null layout",
                    column_idx,
                    num_values,
                    num_rows
                );
                Self::encode_simple_all_null(column_idx, num_values, row_number)
            } else {
                log::debug!(
                    "Encoding column {} with {} items ({} rows) using complex-null layout",
                    column_idx,
                    num_values,
                    num_rows
                );
                Self::encode_complex_all_null(
                    column_idx,
                    repdef,
                    row_number,
                    num_rows,
                    version,
                    compression_strategy.as_ref(),
                )
            };
        }
        // Empty structs carry no data; they are only encodable when there is
        // no rep/def information to preserve.
        if let DataType::Struct(fields) = &field.data_type()
            && fields.is_empty()
        {
            if has_repdef_info {
                return Err(Error::invalid_input_source(format!("Empty structs with rep/def information are not yet supported. The field {} is an empty struct that either has nulls or is in a list.", field.name).into()));
            }
            return Self::encode_simple_all_null(column_idx, num_values, row_number);
        }
        let data_block = DataBlock::from_arrays(&arrays, num_values);
        // From file version 2.2 on, a column whose valid values are all
        // identical can be stored as a single constant.
        if version.resolve() >= LanceFileVersion::V2_2
            && let Some(scalar) = Self::find_constant_scalar(&arrays, leaf_validity.as_ref())?
        {
            log::debug!(
                "Encoding column {} with {} items ({} rows) using constant layout",
                column_idx,
                num_values,
                num_rows
            );
            return constant::encode_constant_page(
                column_idx,
                scalar,
                repdef,
                row_number,
                num_rows,
            );
        }
        // Packed structs with a variable-width child must use full-zip.
        let requires_full_zip_packed_struct =
            if let DataBlock::Struct(ref struct_data_block) = data_block {
                struct_data_block.has_variable_width_child()
            } else {
                false
            };
        if requires_full_zip_packed_struct {
            log::debug!(
                "Encoding column {} with {} items using full-zip packed struct layout",
                column_idx,
                num_values
            );
            return Self::encode_full_zip(
                column_idx,
                &field,
                compression_strategy.as_ref(),
                data_block,
                repdef,
                row_number,
                num_rows,
            );
        }
        // Sparse rep/def rules out the mini-block layout entirely.
        let too_sparse = Self::repdef_too_sparse_for_miniblock(&repdef, num_values);
        if !too_sparse {
            // Data that arrived already dictionary-encoded keeps its
            // dictionary and goes straight to the mini-block layout.
            if let DataBlock::Dictionary(dict) = data_block {
                log::debug!("Encoding column {} with {} items using dictionary encoding (already dictionary encoded)", column_idx, num_values);
                let (mut indices_data_block, dictionary_data_block) = dict.into_parts();
                indices_data_block.compute_stat();
                return Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdef,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                    support_large_chunk,
                );
            }
        } else {
            log::debug!(
                "Encoding column {} with {} items using full-zip layout \
                 (rep/def too sparse for mini-block)",
                column_idx,
                num_values
            );
        }
        {
            // Opportunistic dictionary encoding when the heuristics say it
            // is likely to pay off (never for sparse rep/def).
            let dict_result = if too_sparse {
                None
            } else {
                Self::should_dictionary_encode(&data_block, &field, version)
                    .and_then(|budget| {
                        log::debug!(
                            "Encoding column {} with {} items using dictionary encoding (mini-block layout)",
                            column_idx,
                            num_values
                        );
                        dict::dictionary_encode(
                            &data_block,
                            budget.max_dict_entries,
                            budget.max_encoded_size,
                        )
                    })
            };
            if let Some((indices_data_block, dictionary_data_block)) = dict_result {
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    indices_data_block,
                    repdef,
                    row_number,
                    Some(dictionary_data_block),
                    num_rows,
                    support_large_chunk,
                )
            } else if !too_sparse && Self::prefers_miniblock(&data_block, encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using mini-block layout",
                    column_idx,
                    num_values
                );
                Self::encode_miniblock(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdef,
                    row_number,
                    None,
                    num_rows,
                    support_large_chunk,
                )
            } else if too_sparse || Self::prefers_fullzip(encoding_metadata.as_ref()) {
                log::debug!(
                    "Encoding column {} with {} items using full-zip layout",
                    column_idx,
                    num_values
                );
                Self::encode_full_zip(
                    column_idx,
                    &field,
                    compression_strategy.as_ref(),
                    data_block,
                    repdef,
                    row_number,
                    num_rows,
                )
            } else {
                // No layout selected: the structural-encoding metadata key
                // must hold an unrecognized value.
                Err(Error::invalid_input_source(format!("Cannot determine structural encoding for field {}. This typically indicates an invalid value of the field metadata key {}", field.name, STRUCTURAL_ENCODING_META_KEY).into()))
            }
        }
    })
    .boxed();
    Ok(vec![task])
}
/// Moves an array's null buffer into the repdef builder, returning the array
/// with its validity stripped (or unchanged when it has no nulls).
fn extract_validity_buf(
    array: Arc<dyn Array>,
    repdef: &mut RepDefBuilder,
    keep_original_array: bool,
) -> Result<Arc<dyn Array>> {
    if let Some(validity) = array.nulls() {
        // Hand the validity bitmap to the repdef builder; deep-copy it
        // unless the caller keeps the original array (and its buffers)
        // alive.
        let bitmap = if keep_original_array {
            validity.clone()
        } else {
            deep_copy_nulls(Some(validity)).unwrap()
        };
        repdef.add_validity_bitmap(bitmap);
        // Rebuild the array without its null buffer; validity is now
        // tracked exclusively by the repdef.
        let stripped = array.to_data().into_builder().nulls(None).build()?;
        Ok(make_array(stripped))
    } else {
        // No nulls: just record the length as fully valid.
        repdef.add_no_null(array.len());
        Ok(array)
    }
}
/// Extracts validity from `array` into the repdef builder, handling the
/// special cases of null-typed and dictionary arrays.
fn extract_validity(
    mut array: Arc<dyn Array>,
    repdef: &mut RepDefBuilder,
    keep_original_array: bool,
) -> Result<Arc<dyn Array>> {
    // Null-typed arrays carry no values: record an all-unset bitmap and
    // return the array unchanged.
    if matches!(array.data_type(), DataType::Null) {
        repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
        return Ok(array);
    }
    // Dictionary arrays have their null representation normalized first
    // (see the dict module) before the standard extraction.
    if matches!(array.data_type(), DataType::Dictionary(_, _)) {
        array = dict::normalize_dict_nulls(array)?;
    }
    Self::extract_validity_buf(array, repdef, keep_original_array)
}
}
impl FieldEncoder for PrimitiveStructuralEncoder {
    /// Queues `array`, emitting an encode task once the accumulation queue
    /// decides enough data has been gathered.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        _external_buffers: &mut OutOfLineBuffers,
        mut repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // Strip validity into the repdef builder before queueing the array.
        let array = Self::extract_validity(array, &mut repdef, self.keep_original_array)?;
        self.accumulated_repdefs.push(repdef);
        match self.accumulation_queue.insert(array, row_number, num_rows) {
            Some((arrays, row_number, num_rows)) => {
                let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
                self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
            }
            None => Ok(vec![]),
        }
    }
    /// Encodes whatever remains in the accumulation queue.
    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        match self.accumulation_queue.flush() {
            Some((arrays, row_number, num_rows)) => {
                let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
                self.do_flush(arrays, accumulated_repdefs, row_number, num_rows)
            }
            None => Ok(vec![]),
        }
    }
    /// A primitive encoder always produces exactly one column.
    fn num_columns(&self) -> u32 {
        1
    }
    /// Produces only the default column description.
    fn finish(
        &mut self,
        _external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
    }
}
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
use super::{
ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
};
use crate::buffer::LanceBuffer;
use crate::compression::DefaultDecompressionStrategy;
use crate::constants::{
COMPRESSION_LEVEL_META_KEY, COMPRESSION_META_KEY, DICT_VALUES_COMPRESSION_LEVEL_META_KEY,
DICT_VALUES_COMPRESSION_META_KEY, STRUCTURAL_ENCODING_META_KEY,
STRUCTURAL_ENCODING_MINIBLOCK,
};
use crate::data::BlockInfo;
use crate::decoder::PageEncoding;
use crate::encodings::logical::primitive::{
ChunkDrainInstructions, PrimitiveStructuralEncoder,
};
use crate::format::ProtobufUtils21;
use crate::format::pb21;
use crate::format::pb21::compressive_encoding::Compression;
use crate::testing::{TestCases, check_round_trip_encoding_of_data};
use crate::version::LanceFileVersion;
use arrow_array::{ArrayRef, Int8Array, StringArray};
use arrow_schema::DataType;
use std::collections::HashMap;
use std::{collections::VecDeque, sync::Arc};
#[test]
fn test_is_narrow() {
    // Small fixed-width data counts as narrow.
    let block = DataBlock::from_array(Int8Array::from(vec![1, 2, 3]));
    assert!(PrimitiveStructuralEncoder::is_narrow(&block));
    // Short strings count as narrow as well.
    let block = DataBlock::from_array(StringArray::from(vec![Some("hello"), Some("world")]));
    assert!(PrimitiveStructuralEncoder::is_narrow(&block));
    // A very long value in the block is expected to not be narrow.
    let block = DataBlock::from_array(StringArray::from(vec![
        Some("hello world".repeat(100)),
        Some("world".to_string()),
    ]));
    assert!(!PrimitiveStructuralEncoder::is_narrow(&block));
}
#[test]
fn test_map_range() {
    // Each scenario below maps user row ranges to (item range, level range)
    // under different rep/def level patterns and preamble actions.
    //
    // Scenario 1: rep marks 4 row starts, one level (index 5) is invisible
    // (def=1 > max_visible_def=0), so 9 levels cover 8 items.
    let rep = Some(vec![1, 0, 0, 1, 0, 1, 1, 0, 0]);
    let def = Some(vec![0, 0, 0, 0, 0, 1, 0, 0, 0]);
    let max_visible_def = 0;
    let total_items = 8;
    let max_rep = 1;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(1..2, 3..5, 3..5);
    // Row 2 is entirely invisible: empty item range, one level consumed.
    check(2..3, 5..5, 5..6);
    check(3..4, 5..8, 6..9);
    check(0..2, 0..5, 0..5);
    check(1..3, 3..5, 3..6);
    check(2..4, 5..8, 5..9);
    check(0..3, 0..5, 0..6);
    check(1..4, 3..8, 3..9);
    check(0..4, 0..8, 0..9);
    // Scenario 2: first row starts with an invisible item.
    let rep = Some(vec![1, 1, 0, 1]);
    let def = Some(vec![1, 0, 0, 0]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..0, 0..1);
    check(1..2, 0..2, 1..3);
    check(2..3, 2..3, 3..4);
    check(0..2, 0..2, 0..3);
    check(1..3, 0..3, 1..4);
    check(0..3, 0..3, 0..4);
    // Scenario 3: the invisible item is in the last row instead.
    let rep = Some(vec![1, 1, 0, 1]);
    let def = Some(vec![0, 0, 0, 1]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..1);
    check(1..2, 1..3, 1..3);
    check(2..3, 3..3, 3..4);
    check(0..2, 0..3, 0..3);
    check(1..3, 1..3, 1..4);
    check(0..3, 0..3, 0..4);
    // Scenario 4: repetition only (no def levels) — all items visible.
    let rep = Some(vec![1, 0, 1, 0, 1, 0]);
    let def: Option<&[u16]> = None;
    let max_visible_def = 0;
    let total_items = 6;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..2, 0..2);
    check(1..2, 2..4, 2..4);
    check(2..3, 4..6, 4..6);
    check(0..2, 0..4, 0..4);
    check(1..3, 2..6, 2..6);
    check(0..3, 0..6, 0..6);
    // Scenario 5: definition only (no rep levels) — one item per row, every
    // level within max_visible_def is visible.
    let rep: Option<&[u16]> = None;
    let def = Some(vec![0, 0, 1, 0]);
    let max_visible_def = 1;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..1);
    check(1..2, 1..2, 1..2);
    check(2..3, 2..3, 2..3);
    check(0..2, 0..2, 0..2);
    check(1..3, 1..3, 1..3);
    check(0..3, 0..3, 0..3);
    // Scenario 6: chunk starts mid-row (preamble), with an invisible item at
    // the end; first with the preamble taken, then skipped.
    let rep = Some(vec![0, 1, 0, 1]);
    let def = Some(vec![0, 0, 0, 1]);
    let max_visible_def = 0;
    let total_items = 3;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(0..2, 0..3, 0..4);
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..3, 1..3);
    check(1..2, 3..3, 3..4);
    check(0..2, 1..3, 1..4);
    // Scenario 7: preamble plus an invisible item in the middle.
    let rep = Some(vec![0, 1, 1, 0]);
    let def = Some(vec![0, 1, 0, 0]);
    let max_visible_def = 0;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..1, 0..2);
    check(0..2, 0..3, 0..4);
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..1, 1..2);
    check(1..2, 1..3, 2..4);
    check(0..2, 1..3, 1..4);
    // Scenario 8: preamble with repetition only (no def levels).
    let rep = Some(vec![0, 1, 0, 1]);
    let def: Option<Vec<u16>> = None;
    let max_visible_def = 0;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 0..3, 0..3);
    check(0..2, 0..4, 0..4);
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..1, 1..3, 1..3);
    check(1..2, 3..4, 3..4);
    check(0..2, 1..4, 1..4);
    // Scenario 9: two levels of repetition (max_rep = 2), nested lists.
    let rep = Some(vec![2, 1, 2, 0, 1, 2]);
    let def = Some(vec![0, 1, 2, 0, 0, 0]);
    let max_rep = 2;
    let max_visible_def = 0;
    let total_items = 4;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Absent,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..3, 0..4, 0..6);
    check(0..1, 0..1, 0..2);
    check(1..2, 1..3, 2..5);
    check(2..3, 3..4, 5..6);
    // Scenario 10: taking a zero-length range still consumes the preamble.
    let rep = Some(vec![0, 0, 1, 0, 1, 1]);
    let def = Some(vec![0, 1, 0, 0, 0, 0]);
    let max_rep = 1;
    let max_visible_def = 0;
    let total_items = 5;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Take,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(0..0, 0..1, 0..2);
    check(0..1, 0..3, 0..4);
    check(0..2, 0..4, 0..5);
    // Scenario 11: skipping the preamble and reading a middle row in a chunk
    // with several invisible items.
    let rep = Some(vec![0, 1, 0, 1, 0, 1, 0, 1]);
    let def = Some(vec![1, 0, 1, 1, 0, 0, 0, 0]);
    let max_rep = 1;
    let max_visible_def = 0;
    let total_items = 5;
    let check = |range, expected_item_range, expected_level_range| {
        let (item_range, level_range) = DecodeMiniBlockTask::map_range(
            range,
            rep.as_ref(),
            def.as_ref(),
            max_rep,
            max_visible_def,
            total_items,
            PreambleAction::Skip,
        );
        assert_eq!(item_range, expected_item_range);
        assert_eq!(level_range, expected_level_range);
    };
    check(2..3, 2..4, 5..7);
}
#[test]
fn test_slice_batch_data_and_rebase_offsets_u32() {
    // The u32 offsets reference a window in the middle of the buffer; the
    // slice should keep only that window and rebase the offsets to zero.
    let data = LanceBuffer::copy_slice(b"0123456789abcdefghij");
    let offsets = LanceBuffer::reinterpret_vec(vec![6_u32, 8_u32, 8_u32, 12_u32]);
    let (sliced, rebased) =
        VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 32)
            .unwrap();
    assert_eq!(sliced.as_ref(), b"6789ab");
    let rebased = rebased.borrow_to_typed_slice::<u32>();
    assert_eq!(rebased.as_ref(), &[0, 2, 2, 6]);
}
#[test]
fn test_slice_batch_data_and_rebase_offsets_u64() {
    // Same as the u32 test but with 64-bit offsets.
    let data = LanceBuffer::copy_slice(b"abcdefghijklmnopqrstuvwxyz");
    let offsets = LanceBuffer::reinterpret_vec(vec![10_u64, 12_u64, 16_u64, 20_u64]);
    let (sliced, rebased) =
        VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&data, &offsets, 64)
            .unwrap();
    assert_eq!(sliced.as_ref(), b"klmnopqrst");
    let rebased = rebased.borrow_to_typed_slice::<u64>();
    assert_eq!(rebased.as_ref(), &[0, 2, 6, 10]);
}
#[test]
fn test_slice_batch_data_and_rebase_offsets_rejects_invalid_offsets() {
    // A decreasing offset pair (end < start) must produce an error rather
    // than silently underflowing during rebasing.
    let payload = LanceBuffer::copy_slice(b"abcd");
    let bad_offsets = LanceBuffer::reinterpret_vec(vec![3_u32, 2_u32]);
    let err =
        VariableFullZipDecoder::slice_batch_data_and_rebase_offsets(&payload, &bad_offsets, 32)
            .expect_err("offset end before start should error");
    assert!(err.to_string().contains("less than base"));
}
#[test]
fn test_schedule_instructions() {
// Repetition index decoded from 8 little-endian u64 words, two words per
// entry (second argument = 2), describing four chunks that together span
// 14 user rows. NOTE(review): exact meaning of each word pair is defined by
// MiniBlockRepIndex::decode_from_bytes — confirm there.
let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
// Helper: schedule the given user row ranges against the index and compare
// the produced per-chunk instructions with the expectation.
let check = |user_ranges, expected_instructions| {
let instructions =
ChunkInstructions::schedule_instructions(&repetition_index, user_ranges);
assert_eq!(instructions, expected_instructions);
};
// Taking every row visits all four chunks. A row that straddles a chunk
// boundary is taken as a trailer of one chunk plus the preamble of the next.
let expected_take_all = vec![
ChunkInstructions {
chunk_idx: 0,
preamble: PreambleAction::Absent,
rows_to_skip: 0,
rows_to_take: 6,
take_trailer: true,
},
ChunkInstructions {
chunk_idx: 1,
preamble: PreambleAction::Take,
rows_to_skip: 0,
rows_to_take: 2,
take_trailer: false,
},
ChunkInstructions {
chunk_idx: 2,
preamble: PreambleAction::Absent,
rows_to_skip: 0,
rows_to_take: 5,
take_trailer: true,
},
ChunkInstructions {
chunk_idx: 3,
preamble: PreambleAction::Take,
rows_to_skip: 0,
rows_to_take: 1,
take_trailer: false,
},
];
// One contiguous range covering everything...
check(&[0..14], expected_take_all.clone());
// ...must schedule identically to fourteen adjacent single-row ranges
// (adjacent ranges are coalesced).
check(
&[
0..1,
1..2,
2..3,
3..4,
4..5,
5..6,
6..7,
7..8,
8..9,
9..10,
10..11,
11..12,
12..13,
13..14,
],
expected_take_all,
);
// Two disjoint ranges inside the same chunk yield two instructions for
// chunk 0 with different skip counts.
check(
&[0..1, 3..4],
vec![
ChunkInstructions {
chunk_idx: 0,
preamble: PreambleAction::Absent,
rows_to_skip: 0,
rows_to_take: 1,
take_trailer: false,
},
ChunkInstructions {
chunk_idx: 0,
preamble: PreambleAction::Absent,
rows_to_skip: 3,
rows_to_take: 1,
take_trailer: false,
},
],
);
// Row 5 spills over the chunk 0/1 boundary: take chunk 0's trailer and
// then chunk 1's preamble (with zero additional rows).
check(
&[5..6],
vec![
ChunkInstructions {
chunk_idx: 0,
preamble: PreambleAction::Absent,
rows_to_skip: 5,
rows_to_take: 1,
take_trailer: true,
},
ChunkInstructions {
chunk_idx: 1,
preamble: PreambleAction::Take,
rows_to_skip: 0,
rows_to_take: 0,
take_trailer: false,
},
],
);
// A range starting mid-chunk-1 must skip chunk 1's preamble (it belongs to
// an earlier row) and continue into chunk 2.
check(
&[7..10],
vec![
ChunkInstructions {
chunk_idx: 1,
preamble: PreambleAction::Skip,
rows_to_skip: 1,
rows_to_take: 1,
take_trailer: false,
},
ChunkInstructions {
chunk_idx: 2,
preamble: PreambleAction::Absent,
rows_to_skip: 0,
rows_to_take: 2,
take_trailer: false,
},
],
);
}
#[test]
fn test_drain_instructions() {
// Repeatedly drains rows from the front of the scheduled instruction queue,
// popping instructions once fully consumed, until `rows_desired` rows (plus
// any pending preamble) have been drained.
fn drain_from_instructions(
instructions: &mut VecDeque<ChunkInstructions>,
mut rows_desired: u64,
need_preamble: &mut bool,
skip_in_chunk: &mut u64,
) -> Vec<ChunkDrainInstructions> {
let mut drain_instructions = Vec::with_capacity(instructions.len());
while rows_desired > 0 || *need_preamble {
let (next_instructions, consumed_chunk) = instructions
.front()
.unwrap()
.drain_from_instruction(&mut rows_desired, need_preamble, skip_in_chunk);
if consumed_chunk {
instructions.pop_front();
}
drain_instructions.push(next_instructions);
}
drain_instructions
}
// Same four-chunk repetition index as test_schedule_instructions.
let rep_data: Vec<u64> = vec![5, 2, 3, 0, 4, 7, 2, 0];
let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
// Schedule 10 rows (1..7 and 10..14), then drain them as three batches of
// 4, 4, and 2 rows, checking the carried state after each batch.
let user_ranges = vec![1..7, 10..14];
let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
let mut to_drain = VecDeque::from(scheduled.clone());
let mut need_preamble = false;
let mut skip_in_chunk = 0;
// Batch 1: four rows all from the first scheduled instruction.
let next_batch =
drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 4);
assert_eq!(
next_batch,
vec![ChunkDrainInstructions {
chunk_instructions: scheduled[0].clone(),
rows_to_take: 4,
rows_to_skip: 0,
preamble_action: PreambleAction::Absent,
}]
);
// Batch 2: finishes instruction 0 (1 row), consumes instruction 1 (with its
// preamble), and starts instruction 2.
let next_batch =
drain_from_instructions(&mut to_drain, 4, &mut need_preamble, &mut skip_in_chunk);
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 2);
assert_eq!(
next_batch,
vec![
ChunkDrainInstructions {
chunk_instructions: scheduled[0].clone(),
rows_to_take: 1,
rows_to_skip: 4,
preamble_action: PreambleAction::Absent,
},
ChunkDrainInstructions {
chunk_instructions: scheduled[1].clone(),
rows_to_take: 1,
rows_to_skip: 0,
preamble_action: PreambleAction::Take,
},
ChunkDrainInstructions {
chunk_instructions: scheduled[2].clone(),
rows_to_take: 2,
rows_to_skip: 0,
preamble_action: PreambleAction::Absent,
}
]
);
// Batch 3: the remaining two rows span instructions 2 and 3.
let next_batch =
drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 0);
assert_eq!(
next_batch,
vec![
ChunkDrainInstructions {
chunk_instructions: scheduled[2].clone(),
rows_to_take: 1,
rows_to_skip: 2,
preamble_action: PreambleAction::Absent,
},
ChunkDrainInstructions {
chunk_instructions: scheduled[3].clone(),
rows_to_take: 1,
rows_to_skip: 0,
preamble_action: PreambleAction::Take,
},
]
);
// Second scenario: a chunk with a long (20-row) middle entry, to exercise
// draining that stops partway through a chunk and must later skip its
// preamble.
let rep_data: Vec<u64> = vec![5, 2, 3, 3, 20, 0];
let rep_bytes: Vec<u8> = rep_data.iter().flat_map(|v| v.to_le_bytes()).collect();
let repetition_index = MiniBlockRepIndex::decode_from_bytes(&rep_bytes, 2);
let user_ranges = vec![0..28];
let scheduled = ChunkInstructions::schedule_instructions(&repetition_index, &user_ranges);
let mut to_drain = VecDeque::from(scheduled.clone());
let mut need_preamble = false;
let mut skip_in_chunk = 0;
// Seven rows: six from chunk 0 and one (plus preamble) from chunk 1.
let next_batch =
drain_from_instructions(&mut to_drain, 7, &mut need_preamble, &mut skip_in_chunk);
assert_eq!(
next_batch,
vec![
ChunkDrainInstructions {
chunk_instructions: scheduled[0].clone(),
rows_to_take: 6,
rows_to_skip: 0,
preamble_action: PreambleAction::Absent,
},
ChunkDrainInstructions {
chunk_instructions: scheduled[1].clone(),
rows_to_take: 1,
rows_to_skip: 0,
preamble_action: PreambleAction::Take,
},
]
);
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 1);
// Next two rows resume mid-chunk-1: the already-consumed preamble is now
// skipped, and chunk 2 contributes only its preamble (zero rows).
let next_batch =
drain_from_instructions(&mut to_drain, 2, &mut need_preamble, &mut skip_in_chunk);
assert_eq!(
next_batch,
vec![
ChunkDrainInstructions {
chunk_instructions: scheduled[1].clone(),
rows_to_take: 2,
rows_to_skip: 1,
preamble_action: PreambleAction::Skip,
},
ChunkDrainInstructions {
chunk_instructions: scheduled[2].clone(),
rows_to_take: 0,
rows_to_skip: 0,
preamble_action: PreambleAction::Take,
},
]
);
assert!(!need_preamble);
assert_eq!(skip_in_chunk, 0);
}
#[tokio::test]
async fn test_fullzip_initialize_is_lazy() {
use futures::{FutureExt, future::BoxFuture};
use std::ops::Range;
use std::sync::Mutex;
// I/O stub that serves slices of an in-memory buffer and records every
// requested range so the test can assert on what I/O was issued.
#[derive(Debug, Clone)]
struct RecordingScheduler {
data: bytes::Bytes,
requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
}
impl RecordingScheduler {
fn new(data: bytes::Bytes) -> Self {
Self {
data,
requests: Arc::new(Mutex::new(Vec::new())),
}
}
// Snapshot of all range batches submitted so far.
fn requests(&self) -> Vec<Vec<Range<u64>>> {
self.requests.lock().unwrap().clone()
}
}
impl crate::EncodingsIo for RecordingScheduler {
fn submit_request(
&self,
ranges: Vec<Range<u64>>,
_priority: u64,
) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
self.requests.lock().unwrap().push(ranges.clone());
let data = ranges
.into_iter()
.map(|range| self.data.slice(range.start as usize..range.end as usize))
.collect::<Vec<_>>();
std::future::ready(Ok(data)).boxed()
}
}
// Decompressor stub: only `bits_per_value` matters here; this test must
// never actually decompress anything.
#[derive(Debug)]
struct TestFixedDecompressor;
impl FixedPerValueDecompressor for TestFixedDecompressor {
fn decompress(
&self,
_data: FixedWidthDataBlock,
_num_rows: u64,
) -> crate::Result<DataBlock> {
unimplemented!("Test decompressor")
}
fn bits_per_value(&self) -> u64 {
32
}
}
let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(vec![
0;
16 * 1024
])));
// Scheduler with a repetition index present but caching disabled.
let mut scheduler = FullZipScheduler {
data_buf_position: 0,
data_buf_size: 4096,
rep_index: Some(FullZipRepIndexDetails {
buf_position: 1000,
bytes_per_value: 4,
}),
priority: 0,
rows_in_page: 100,
bits_per_offset: 32,
details: Arc::new(FullZipDecodeDetails {
value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
max_rep: 0,
max_visible_def: 0,
}),
cached_state: None,
enable_cache: false,
};
let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
// With caching disabled, initialize must return the no-op marker...
assert!(
cached_data
.as_arc_any()
.downcast_ref::<super::NoCachedPageData>()
.is_some(),
"FullZip initialize should not eagerly load repetition index data"
);
// ...store nothing on the scheduler, and touch the I/O layer not at all.
assert!(scheduler.cached_state.is_none());
assert!(
io.requests().is_empty(),
"FullZip initialize should not issue any I/O"
);
}
#[tokio::test]
async fn test_fullzip_read_source_slices_prefetched_page() {
    // A prefetched page should serve sub-ranges by slicing its in-memory
    // buffer relative to the page's base offset (no further I/O involved).
    let base = 200_u64;
    let page = FullZipReadSource::PrefetchedPage {
        base_offset: base,
        data: LanceBuffer::copy_slice(&[0, 1, 2, 3, 4, 5, 6, 7]),
    };
    let requested = vec![base..(base + 3), (base + 4)..(base + 8)];
    let mut fetched = page.fetch(&requested, 0).await.unwrap();
    assert_eq!(fetched.pop_front().unwrap().as_ref(), &[0, 1, 2]);
    assert_eq!(fetched.pop_front().unwrap().as_ref(), &[4, 5, 6, 7]);
}
#[tokio::test]
async fn test_fullzip_initialize_caches_rep_index_when_enabled() {
use futures::{FutureExt, future::BoxFuture};
use std::ops::Range;
use std::sync::Mutex;
// I/O stub that serves slices of an in-memory buffer and records every
// requested range so the test can assert on what I/O was issued.
#[derive(Debug, Clone)]
struct RecordingScheduler {
data: bytes::Bytes,
requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
}
impl RecordingScheduler {
fn new(data: bytes::Bytes) -> Self {
Self {
data,
requests: Arc::new(Mutex::new(Vec::new())),
}
}
// Snapshot of all range batches submitted so far.
fn requests(&self) -> Vec<Vec<Range<u64>>> {
self.requests.lock().unwrap().clone()
}
}
impl crate::EncodingsIo for RecordingScheduler {
fn submit_request(
&self,
ranges: Vec<Range<u64>>,
_priority: u64,
) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
self.requests.lock().unwrap().push(ranges.clone());
let data = ranges
.into_iter()
.map(|range| self.data.slice(range.start as usize..range.end as usize))
.collect::<Vec<_>>();
std::future::ready(Ok(data)).boxed()
}
}
// Decompressor stub: never actually invoked by initialize().
#[derive(Debug)]
struct TestFixedDecompressor;
impl FixedPerValueDecompressor for TestFixedDecompressor {
fn decompress(
&self,
_data: FixedWidthDataBlock,
_num_rows: u64,
) -> crate::Result<DataBlock> {
unimplemented!("Test decompressor")
}
fn bits_per_value(&self) -> u64 {
32
}
}
// The repetition index holds rows_in_page + 1 entries of bytes_per_value
// bytes each; fill that region with a sentinel so it is distinguishable.
let rows_in_page = 100_u64;
let bytes_per_value = 4_u64;
let rep_start = 1000_u64;
let rep_size = ((rows_in_page + 1) * bytes_per_value) as usize;
let mut data = vec![0_u8; 16 * 1024];
data[rep_start as usize..rep_start as usize + rep_size].fill(7);
let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(data)));
// Same scheduler setup as the lazy test, but with caching ENABLED.
let mut scheduler = FullZipScheduler {
data_buf_position: 0,
data_buf_size: 4096,
rep_index: Some(FullZipRepIndexDetails {
buf_position: rep_start,
bytes_per_value,
}),
priority: 0,
rows_in_page,
bits_per_offset: 32,
details: Arc::new(FullZipDecodeDetails {
value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
max_rep: 0,
max_visible_def: 0,
}),
cached_state: None,
enable_cache: true,
};
let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
let cached_data = scheduler.initialize(&io_dyn).await.unwrap();
// With caching enabled, initialize must return real cacheable state,
// remember it on the scheduler, and read exactly the rep-index region in a
// single I/O request.
assert!(
cached_data
.as_arc_any()
.downcast_ref::<FullZipCacheableState>()
.is_some()
);
assert!(scheduler.cached_state.is_some());
assert_eq!(
io.requests(),
vec![vec![
rep_start..(rep_start + (rows_in_page + 1) * bytes_per_value)
]]
);
}
#[tokio::test]
async fn test_fullzip_full_page_bypasses_rep_index_io() {
use futures::{FutureExt, future::BoxFuture};
use std::ops::Range;
use std::sync::Mutex;
// I/O stub that serves slices of an in-memory buffer and records every
// requested range so the test can assert on what I/O was issued.
#[derive(Debug, Clone)]
struct RecordingScheduler {
data: bytes::Bytes,
requests: Arc<Mutex<Vec<Vec<Range<u64>>>>>,
}
impl RecordingScheduler {
fn new(data: bytes::Bytes) -> Self {
Self {
data,
requests: Arc::new(Mutex::new(Vec::new())),
}
}
// Snapshot of all range batches submitted so far.
fn requests(&self) -> Vec<Vec<Range<u64>>> {
self.requests.lock().unwrap().clone()
}
}
impl crate::EncodingsIo for RecordingScheduler {
fn submit_request(
&self,
ranges: Vec<Range<u64>>,
_priority: u64,
) -> BoxFuture<'static, crate::Result<Vec<bytes::Bytes>>> {
self.requests.lock().unwrap().push(ranges.clone());
let data = ranges
.into_iter()
.map(|range| self.data.slice(range.start as usize..range.end as usize))
.collect::<Vec<_>>();
std::future::ready(Ok(data)).boxed()
}
}
// Decompressor stub: scheduling a full page should not decompress anything.
#[derive(Debug)]
struct TestFixedDecompressor;
impl FixedPerValueDecompressor for TestFixedDecompressor {
fn decompress(
&self,
_data: FixedWidthDataBlock,
_num_rows: u64,
) -> crate::Result<DataBlock> {
unimplemented!("Test decompressor")
}
fn bits_per_value(&self) -> u64 {
32
}
}
let rows_in_page = 100_u64;
let data_start = 256_u64;
let data_size = 500_u64;
let rep_start = 4096_u64;
let bytes_per_value = 4_u64;
// Write a valid little-endian u32 repetition index (5 bytes per row) so it
// would decode cleanly IF it were ever read — the assertion is that it
// never is.
let mut bytes = vec![0_u8; 16 * 1024];
for i in 0..=rows_in_page {
let offset = (i * 5) as u32;
let pos = rep_start as usize + (i * bytes_per_value) as usize;
bytes[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());
}
let io = Arc::new(RecordingScheduler::new(bytes::Bytes::from(bytes)));
let scheduler = FullZipScheduler {
data_buf_position: data_start,
data_buf_size: data_size,
rep_index: Some(FullZipRepIndexDetails {
buf_position: rep_start,
bytes_per_value,
}),
priority: 0,
rows_in_page,
bits_per_offset: 32,
details: Arc::new(FullZipDecodeDetails {
value_decompressor: PerValueDecompressor::Fixed(Arc::new(TestFixedDecompressor)),
def_meaning: Arc::new([crate::repdef::DefinitionInterpretation::NullableItem]),
ctrl_word_parser: crate::repdef::ControlWordParser::new(0, 1),
max_rep: 0,
max_visible_def: 0,
}),
cached_state: None,
enable_cache: false,
};
let io_dyn: Arc<dyn crate::EncodingsIo> = io.clone();
// Request the entire page (0..rows_in_page) via the rep-index path.
let tasks = scheduler
.schedule_ranges_rep(
&[0..rows_in_page],
&io_dyn,
FullZipRepIndexDetails {
buf_position: rep_start,
bytes_per_value,
},
)
.unwrap();
// Scheduling a full page should read exactly the data buffer, once.
let requests = io.requests();
assert_eq!(requests.len(), 1);
assert_eq!(requests[0], vec![data_start..(data_start + data_size)]);
// Even after the decode future resolves, no rep-index I/O may occur.
let _ = tasks.into_iter().next().unwrap().decoder_fut.await.unwrap();
let requests_after_await = io.requests();
assert_eq!(
requests_after_await.len(),
1,
"full page path should not issue rep-index I/O"
);
}
#[tokio::test]
async fn test_fuzz_issue_4492_empty_rep_values() {
    use lance_datagen::{RowCount, Seed, array, gen_batch};
    // Regression test for issue 4492: a fuzz-discovered seed whose generated
    // list data produced empty rep values during encoding.
    let seed = 1823859942947654717u64;
    let num_rows = 2741usize;
    let list_generator =
        array::rand_list_any(array::rand_type(&DataType::FixedSizeBinary(32)), false);
    let batch = gen_batch()
        .with_seed(Seed::from(seed))
        .anon_col(list_generator)
        .into_batch_rows(RowCount::from(num_rows as u64))
        .unwrap();
    let list_array = batch.column(0).clone();
    let metadata = HashMap::from([(
        STRUCTURAL_ENCODING_META_KEY.to_string(),
        STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
    )]);
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_1)
        .with_batch_size(100)
        .with_range(0..num_rows.min(500) as u64)
        .with_indices(vec![0, num_rows as u64 / 2, (num_rows - 1) as u64]);
    check_round_trip_encoding_of_data(vec![list_array], &test_cases, metadata).await
}
/// Round-trips `string_data` with an explicit mini-chunk size and the
/// miniblock structural encoding, under the given file version.
async fn test_minichunk_size_helper(
    string_data: Vec<Option<String>>,
    minichunk_size: u64,
    file_version: LanceFileVersion,
) {
    use crate::constants::MINICHUNK_SIZE_META_KEY;
    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
    use arrow_array::{ArrayRef, StringArray};
    use std::sync::Arc;
    let array: ArrayRef = Arc::new(StringArray::from(string_data));
    let metadata = HashMap::from([
        (
            MINICHUNK_SIZE_META_KEY.to_string(),
            minichunk_size.to_string(),
        ),
        (
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        ),
    ]);
    let test_cases = TestCases::default()
        .with_min_file_version(file_version)
        .with_batch_size(1000);
    check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
}
#[tokio::test]
async fn test_minichunk_size_roundtrip() {
    // Long repeated strings round-tripped with a tiny (64-byte) mini-chunk.
    let string_data: Vec<_> = (0..100)
        .map(|i| Some(format!("test_string_{}", i).repeat(50)))
        .collect();
    test_minichunk_size_helper(string_data, 64, LanceFileVersion::V2_1).await;
}
#[tokio::test]
async fn test_minichunk_size_128kb_v2_2() {
    // Many large strings with a 128 KiB mini-chunk size under V2_2.
    let string_data: Vec<_> = (0..10000)
        .map(|i| Some(format!("test_string_{}", i).repeat(50)))
        .collect();
    test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
}
#[tokio::test]
async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
    // Many SHORT strings with a 128 KiB mini-chunk: the chunk size exceeds
    // the maximum number of values a miniblock may hold.
    let string_data: Vec<_> = (0..10000).map(|i| Some(format!("t_{}", i))).collect();
    test_minichunk_size_helper(string_data, 128 * 1024, LanceFileVersion::V2_2).await;
}
#[tokio::test]
async fn test_large_dictionary_general_compression() {
use arrow_array::{ArrayRef, StringArray};
use std::collections::HashMap;
use std::sync::Arc;
// 100 unique ~510-byte values cycled to 100k rows: highly repetitive data
// with a large dictionary, which should trigger general (block) compression
// of the dictionary values.
let unique_values: Vec<String> = (0..100)
.map(|i| format!("value_{:04}_{}", i, "x".repeat(500)))
.collect();
let repeated_strings: Vec<_> = unique_values
.iter()
.cycle()
.take(100_000)
.map(|s| Some(s.as_str()))
.collect();
let string_array = Arc::new(StringArray::from(repeated_strings)) as ArrayRef;
let test_cases = TestCases::default()
.with_min_file_version(LanceFileVersion::V2_2)
// Inspect the encoded column: when the first page is a structural
// mini-block layout with a dictionary, that dictionary must be
// General-compressed with LZ4 or Zstd.
.with_verify_encoding(Arc::new(|cols: &[crate::encoder::EncodedColumn], _| {
assert_eq!(cols.len(), 1);
let col = &cols[0];
if let Some(PageEncoding::Structural(page_layout)) =
&col.final_pages.first().map(|p| &p.description)
&& let Some(pb21::page_layout::Layout::MiniBlockLayout(mini_block)) =
&page_layout.layout
&& let Some(dictionary_encoding) = &mini_block.dictionary
{
match dictionary_encoding.compression.as_ref() {
Some(Compression::General(general)) => {
let compression = general.compression.as_ref().unwrap();
assert!(
compression.scheme()
== pb21::CompressionScheme::CompressionAlgorithmLz4
|| compression.scheme()
== pb21::CompressionScheme::CompressionAlgorithmZstd,
"Expected LZ4 or Zstd compression for large dictionary"
);
}
_ => panic!("Expected General compression for large dictionary"),
}
}
}));
check_round_trip_encoding_of_data(vec![string_array], &test_cases, HashMap::new()).await;
}
/// Extracts the dictionary encoding from an encoded page.
///
/// Panics if the page is not a structural mini-block layout or has no
/// dictionary encoding attached — test helper only.
fn dictionary_encoding_from_page(
    page: &crate::encoder::EncodedPage,
) -> &crate::format::pb21::CompressiveEncoding {
    let PageEncoding::Structural(layout) = &page.description else {
        panic!("Expected structural page encoding");
    };
    let pb21::page_layout::Layout::MiniBlockLayout(layout) = layout.layout.as_ref().unwrap()
    else {
        panic!("Expected mini-block layout");
    };
    // `.expect` is the idiomatic form of `unwrap_or_else(|| panic!(..))`
    // when the message is a static string.
    layout
        .dictionary
        .as_ref()
        .expect("Expected dictionary encoding")
}
/// Encodes one page of a dictionary array with 128 distinct ~260-byte string
/// values referenced 20k times — a strongly dictionary-friendly shape.
async fn encode_variable_dict_page(
    metadata: HashMap<String, String>,
) -> crate::encoder::EncodedPage {
    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
    let dict_values: Vec<String> = (0..128)
        .map(|i| format!("value_{i:04}_{}", "x".repeat(256)))
        .collect();
    let values = Arc::new(StringArray::from(dict_values)) as ArrayRef;
    let keys = Int32Array::from_iter_values((0..20_000).map(|i| i % 128));
    let dict_array =
        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, values).unwrap()) as ArrayRef;
    let field = arrow_schema::Field::new(
        "dict_col",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )
    .with_metadata(metadata);
    encode_first_page(field, dict_array, LanceFileVersion::V2_2).await
}
async fn encode_auto_fixed_dict_page(
metadata: HashMap<String, String>,
) -> crate::encoder::EncodedPage {
use arrow_array::{ArrayRef, Decimal128Array};
let values = (0..20_000)
.map(|i| match i % 3 {
0 => 10_i128,
1 => 20_i128,
_ => 30_i128,
})
.collect::<Vec<_>>();
let decimal = Decimal128Array::from_iter_values(values)
.with_precision_and_scale(38, 0)
.unwrap();
let decimal = Arc::new(decimal) as ArrayRef;
let mut field_metadata = metadata;
field_metadata.insert(
"lance-encoding:dict-size-ratio".to_string(),
"0.99".to_string(),
);
let field = arrow_schema::Field::new("fixed_col", DataType::Decimal128(38, 0), false)
.with_metadata(field_metadata);
encode_first_page(field, decimal, LanceFileVersion::V2_2).await
}
#[tokio::test]
async fn test_dict_values_general_compression_default_lz4_for_variable_dict_values() {
    // With no overrides, variable-width dictionary values default to LZ4.
    let page = encode_variable_dict_page(HashMap::new()).await;
    let encoding = dictionary_encoding_from_page(&page);
    let general = match encoding.compression.as_ref() {
        Some(Compression::General(general)) => general,
        _ => panic!("Expected General compression for dictionary values"),
    };
    assert_eq!(
        general.compression.as_ref().unwrap().scheme(),
        pb21::CompressionScheme::CompressionAlgorithmLz4
    );
}
#[tokio::test]
async fn test_dict_values_general_compression_default_lz4_for_fixed_dict_values() {
    // Fixed-width (auto-detected) dictionary values also default to LZ4.
    let page = encode_auto_fixed_dict_page(HashMap::new()).await;
    let encoding = dictionary_encoding_from_page(&page);
    let general = match encoding.compression.as_ref() {
        Some(Compression::General(general)) => general,
        _ => panic!("Expected General compression for dictionary values"),
    };
    assert_eq!(
        general.compression.as_ref().unwrap().scheme(),
        pb21::CompressionScheme::CompressionAlgorithmLz4
    );
}
#[tokio::test]
async fn test_dict_values_general_compression_zstd() {
    // Field metadata can select Zstd for dictionary values.
    let metadata = HashMap::from([(
        DICT_VALUES_COMPRESSION_META_KEY.to_string(),
        "zstd".to_string(),
    )]);
    let page = encode_variable_dict_page(metadata).await;
    let encoding = dictionary_encoding_from_page(&page);
    let general = match encoding.compression.as_ref() {
        Some(Compression::General(general)) => general,
        _ => panic!("Expected General compression for dictionary values"),
    };
    assert_eq!(
        general.compression.as_ref().unwrap().scheme(),
        pb21::CompressionScheme::CompressionAlgorithmZstd
    );
}
#[tokio::test]
async fn test_dict_values_general_compression_none() {
    // "none" opts dictionary values out of general compression entirely.
    let metadata = HashMap::from([(
        DICT_VALUES_COMPRESSION_META_KEY.to_string(),
        "none".to_string(),
    )]);
    let page = encode_variable_dict_page(metadata).await;
    let encoding = dictionary_encoding_from_page(&page);
    let is_general = matches!(
        encoding.compression.as_ref(),
        Some(Compression::General(_))
    );
    assert!(
        !is_general,
        "Expected dictionary values to avoid General compression"
    );
}
#[test]
fn test_resolve_dict_values_compression_metadata_defaults_to_lz4() {
    // No field metadata and no env-supplied values: lz4 scheme, no level.
    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &HashMap::new(),
        None,
        None,
    );
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&"lz4".to_string()));
    assert!(!resolved.contains_key(COMPRESSION_LEVEL_META_KEY));
}
#[test]
fn test_resolve_dict_values_compression_metadata_metadata_overrides_env() {
    // Field metadata ("none", level 7) must win over env values ("zstd", 3).
    let field_metadata = HashMap::from([
        (
            DICT_VALUES_COMPRESSION_META_KEY.to_string(),
            "none".to_string(),
        ),
        (
            DICT_VALUES_COMPRESSION_LEVEL_META_KEY.to_string(),
            "7".to_string(),
        ),
    ]);
    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &field_metadata,
        Some("zstd".to_string()),
        Some("3".to_string()),
    );
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&"none".to_string()));
    assert_eq!(
        resolved.get(COMPRESSION_LEVEL_META_KEY),
        Some(&"7".to_string())
    );
}
#[test]
fn test_resolve_dict_values_compression_metadata_env_fallback() {
    // Without field metadata, the env-provided scheme and level are used.
    let resolved = PrimitiveStructuralEncoder::resolve_dict_values_compression_metadata(
        &HashMap::new(),
        Some("zstd".to_string()),
        Some("9".to_string()),
    );
    assert_eq!(resolved.get(COMPRESSION_META_KEY), Some(&"zstd".to_string()));
    assert_eq!(
        resolved.get(COMPRESSION_LEVEL_META_KEY),
        Some(&"9".to_string())
    );
}
#[tokio::test]
async fn test_dictionary_encode_int64() {
    use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, Int64Array};
    use std::collections::HashMap;
    use std::sync::Arc;
    // 1000 int64 values with cardinality 3 and a 0.99 size-ratio threshold:
    // the dictionary encoding must be selected.
    let values: Vec<i64> = (0..1000)
        .map(|i| match i % 3 {
            0 => 10i64,
            1 => 20i64,
            _ => 30i64,
        })
        .collect();
    let array = Arc::new(Int64Array::from(values)) as ArrayRef;
    let metadata = HashMap::from([
        (
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        ),
        (DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string()),
    ]);
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_batch_size(1000)
        .with_range(0..1000)
        .with_indices(vec![0, 1, 10, 999])
        .with_expected_encoding("dictionary");
    check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
}
#[tokio::test]
async fn test_dictionary_encode_float64() {
    use crate::constants::{DICT_SIZE_RATIO_META_KEY, STRUCTURAL_ENCODING_META_KEY};
    use crate::testing::{TestCases, check_round_trip_encoding_of_data};
    use crate::version::LanceFileVersion;
    use arrow_array::{ArrayRef, Float64Array};
    use std::collections::HashMap;
    use std::sync::Arc;
    // Same shape as the int64 test but with three distinct f64 values.
    let values: Vec<f64> = (0..1000)
        .map(|i| match i % 3 {
            0 => 0.1f64,
            1 => 0.2f64,
            _ => 0.3f64,
        })
        .collect();
    let array = Arc::new(Float64Array::from(values)) as ArrayRef;
    let metadata = HashMap::from([
        (
            STRUCTURAL_ENCODING_META_KEY.to_string(),
            STRUCTURAL_ENCODING_MINIBLOCK.to_string(),
        ),
        (DICT_SIZE_RATIO_META_KEY.to_string(), "0.99".to_string()),
    ]);
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_batch_size(1000)
        .with_range(0..1000)
        .with_indices(vec![0, 1, 10, 999])
        .with_expected_encoding("dictionary");
    check_round_trip_encoding_of_data(vec![array], &test_cases, metadata).await;
}
#[test]
fn test_miniblock_dictionary_out_of_line_bitpacking_decode() {
// Builds a MiniBlockLayout proto whose dictionary uses out-of-line
// bitpacking (11-bit flat values packed into 64-bit words) and checks the
// scheduler correctly decodes the dictionary metadata from it.
let rows = 10_000;
let unique_values = 2_000;
let dictionary_encoding =
ProtobufUtils21::out_of_line_bitpacking(64, ProtobufUtils21::flat(11, None));
let layout = pb21::MiniBlockLayout {
rep_compression: None,
def_compression: None,
value_compression: Some(ProtobufUtils21::flat(64, None)),
dictionary: Some(dictionary_encoding),
num_dictionary_items: unique_values,
layers: vec![pb21::RepDefLayer::RepdefAllValidItem as i32],
num_buffers: 1,
repetition_index_depth: 0,
num_items: rows,
has_large_chunk: false,
};
// Buffer positions/sizes are irrelevant for this check; zeros suffice.
let buffer_offsets_and_sizes = vec![(0, 0), (0, 0), (0, 0)];
let scheduler = super::MiniBlockScheduler::try_new(
&buffer_offsets_and_sizes,
0,
rows,
&layout,
&DefaultDecompressionStrategy::default(),
)
.unwrap();
// The scheduler must surface the dictionary item count from the proto and
// use the minimum page-buffer alignment for the dictionary data.
let dictionary = scheduler.dictionary.unwrap();
assert_eq!(dictionary.num_dictionary_items, unique_values);
assert_eq!(
dictionary.dictionary_data_alignment,
crate::encoder::MIN_PAGE_BUFFER_ALIGNMENT
);
}
/// Builds a fixed-width test block of `num_values` unsigned integers cycling
/// through `cardinality` distinct values. Supports 32/64/128-bit widths only.
fn create_test_fixed_data_block(
    num_values: u64,
    cardinality: u64,
    bits_per_value: u64,
) -> DataBlock {
    assert!(cardinality > 0);
    assert!(cardinality <= num_values);
    let block_info = BlockInfo::default();
    assert_eq!(bits_per_value % 8, 0);
    // Values are `i % cardinality`, widened to the requested integer width.
    let data = match bits_per_value {
        32 => crate::buffer::LanceBuffer::reinterpret_vec(
            (0..num_values)
                .map(|i| (i % cardinality) as u32)
                .collect::<Vec<_>>(),
        ),
        64 => crate::buffer::LanceBuffer::reinterpret_vec(
            (0..num_values).map(|i| i % cardinality).collect::<Vec<_>>(),
        ),
        128 => crate::buffer::LanceBuffer::reinterpret_vec(
            (0..num_values)
                .map(|i| (i % cardinality) as u128)
                .collect::<Vec<_>>(),
        ),
        _ => unreachable!(),
    };
    DataBlock::FixedWidth(FixedWidthDataBlock {
        bits_per_value,
        data,
        num_values,
        block_info,
    })
}
/// Builds a variable-width test block of `num_values` strings cycling
/// through `cardinality` distinct "value_<16-digit>" values.
fn create_test_variable_width_block(num_values: u64, cardinality: u64) -> DataBlock {
    use arrow_array::StringArray;
    assert!(cardinality <= num_values && cardinality > 0);
    let strings: Vec<String> = (0..num_values)
        .map(|i| format!("value_{:016}", i % cardinality))
        .collect();
    DataBlock::from_array(Arc::new(StringArray::from(strings)) as ArrayRef)
}
#[test]
fn test_should_dictionary_encode() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;
    // 1000 strings with only 10 distinct values under a 0.8 size ratio:
    // dictionary encoding should be chosen on size grounds.
    let block = create_test_variable_width_block(1000, 10);
    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();
    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );
    assert!(
        decision.is_some(),
        "Should use dictionary encode based on size"
    );
}
#[test]
fn test_should_not_dictionary_encode_unsupported_bits() {
    // All-unique 32-bit values: the bit width/shape is not eligible for
    // dictionary encoding, so the probe must decline.
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;
    let block = create_test_fixed_data_block(1000, 1000, 32);
    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "0.8".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Int32, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();
    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );
    assert!(
        decision.is_none(),
        "Should not use dictionary encode for unsupported bit width"
    );
}
#[test]
fn test_should_not_dictionary_encode_near_unique_sample() {
    use crate::constants::DICT_SIZE_RATIO_META_KEY;
    use lance_core::datatypes::Field as LanceField;
    // Every value distinct: even with the most permissive ratio (1.0) the
    // cardinality sample should reject dictionary encoding.
    let num_values = 5000;
    let block = create_test_variable_width_block(num_values, num_values);
    let metadata = HashMap::from([(DICT_SIZE_RATIO_META_KEY.to_string(), "1.0".to_string())]);
    let arrow_field =
        arrow_schema::Field::new("test", DataType::Utf8, false).with_metadata(metadata);
    let field = LanceField::try_from(&arrow_field).unwrap();
    let decision = PrimitiveStructuralEncoder::should_dictionary_encode(
        &block,
        &field,
        LanceFileVersion::V2_1,
    );
    assert!(
        decision.is_none(),
        "Should not probe dictionary encoding for near-unique data"
    );
}
// Encodes `array` under `field` with the default encoding strategy for
// `version` and returns the first encoded page. Uses a tiny column cache
// (1 byte) so data is flushed into pages immediately.
async fn encode_first_page(
field: arrow_schema::Field,
array: ArrayRef,
version: LanceFileVersion,
) -> crate::encoder::EncodedPage {
use crate::encoder::{
ColumnIndexSequence, EncodingOptions, MIN_PAGE_BUFFER_ALIGNMENT, OutOfLineBuffers,
default_encoding_strategy,
};
use crate::repdef::RepDefBuilder;
let lance_field = lance_core::datatypes::Field::try_from(&field).unwrap();
let encoding_strategy = default_encoding_strategy(version);
let mut column_index_seq = ColumnIndexSequence::default();
let encoding_options = EncodingOptions {
// 1-byte cache forces encoding rather than accumulation.
cache_bytes_per_column: 1,
max_page_bytes: 32 * 1024 * 1024,
keep_original_array: true,
buffer_alignment: MIN_PAGE_BUFFER_ALIGNMENT,
version,
};
let mut encoder = encoding_strategy
.create_field_encoder(
encoding_strategy.as_ref(),
&lance_field,
&mut column_index_seq,
&encoding_options,
)
.unwrap();
let mut external_buffers = OutOfLineBuffers::new(0, MIN_PAGE_BUFFER_ALIGNMENT);
let repdef = RepDefBuilder::default();
let num_rows = array.len() as u64;
// Collect pages from both the encode pass and the final flush; the caller
// only inspects the first page produced.
let mut pages = Vec::new();
for task in encoder
.maybe_encode(array, &mut external_buffers, repdef, 0, num_rows)
.unwrap()
{
pages.push(task.await.unwrap());
}
for task in encoder.flush(&mut external_buffers).unwrap() {
pages.push(task.await.unwrap());
}
pages.into_iter().next().unwrap()
}
#[tokio::test]
async fn test_constant_layout_out_of_line_fixed_size_binary_v2_2() {
    use crate::format::pb21::page_layout::Layout;
    // A repeated 33-byte constant is too large to inline, so the constant
    // layout must store it out-of-line in a single page buffer.
    let constant = vec![0xABu8; 33];
    let arr: ArrayRef = Arc::new(
        arrow_array::FixedSizeBinaryArray::try_from_sparse_iter_with_size(
            std::iter::repeat_n(Some(constant.as_slice()), 256),
            33,
        )
        .unwrap(),
    );
    let field = arrow_schema::Field::new("c", DataType::FixedSizeBinary(33), true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
    let PageEncoding::Structural(page_layout) = &page.description else {
        panic!("Expected structural encoding");
    };
    let Layout::ConstantLayout(constant_layout) = page_layout.layout.as_ref().unwrap() else {
        panic!("Expected constant layout in slot 2");
    };
    assert!(constant_layout.inline_value.is_none());
    assert_eq!(page.data.len(), 1);
    // Also round-trip the data to confirm decode works for this layout.
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[tokio::test]
async fn test_constant_layout_out_of_line_utf8_v2_2() {
    use crate::format::pb21::page_layout::Layout;

    // 512 copies of the same string; the constant should be written
    // out-of-line rather than inlined in the page description.
    let arr: ArrayRef = Arc::new(arrow_array::StringArray::from_iter_values(
        (0..512).map(|_| "hello"),
    ));
    let field = arrow_schema::Field::new("c", DataType::Utf8, true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

    let structural = match &page.description {
        PageEncoding::Structural(layout) => layout,
        _ => panic!("Expected structural encoding"),
    };
    let constant = match structural.layout.as_ref().unwrap() {
        Layout::ConstantLayout(constant) => constant,
        _ => panic!("Expected constant layout in slot 2"),
    };
    // No inline value; the single page buffer holds the constant.
    assert!(constant.inline_value.is_none());
    assert_eq!(page.data.len(), 1);

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[tokio::test]
async fn test_constant_layout_nullable_item_v2_2() {
    use crate::format::pb21::page_layout::Layout;

    // Constant value 7 interleaved with nulls: [7, null, 7, null, 7].
    let values: Vec<Option<i32>> = (0..5)
        .map(|i| if i % 2 == 0 { Some(7) } else { None })
        .collect();
    let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(values));
    let field = arrow_schema::Field::new("c", DataType::Int32, true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

    let structural = match &page.description {
        PageEncoding::Structural(layout) => layout,
        _ => panic!("Expected structural encoding"),
    };
    let constant = match structural.layout.as_ref().unwrap() {
        Layout::ConstantLayout(constant) => constant,
        _ => panic!("Expected constant layout in slot 2"),
    };
    // Small value is inlined, and the page carries two data buffers
    // (presumably the rep/def information — see the reader for details).
    assert!(constant.inline_value.is_some());
    assert_eq!(page.data.len(), 2);

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[tokio::test]
async fn test_constant_layout_list_repdef_v2_2() {
    use crate::format::pb21::page_layout::Layout;
    use arrow_array::builder::{Int32Builder, ListBuilder};

    // Four rows with a constant item value but non-trivial rep/def levels:
    // [7, null, 7], [], [7], null
    let mut list_builder = ListBuilder::new(Int32Builder::new());
    // Row 0: [7, null, 7]
    list_builder.values().append_value(7);
    list_builder.values().append_null();
    list_builder.values().append_value(7);
    list_builder.append(true);
    // Row 1: []
    list_builder.append(true);
    // Row 2: [7]
    list_builder.values().append_value(7);
    list_builder.append(true);
    // Row 3: null
    list_builder.append_null();
    let arr: ArrayRef = Arc::new(list_builder.finish());

    let item_field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, true));
    let field = arrow_schema::Field::new("c", DataType::List(item_field), true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

    let structural = match &page.description {
        PageEncoding::Structural(layout) => layout,
        _ => panic!("Expected structural encoding"),
    };
    let constant = match structural.layout.as_ref().unwrap() {
        Layout::ConstantLayout(constant) => constant,
        _ => panic!("Expected constant layout in slot 2"),
    };
    // Value is inlined; two data buffers accompany the page.
    assert!(constant.inline_value.is_some());
    assert_eq!(page.data.len(), 2);

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[tokio::test]
async fn test_constant_layout_fixed_size_list_not_used_v2_2() {
    use crate::format::pb21::page_layout::Layout;
    use arrow_array::builder::{FixedSizeListBuilder, Int32Builder};

    // 64 identical lists of [1, null, 3]; despite being repetitive, a
    // FixedSizeList page must not (yet) be emitted with constant layout.
    let mut list_builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
    for _ in 0..64 {
        list_builder.values().append_value(1);
        list_builder.values().append_null();
        list_builder.values().append_value(3);
        list_builder.append(true);
    }
    let arr: ArrayRef = Arc::new(list_builder.finish());

    let item_field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, true));
    let field = arrow_schema::Field::new("c", DataType::FixedSizeList(item_field, 3), true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;

    if let PageEncoding::Structural(structural) = &page.description {
        let is_constant = matches!(
            structural.layout.as_ref().unwrap(),
            Layout::ConstantLayout(_)
        );
        assert!(
            !is_constant,
            "FixedSizeList should not use constant layout yet"
        );
    }

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[tokio::test]
async fn test_constant_layout_not_written_before_v2_2() {
    use crate::format::pb21::page_layout::Layout;

    // 1024 copies of the same value, written with the older v2.1 format.
    let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![7; 1024]));
    let field = arrow_schema::Field::new("c", DataType::Int32, true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_1).await;

    // Only structural pages are in scope for this check; anything else
    // trivially satisfies "no constant layout".
    let structural = match &page.description {
        PageEncoding::Structural(layout) => layout,
        _ => return,
    };
    assert!(
        !matches!(structural.layout.as_ref().unwrap(), Layout::ConstantLayout(_)),
        "Should not emit constant layout before v2.2"
    );

    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_1)
        .with_max_file_version(LanceFileVersion::V2_1)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
/// An all-null page must still be written as a constant layout (with no
/// value and no data buffers) in v2.2 and round-trip cleanly.
#[tokio::test]
async fn test_all_null_constant_layout_still_works_v2_2() {
    use crate::format::pb21::page_layout::Layout;

    let arr: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![None, None, None]));
    let field = arrow_schema::Field::new("c", DataType::Int32, true);
    let page = encode_first_page(field, arr.clone(), LanceFileVersion::V2_2).await;
    let PageEncoding::Structural(layout) = &page.description else {
        panic!("Expected structural encoding");
    };
    // Message aligned with the sibling constant-layout tests.
    let Layout::ConstantLayout(layout) = layout.layout.as_ref().unwrap() else {
        panic!("Expected constant layout in slot 2");
    };
    // All-null: no inline constant value and no page data buffers at all.
    assert!(layout.inline_value.is_none());
    assert_eq!(page.data.len(), 0);
    let test_cases = TestCases::default()
        .with_min_file_version(LanceFileVersion::V2_2)
        .with_max_file_version(LanceFileVersion::V2_2)
        .with_page_sizes(vec![4096]);
    check_round_trip_encoding_of_data(vec![arr], &test_cases, HashMap::new()).await;
}
#[test]
fn test_encode_decode_complex_all_null_vals_roundtrip() {
    use crate::compression::{
        DecompressionStrategy, DefaultCompressionStrategy, DefaultDecompressionStrategy,
    };

    // 2048 u16 level values cycling through 0..5.
    let raw_levels: Vec<u16> = (0..2048).map(|i| (i % 5) as u16).collect();
    let values: Arc<[u16]> = raw_levels.into();
    let compression = DefaultCompressionStrategy::default();
    let decompression = DefaultDecompressionStrategy::default();

    // Compress the level values, then decompress with the strategy derived
    // from the recorded encoding.
    let (compressed_buf, encoding) =
        PrimitiveStructuralEncoder::encode_complex_all_null_vals(&values, &compression).unwrap();
    let decompressor = decompression.create_block_decompressor(&encoding).unwrap();
    let decompressed = decompressor
        .decompress(compressed_buf, values.len() as u64)
        .unwrap();

    // The round-tripped block must match the input exactly.
    let decompressed_fixed_width = decompressed.as_fixed_width().unwrap();
    assert_eq!(decompressed_fixed_width.num_values, values.len() as u64);
    assert_eq!(decompressed_fixed_width.bits_per_value, 16);
    let restored = decompressed_fixed_width.data.borrow_to_typed_slice::<u16>();
    assert_eq!(restored.as_ref(), values.as_ref());
}
#[tokio::test]
async fn test_complex_all_null_compression_gated_by_version() {
    use crate::format::pb21::page_layout::Layout;
    use arrow_array::ListArray;

    // Alternating null and empty lists: all items are missing, but the
    // rep/def levels still carry structure.
    let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
        (0..1000).map(|i| match i % 2 {
            0 => None,
            _ => Some(vec![]),
        }),
    );
    let arr: ArrayRef = Arc::new(list_array);
    let item_field = Arc::new(arrow_schema::Field::new("item", DataType::Int32, true));
    let field = arrow_schema::Field::new("c", DataType::List(item_field), true);

    // v2.1: the constant layout must not record compressed rep/def values.
    let page_v21 = encode_first_page(field.clone(), arr.clone(), LanceFileVersion::V2_1).await;
    let constant_v21 = match &page_v21.description {
        PageEncoding::Structural(layout) => match layout.layout.as_ref().unwrap() {
            Layout::ConstantLayout(constant) => constant,
            _ => panic!("Expected constant layout"),
        },
        _ => panic!("Expected structural encoding"),
    };
    assert!(constant_v21.rep_compression.is_none());
    assert!(constant_v21.def_compression.is_none());
    assert_eq!(constant_v21.num_rep_values, 0);
    assert_eq!(constant_v21.num_def_values, 0);

    // v2.2: compressed definition levels are recorded on the layout.
    let page_v22 = encode_first_page(field, arr, LanceFileVersion::V2_2).await;
    let constant_v22 = match &page_v22.description {
        PageEncoding::Structural(layout) => match layout.layout.as_ref().unwrap() {
            Layout::ConstantLayout(constant) => constant,
            _ => panic!("Expected constant layout"),
        },
        _ => panic!("Expected structural encoding"),
    };
    assert!(constant_v22.def_compression.is_some());
    assert!(constant_v22.num_def_values > 0);
}
#[tokio::test]
async fn test_complex_all_null_round_trip() {
    use arrow_array::ListArray;

    // Alternating null and empty lists must round-trip under v2.2+.
    let list_array = ListArray::from_iter_primitive::<arrow_array::types::Int32Type, _, _>(
        (0..1000).map(|i| match i % 2 {
            0 => None,
            _ => Some(vec![]),
        }),
    );
    let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_2);
    check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
        .await;
}
}