use std::{
collections::{BinaryHeap, VecDeque},
ops::Range,
sync::Arc,
};
use super::{
fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder,
map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder,
};
use crate::{
decoder::{
DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
StructuralFieldScheduler, StructuralSchedulingJob,
},
encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
format::pb,
repdef::{CompositeRepDefUnraveler, RepDefBuilder},
};
use arrow_array::{Array, ArrayRef, StructArray, cast::AsArray};
use arrow_schema::{DataType, Fields};
use futures::{
FutureExt, StreamExt, TryStreamExt,
future::BoxFuture,
stream::{FuturesOrdered, FuturesUnordered},
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
use lance_core::{Error, Result};
use log::trace;
#[derive(Debug)]
struct StructuralSchedulingJobWithStatus<'a> {
col_idx: u32,
col_name: &'a str,
job: Box<dyn StructuralSchedulingJob + 'a>,
rows_scheduled: u64,
rows_remaining: u64,
ready_scan_lines: VecDeque<ScheduledScanLine>,
}
impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
fn eq(&self, other: &Self) -> bool {
self.col_idx == other.col_idx
}
}
impl Eq for StructuralSchedulingJobWithStatus<'_> {}
impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for StructuralSchedulingJobWithStatus<'_> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.rows_scheduled.cmp(&self.rows_scheduled)
}
}
#[derive(Debug)]
struct RepDefStructSchedulingJob<'a> {
children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
rows_scheduled: u64,
num_rows: u64,
}
impl<'a> RepDefStructSchedulingJob<'a> {
fn new(
scheduler: &'a StructuralStructScheduler,
children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
num_rows: u64,
) -> Self {
let children = children
.into_iter()
.enumerate()
.map(|(idx, job)| StructuralSchedulingJobWithStatus {
col_idx: idx as u32,
col_name: scheduler.child_fields[idx].name(),
job,
rows_scheduled: 0,
rows_remaining: num_rows,
ready_scan_lines: VecDeque::new(),
})
.collect::<BinaryHeap<_>>();
Self {
children,
rows_scheduled: 0,
num_rows,
}
}
}
impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
fn schedule_next(
&mut self,
mut context: &mut SchedulerContext,
) -> Result<Vec<ScheduledScanLine>> {
if self.children.is_empty() {
if self.rows_scheduled == self.num_rows {
return Ok(Vec::new());
}
self.rows_scheduled = self.num_rows;
return Ok(vec![ScheduledScanLine {
decoders: Vec::new(),
rows_scheduled: self.num_rows,
}]);
}
let mut decoders = Vec::new();
let old_rows_scheduled = self.rows_scheduled;
while old_rows_scheduled == self.rows_scheduled {
if self.children.is_empty() {
return Ok(Vec::new());
}
let mut next_child = self.children.pop().unwrap();
if next_child.ready_scan_lines.is_empty() {
let scoped = context.push(next_child.col_name, next_child.col_idx);
let child_scans = next_child.job.schedule_next(scoped.context)?;
context = scoped.pop();
if child_scans.is_empty() {
continue;
}
next_child.ready_scan_lines.extend(child_scans);
}
let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
trace!(
"Scheduled {} rows for child {}",
child_scan.rows_scheduled, next_child.col_idx
);
next_child.rows_scheduled += child_scan.rows_scheduled;
next_child.rows_remaining -= child_scan.rows_scheduled;
decoders.extend(child_scan.decoders);
self.children.push(next_child);
self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
}
let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
Ok(vec![ScheduledScanLine {
decoders,
rows_scheduled: struct_rows_scheduled,
}])
}
}
#[derive(Debug)]
pub struct StructuralStructScheduler {
children: Vec<Box<dyn StructuralFieldScheduler>>,
child_fields: Fields,
}
impl StructuralStructScheduler {
pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
Self {
children,
child_fields,
}
}
}
impl StructuralFieldScheduler for StructuralStructScheduler {
fn schedule_ranges<'a>(
&'a self,
ranges: &[Range<u64>],
filter: &FilterExpression,
) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
let child_schedulers = self
.children
.iter()
.map(|child| child.schedule_ranges(ranges, filter))
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(RepDefStructSchedulingJob::new(
self,
child_schedulers,
num_rows,
)))
}
fn initialize<'a>(
&'a mut self,
filter: &'a FilterExpression,
context: &'a SchedulerContext,
) -> BoxFuture<'a, Result<()>> {
let children_initialization = self
.children
.iter_mut()
.map(|child| child.initialize(filter, context))
.collect::<FuturesUnordered<_>>();
async move {
children_initialization
.map(|res| res.map(|_| ()))
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
.boxed()
}
}
#[derive(Debug)]
pub struct StructuralStructDecoder {
children: Vec<Box<dyn StructuralFieldDecoder>>,
data_type: DataType,
child_fields: Fields,
is_root: bool,
}
impl StructuralStructDecoder {
pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Result<Self> {
let children = fields
.iter()
.map(|field| Self::field_to_decoder(field, should_validate))
.collect::<Result<Vec<_>>>()?;
let data_type = DataType::Struct(fields.clone());
Ok(Self {
data_type,
children,
child_fields: fields,
is_root,
})
}
fn field_to_decoder(
field: &Arc<arrow_schema::Field>,
should_validate: bool,
) -> Result<Box<dyn StructuralFieldDecoder>> {
match field.data_type() {
DataType::Struct(fields) => {
if field.is_packed_struct() || field.is_blob() {
let decoder =
StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
Ok(Box::new(decoder))
} else {
Ok(Box::new(Self::new(fields.clone(), should_validate, false)?))
}
}
DataType::List(child_field) | DataType::LargeList(child_field) => {
let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
Ok(Box::new(StructuralListDecoder::new(
child_decoder,
field.data_type().clone(),
)))
}
DataType::FixedSizeList(child_field, _)
if matches!(child_field.data_type(), DataType::Struct(_)) =>
{
let child_decoder = Self::field_to_decoder(child_field, should_validate)?;
Ok(Box::new(StructuralFixedSizeListDecoder::new(
child_decoder,
field.data_type().clone(),
)))
}
DataType::Map(entries_field, keys_sorted) => {
if *keys_sorted {
return Err(Error::not_supported_source(
"Map data type with keys_sorted=true is not supported yet"
.to_string()
.into(),
));
}
let child_decoder = Self::field_to_decoder(entries_field, should_validate)?;
Ok(Box::new(StructuralMapDecoder::new(
child_decoder,
field.data_type().clone(),
)))
}
DataType::RunEndEncoded(_, _) => todo!(),
DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
DataType::Union(_, _) => todo!(),
_ => Ok(Box::new(StructuralPrimitiveFieldDecoder::new(
field,
should_validate,
))),
}
}
pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
let array_drain = self.drain(num_rows)?;
Ok(NextDecodeTask {
num_rows,
task: Box::new(array_drain),
})
}
}
impl StructuralFieldDecoder for StructuralStructDecoder {
fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
let child_idx = child.path.pop_front().unwrap();
self.children[child_idx as usize].accept_page(child)?;
Ok(())
}
fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
let child_tasks = self
.children
.iter_mut()
.map(|child| child.drain(num_rows))
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(RepDefStructDecodeTask {
children: child_tasks,
child_fields: self.child_fields.clone(),
is_root: self.is_root,
num_rows,
}))
}
fn data_type(&self) -> &DataType {
&self.data_type
}
}
#[derive(Debug)]
struct RepDefStructDecodeTask {
children: Vec<Box<dyn StructuralDecodeArrayTask>>,
child_fields: Fields,
is_root: bool,
num_rows: u64,
}
impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
fn decode(self: Box<Self>) -> Result<DecodedArray> {
if self.children.is_empty() {
return Ok(DecodedArray {
array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
repdef: CompositeRepDefUnraveler::new(vec![]),
});
}
let arrays = self
.children
.into_iter()
.map(|task| task.decode())
.collect::<Result<Vec<_>>>()?;
let mut children = Vec::with_capacity(arrays.len());
let mut arrays_iter = arrays.into_iter();
let first_array = arrays_iter.next().unwrap();
let length = first_array.array.len();
let mut repdef = first_array.repdef;
children.push(first_array.array);
for array in arrays_iter {
debug_assert_eq!(length, array.array.len());
children.push(array.array);
}
let validity = if self.is_root {
None
} else {
repdef.unravel_validity(length)
};
let array = StructArray::try_new(self.child_fields, children, validity)
.map_err(|e| Error::invalid_input_source(e.to_string().into()))?;
Ok(DecodedArray {
array: Arc::new(array),
repdef,
})
}
}
pub struct StructStructuralEncoder {
keep_original_array: bool,
children: Vec<Box<dyn FieldEncoder>>,
}
impl StructStructuralEncoder {
pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
Self {
keep_original_array,
children,
}
}
}
impl FieldEncoder for StructStructuralEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
mut repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
let struct_array = array.as_struct();
let mut struct_array = struct_array.normalize_slicing()?;
if let Some(validity) = struct_array.nulls() {
if self.keep_original_array {
repdef.add_validity_bitmap(validity.clone())
} else {
repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
}
struct_array = struct_array.pushdown_nulls()?;
} else {
repdef.add_no_null(struct_array.len());
}
let child_tasks = self
.children
.iter_mut()
.zip(struct_array.columns().iter())
.map(|(encoder, arr)| {
encoder.maybe_encode(
arr.clone(),
external_buffers,
repdef.clone(),
row_number,
num_rows,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
}
fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
self.children
.iter_mut()
.map(|encoder| encoder.flush(external_buffers))
.flatten_ok()
.collect::<Result<Vec<_>>>()
}
fn num_columns(&self) -> u32 {
self.children
.iter()
.map(|child| child.num_columns())
.sum::<u32>()
}
fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
let mut child_columns = self
.children
.iter_mut()
.map(|child| child.finish(external_buffers))
.collect::<FuturesOrdered<_>>();
async move {
let mut encoded_columns = Vec::with_capacity(child_columns.len());
while let Some(child_cols) = child_columns.next().await {
encoded_columns.extend(child_cols?);
}
Ok(encoded_columns)
}
.boxed()
}
}
pub struct StructFieldEncoder {
children: Vec<Box<dyn FieldEncoder>>,
column_index: u32,
num_rows_seen: u64,
}
impl StructFieldEncoder {
#[allow(dead_code)]
pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
Self {
children,
column_index,
num_rows_seen: 0,
}
}
}
impl FieldEncoder for StructFieldEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
external_buffers: &mut OutOfLineBuffers,
repdef: RepDefBuilder,
row_number: u64,
num_rows: u64,
) -> Result<Vec<EncodeTask>> {
self.num_rows_seen += array.len() as u64;
let struct_array = array.as_struct();
let child_tasks = self
.children
.iter_mut()
.zip(struct_array.columns().iter())
.map(|(encoder, arr)| {
encoder.maybe_encode(
arr.clone(),
external_buffers,
repdef.clone(),
row_number,
num_rows,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
}
fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
let child_tasks = self
.children
.iter_mut()
.map(|encoder| encoder.flush(external_buffers))
.collect::<Result<Vec<_>>>()?;
Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
}
fn num_columns(&self) -> u32 {
self.children
.iter()
.map(|child| child.num_columns())
.sum::<u32>()
+ 1
}
fn finish(
&mut self,
external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
let mut child_columns = self
.children
.iter_mut()
.map(|child| child.finish(external_buffers))
.collect::<FuturesOrdered<_>>();
let num_rows_seen = self.num_rows_seen;
let column_index = self.column_index;
async move {
let mut columns = Vec::new();
let mut header = EncodedColumn::default();
header.final_pages.push(EncodedPage {
data: Vec::new(),
description: PageEncoding::Legacy(pb::ArrayEncoding {
array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
pb::SimpleStruct {},
)),
}),
num_rows: num_rows_seen,
column_idx: column_index,
row_number: 0, });
columns.push(header);
while let Some(child_cols) = child_columns.next().await {
columns.extend(child_cols?);
}
Ok(columns)
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use arrow_array::{
Array, ArrayRef, Int32Array, ListArray, StructArray,
builder::{Int32Builder, ListBuilder},
};
use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, Fields};
use crate::{
testing::{TestCases, check_basic_random, check_round_trip_encoding_of_data},
version::LanceFileVersion,
};
#[test_log::test(tokio::test)]
async fn test_simple_struct() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
]));
let field = Field::new("", data_type, false);
check_basic_random(field).await;
}
#[test_log::test(tokio::test)]
async fn test_nullable_struct() {
let inner_fields = Fields::from(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, true),
]);
let inner_struct = DataType::Struct(inner_fields.clone());
let outer_fields = Fields::from(vec![
Field::new("score", DataType::Int32, true),
Field::new("location", inner_struct, true),
]);
let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);
let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
let locations = StructArray::new(
inner_fields,
vec![Arc::new(x_vals), Arc::new(y_vals)],
Some(location_validity),
);
let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
let rows = StructArray::new(
outer_fields,
vec![Arc::new(scores), Arc::new(locations)],
Some(rows_validity),
);
let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);
check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
}
#[test_log::test(tokio::test)]
async fn test_simple_masked_nonempty_list() {
let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
let list_array = ListArray::new(
Arc::new(Field::new("item", DataType::Int32, true)),
offsets,
Arc::new(items),
Some(NullBuffer::new(list_validity)),
);
let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
let struct_array = StructArray::new(
Fields::from(vec![Field::new(
"inner_list",
list_array.data_type().clone(),
true,
)]),
vec![Arc::new(list_array)],
Some(NullBuffer::new(struct_validity)),
);
check_round_trip_encoding_of_data(
vec![Arc::new(struct_array)],
&TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;
}
#[test_log::test(tokio::test)]
async fn test_simple_struct_list() {
let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
let list_array = ListArray::new(
Arc::new(Field::new("item", DataType::Int32, true)),
offsets,
Arc::new(items),
Some(NullBuffer::new(list_validity)),
);
let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
let struct_array = StructArray::new(
Fields::from(vec![Field::new(
"inner_list",
list_array.data_type().clone(),
true,
)]),
vec![Arc::new(list_array)],
Some(NullBuffer::new(struct_validity)),
);
check_round_trip_encoding_of_data(
vec![Arc::new(struct_array)],
&TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
HashMap::new(),
)
.await;
}
#[test_log::test(tokio::test)]
async fn test_struct_list() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new(
"inner_list",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
true,
),
Field::new("outer_int", DataType::Int32, true),
]));
let field = Field::new("row", data_type, false);
check_basic_random(field).await;
}
#[test_log::test(tokio::test)]
async fn test_empty_struct() {
let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
let field = Field::new("row", data_type, false);
check_basic_random(field).await;
}
#[test_log::test(tokio::test)]
async fn test_complicated_struct() {
let data_type = DataType::Struct(Fields::from(vec![
Field::new("int", DataType::Int32, true),
Field::new(
"inner",
DataType::Struct(Fields::from(vec![
Field::new("inner_int", DataType::Int32, true),
Field::new(
"inner_list",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
true,
),
])),
true,
),
Field::new("outer_binary", DataType::Binary, true),
]));
let field = Field::new("row", data_type, false);
check_basic_random(field).await;
}
#[test_log::test(tokio::test)]
async fn test_list_of_struct_with_null_struct_element() {
use arrow_array::StringArray;
let tag_array = StringArray::from(vec![
Some("valid"),
Some("null_struct"),
Some("valid"),
Some("valid"),
]);
let struct_fields = Fields::from(vec![Field::new("tag", DataType::Utf8, true)]);
let struct_validity = NullBuffer::from(vec![false, true, false, false]);
let struct_array = StructArray::new(
struct_fields.clone(),
vec![Arc::new(tag_array)],
Some(struct_validity),
);
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 4]));
let list_field = Field::new("item", DataType::Struct(struct_fields), true);
let list_array =
ListArray::new(Arc::new(list_field), offsets, Arc::new(struct_array), None);
check_round_trip_encoding_of_data(
vec![Arc::new(list_array)],
&TestCases::default().with_min_file_version(LanceFileVersion::V2_2),
HashMap::new(),
)
.await;
}
#[test_log::test(tokio::test)]
async fn test_ragged_scheduling() {
let items_builder = Int32Builder::new();
let mut list_builder = ListBuilder::new(items_builder);
for _ in 0..10000 {
list_builder.append_null();
}
let list_array = Arc::new(list_builder.finish());
let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
let fields = vec![
Field::new("", list_array.data_type().clone(), true),
Field::new("", int_array.data_type().clone(), true),
];
let struct_array = Arc::new(StructArray::new(
Fields::from(fields),
vec![list_array, int_array],
None,
)) as ArrayRef;
let struct_arrays = (0..10000)
.step_by(437)
.map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
.collect::<Vec<_>>();
check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
.await;
}
}