use std::{collections::VecDeque, sync::Arc};
use arrow_array::{
cast::AsArray,
types::{Int32Type, Int64Type},
ArrayRef, Int32Array, Int64Array, LargeListArray, ListArray, UInt32Array,
};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{DataType, Field};
use futures::{future::BoxFuture, FutureExt};
use log::trace;
use tokio::{sync::mpsc, task::JoinHandle};
use lance_core::Result;
use crate::{
decoder::{DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask},
encoder::{EncodedArray, EncodedPage, FieldEncoder},
format::pb,
EncodingsIo,
};
use super::primitive::PrimitiveFieldEncoder;
/// Scheduler for a page of list data.
///
/// A list page is stored as one page of offsets plus zero or more pages of
/// items; the items cannot be scheduled until the offsets have been decoded.
#[derive(Debug)]
pub struct ListPageScheduler {
    // Scheduler for the single page of list offsets
    offsets_scheduler: Box<dyn LogicalPageScheduler>,
    // Schedulers for the item pages, in row order
    items_schedulers: Arc<Vec<Box<dyn LogicalPageScheduler>>>,
    // Int32 => ListArray, Int64 => LargeListArray (enforced in `new`)
    offset_type: DataType,
}
impl ListPageScheduler {
    /// Creates a new `ListPageScheduler` from an offsets page scheduler and
    /// the schedulers for the item pages.
    ///
    /// # Panics
    ///
    /// Panics if `offset_type` is not `Int32` or `Int64` — those are the only
    /// offset widths a list array can have.
    pub fn new(
        offsets_scheduler: Box<dyn LogicalPageScheduler>,
        items_schedulers: Vec<Box<dyn LogicalPageScheduler>>,
        offset_type: DataType,
    ) -> Self {
        // Was a bare `panic!()`; give the caller a diagnosable message instead
        assert!(
            matches!(offset_type, DataType::Int32 | DataType::Int64),
            "ListPageScheduler requires Int32 or Int64 offsets, got {:?}",
            offset_type
        );
        Self {
            offsets_scheduler,
            items_schedulers: Arc::new(items_schedulers),
            offset_type,
        }
    }
}
impl LogicalPageScheduler for ListPageScheduler {
    /// Schedules I/O for the given ranges of lists.
    ///
    /// Scheduling happens in two phases.  The offsets page is scheduled
    /// immediately.  The item pages cannot be scheduled until the offsets are
    /// decoded (we don't know which items the lists refer to until then), so a
    /// task is spawned that waits for the offsets and then "indirectly"
    /// schedules the item ranges.  The decoder is sent to `sink` right away;
    /// its `wait` blocks on the spawned task.
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u32>],
        scheduler: &Arc<dyn EncodingsIo>,
        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
    ) -> Result<()> {
        let num_rows = ranges.iter().map(|range| range.end - range.start).sum();
        // To decode lists [start..end] we need offsets [start..end + 1]
        // (N lists are described by N + 1 fence-post offsets)
        let offsets_ranges = ranges
            .iter()
            .map(|range| range.start..(range.end + 1))
            .collect::<Vec<_>>();
        let num_offsets = offsets_ranges
            .iter()
            .map(|range| range.end - range.start)
            .sum();
        trace!("Scheduling list offsets ranges: {:?}", offsets_ranges);
        let (tx, mut rx) = mpsc::unbounded_channel();
        self.offsets_scheduler
            .schedule_ranges(&offsets_ranges, scheduler, &tx)?;
        // The offsets scheduler delivers exactly one decoder synchronously
        let mut scheduled_offsets = rx.recv().now_or_never().unwrap().unwrap();
        let items_schedulers = self.items_schedulers.clone();
        let ranges = ranges.to_vec();
        let scheduler = scheduler.clone();
        let indirect_fut = tokio::task::spawn(async move {
            // The offsets decoder is a single primitive page and needs no
            // source channel, so hand it a dummy receiver
            let (_, mut dummy_rx) = mpsc::unbounded_channel();
            scheduled_offsets.wait(num_rows, &mut dummy_rx).await?;
            let decode_task = scheduled_offsets.drain(num_offsets)?;
            let offsets = decode_task.task.decode()?;
            // Offsets are always encoded as Int32 (see ListFieldEncoder::new)
            let numeric_offsets = offsets.as_primitive::<Int32Type>();
            // Each range contributes (len + 1) offsets; normalized offsets
            // share a single leading zero, hence this capacity
            let mut normalized_offsets =
                Vec::with_capacity(numeric_offsets.len() - ranges.len() + 1);
            normalized_offsets.push(0);
            let mut last_normalized_offset = 0;
            let offsets_values = numeric_offsets.values();
            let mut item_ranges = VecDeque::new();
            let mut offsets_offset: u32 = 0;
            for range in ranges {
                let num_lists = range.end - range.start;
                let items_start = offsets_values[offsets_offset as usize] as u32;
                let items_end = offsets_values[(offsets_offset + num_lists) as usize] as u32;
                // Re-base each range's offsets so the normalized offsets form
                // one contiguous, monotonically increasing sequence
                normalized_offsets.extend(
                    offsets_values
                        .slice(offsets_offset as usize, (num_lists + 1) as usize)
                        .windows(2)
                        .map(|w| {
                            let length = w[1] - w[0];
                            last_normalized_offset += length as u32;
                            last_normalized_offset
                        }),
                );
                trace!(
                    "List offsets range of {:?} maps to item range {:?}..{:?}",
                    range,
                    items_start,
                    items_end
                );
                offsets_offset += num_lists + 1;
                item_ranges.push_back(items_start..items_end);
            }
            let (tx, mut rx) = mpsc::unbounded_channel();
            trace!(
                "Indirectly scheduling items ranges {:?} from {} list items pages",
                item_ranges,
                items_schedulers.len()
            );
            // Walk the item pages and the global item ranges in lockstep,
            // translating each global range into page-relative ranges and
            // flushing a page's accumulated ranges when we move past it
            let mut item_schedulers = VecDeque::from_iter(items_schedulers.iter());
            let mut row_offset = 0;
            let mut next_scheduler = item_schedulers.pop_front().unwrap();
            let mut next_range = item_ranges.pop_front().unwrap();
            let mut next_item_ranges = Vec::new();
            loop {
                let current_scheduler_end = row_offset + next_scheduler.num_rows();
                // NOTE(review): when next_range.start == current_scheduler_end
                // this falls into the "spans" branch and pushes an empty range —
                // confirm downstream schedulers tolerate empty ranges
                if next_range.start > current_scheduler_end {
                    // Range starts after the current page; flush this page's
                    // accumulated ranges and advance to the next page
                    row_offset += next_scheduler.num_rows();
                    if !next_item_ranges.is_empty() {
                        // BUGFIX: take() empties the accumulator so these
                        // page-relative ranges are not re-scheduled (with wrong
                        // offsets) against a later page
                        let to_schedule = std::mem::take(&mut next_item_ranges);
                        next_scheduler.schedule_ranges(&to_schedule, &scheduler, &tx)?;
                    }
                    next_scheduler = item_schedulers.pop_front().unwrap();
                } else if next_range.end <= current_scheduler_end {
                    // Range fits entirely within the current page
                    let page_range =
                        (next_range.start - row_offset)..(next_range.end - row_offset);
                    next_item_ranges.push(page_range);
                    if let Some(item_range) = item_ranges.pop_front() {
                        next_range = item_range;
                    } else {
                        break;
                    }
                } else {
                    // Range spans this page and at least the next one: take
                    // what this page holds, flush, and keep the remainder
                    let page_range = (next_range.start - row_offset)..next_scheduler.num_rows();
                    next_range = current_scheduler_end..next_range.end;
                    next_item_ranges.push(page_range);
                    row_offset += next_scheduler.num_rows();
                    if !next_item_ranges.is_empty() {
                        // BUGFIX: same as above — clear while scheduling
                        let to_schedule = std::mem::take(&mut next_item_ranges);
                        next_scheduler.schedule_ranges(&to_schedule, &scheduler, &tx)?;
                    }
                    next_scheduler = item_schedulers.pop_front().unwrap();
                }
            }
            // Flush the ranges accumulated for the final page
            if !next_item_ranges.is_empty() {
                next_scheduler.schedule_ranges(&next_item_ranges, &scheduler, &tx)?;
            }
            let mut item_decoders = Vec::new();
            // Drop our sender so the receive loop below can terminate
            drop(tx);
            while let Some(mut item_decoder) = rx.recv().await {
                item_decoder.wait(item_decoder.unawaited(), &mut rx).await?;
                item_decoders.push(item_decoder);
            }
            Ok(IndirectlyLoaded {
                offsets: normalized_offsets,
                item_decoders,
            })
        });
        // Emit the decoder immediately; it stays "unloaded" until `wait`
        // resolves the spawned task above
        sink.send(Box::new(ListPageDecoder {
            offsets: Vec::new(),
            item_decoders: VecDeque::new(),
            num_rows,
            rows_drained: 0,
            unloaded: Some(indirect_fut),
            offset_type: self.offset_type.clone(),
        }))
        .unwrap();
        Ok(())
    }

    /// The offsets page stores N + 1 offsets for N lists.
    fn num_rows(&self) -> u32 {
        self.offsets_scheduler.num_rows() - 1
    }

    /// Schedules a take of individual lists by converting each index into a
    /// unit-length range and delegating to `schedule_ranges`.
    fn schedule_take(
        &self,
        indices: &[u32],
        scheduler: &Arc<dyn EncodingsIo>,
        sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
    ) -> Result<()> {
        trace!("Scheduling list offsets for {} indices", indices.len());
        self.schedule_ranges(
            &indices
                .iter()
                .map(|&idx| idx..(idx + 1))
                .collect::<Vec<_>>(),
            scheduler,
            sink,
        )
    }
}
/// Decoder for a page of list data.
///
/// Created in an "unloaded" state: `offsets` and `item_decoders` are empty
/// until `wait` resolves the `unloaded` join handle.
struct ListPageDecoder {
    // Handle to the spawned indirect-scheduling task; None once awaited
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // Normalized (contiguous, starting at 0) offsets; num_rows + 1 entries once loaded
    offsets: Vec<u32>,
    // Decoders for the item pages, in row order
    item_decoders: VecDeque<Box<dyn LogicalPageDecoder>>,
    // Number of lists in this page
    num_rows: u32,
    // Number of lists already handed out by `drain`
    rows_drained: u32,
    // Int32 => ListArray, Int64 => LargeListArray
    offset_type: DataType,
}
/// Task that materializes one drained slice of a list page into an Arrow array.
struct ListDecodeTask {
    // Offsets for the slice (one more entry than the number of lists); not yet
    // zero-based — `decode` subtracts the first offset
    offsets: Vec<u32>,
    // Decode tasks for the item pages backing this slice
    items: Vec<Box<dyn DecodeArrayTask>>,
    // Int32 => ListArray, Int64 => LargeListArray (anything else panics)
    offset_type: DataType,
}
impl DecodeArrayTask for ListDecodeTask {
    /// Decodes all child item tasks, concatenates them into one items array,
    /// re-bases the stored offsets to start at zero, and assembles a
    /// `ListArray` or `LargeListArray` depending on `offset_type`.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Run every child decode task, failing fast on the first error
        let decoded = self
            .items
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let decoded_refs = decoded.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        let items = arrow_select::concat::concat(&decoded_refs)?;
        let item_field = Arc::new(Field::new("item", items.data_type().clone(), true));
        let raw_offsets = UInt32Array::from(self.offsets);
        match &self.offset_type {
            DataType::Int32 => {
                // Cast to the target offset width, then subtract the first
                // offset so the buffer starts at zero
                let cast = arrow_cast::cast(&raw_offsets, &DataType::Int32)?;
                let typed = cast.as_primitive::<Int32Type>();
                let base = Int32Array::new_scalar(typed.value(0));
                let rebased = arrow_arith::numeric::sub(&typed, &base)?;
                let rebased = rebased.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(rebased.values().clone());
                Ok(Arc::new(ListArray::try_new(
                    item_field, offsets, items, None,
                )?))
            }
            DataType::Int64 => {
                // Same as above but with 64-bit offsets for LargeList
                let cast = arrow_cast::cast(&raw_offsets, &DataType::Int64)?;
                let typed = cast.as_primitive::<Int64Type>();
                let base = Int64Array::new_scalar(typed.value(0));
                let rebased = arrow_arith::numeric::sub(&typed, &base)?;
                let rebased = rebased.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(rebased.values().clone());
                Ok(Arc::new(LargeListArray::try_new(
                    item_field, offsets, items, None,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}
impl LogicalPageDecoder for ListPageDecoder {
    /// Waits for the indirect scheduling task to finish, populating
    /// `offsets` and `item_decoders`.
    ///
    /// `_num_rows` and `_source` are unused: the spawned task loads the whole
    /// page as a unit and awaits all item decoders itself.
    fn wait<'a>(
        &'a mut self,
        _num_rows: u32,
        _source: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
    ) -> BoxFuture<'a, Result<()>> {
        async move {
            if self.unloaded.is_some() {
                // NOTE(review): the outer `unwrap` panics on JoinError (i.e. if
                // the spawned task itself panicked) — confirm that is intended
                let indirectly_loaded = self.unloaded.take().unwrap().await.unwrap()?;
                self.offsets = indirectly_loaded.offsets;
                self.item_decoders.extend(indirectly_loaded.item_decoders);
            }
            Ok(())
        }
        .boxed()
    }

    /// All rows are unawaited until `wait` has run; afterwards none are.
    fn unawaited(&self) -> u32 {
        match self.unloaded {
            None => 0,
            Some(_) => self.num_rows,
        }
    }

    /// Drains the next `num_rows` lists, producing a task that decodes them.
    ///
    /// Slices `num_rows + 1` offsets (fence-post) and pulls the corresponding
    /// items from however many item page decoders they span.  Must only be
    /// called after `wait` (the offsets slice would be empty otherwise).
    fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + num_rows + 1) as usize]
            .to_vec();
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        // Offsets are contiguous and monotonic, so this is the item count
        let mut num_items_to_drain = end - start;
        let mut item_decodes = Vec::new();
        while num_items_to_drain > 0 {
            let next_item_page = self.item_decoders.front_mut().unwrap();
            let avail = next_item_page.avail();
            let to_take = num_items_to_drain.min(avail);
            num_items_to_drain -= to_take;
            let next_task = next_item_page.drain(to_take)?;
            // Discard item pages that have been fully drained
            if !next_task.has_more {
                self.item_decoders.pop_front();
            }
            item_decodes.push(next_task.task);
        }
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            has_more: self.avail() > 0,
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                items: item_decodes,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    /// Nothing is available until `wait` has loaded the page.
    fn avail(&self) -> u32 {
        match self.unloaded {
            Some(_) => 0,
            None => self.num_rows - self.rows_drained,
        }
    }
}
/// Output of the indirect scheduling task: decoded, normalized list offsets
/// plus fully-awaited decoders for the item pages.
struct IndirectlyLoaded {
    offsets: Vec<u32>,
    item_decoders: Vec<Box<dyn LogicalPageDecoder>>,
}
/// Encoder for list fields.
///
/// A list is encoded as two columns: the offsets (encoded as Int32 and
/// wrapped in a `pb::List` encoding) and the child items.
pub struct ListFieldEncoder {
    // Encodes the list offsets column
    indices_encoder: PrimitiveFieldEncoder,
    // Encodes the child items column
    items_encoder: Box<dyn FieldEncoder>,
}
impl ListFieldEncoder {
    /// Creates a list encoder wrapping the given items encoder; offsets are
    /// always encoded as Int32.
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        cache_bytes_per_columns: u64,
        column_index: u32,
    ) -> Self {
        let indices_encoder =
            PrimitiveFieldEncoder::try_new(cache_bytes_per_columns, &DataType::Int32, column_index)
                .unwrap();
        Self {
            indices_encoder,
            items_encoder,
        }
    }

    /// Concatenates the offsets tasks and items tasks into one task list,
    /// propagating the first error from either side.
    fn combine_index_tasks(
        index_tasks: Result<Vec<BoxFuture<'static, Result<EncodedPage>>>>,
        item_tasks: Result<Vec<BoxFuture<'static, Result<EncodedPage>>>>,
    ) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        let mut combined = index_tasks?;
        combined.extend(item_tasks?);
        Ok(combined)
    }

    /// Wraps every offsets encode task so its resulting page encoding is
    /// nested inside a `pb::List` encoding.
    fn wrap_index_encode_tasks(
        tasks: Result<Vec<BoxFuture<'static, Result<EncodedPage>>>>,
    ) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        let tasks = tasks?;
        let wrapped = tasks
            .into_iter()
            .map(|page_task| {
                async move {
                    let page = page_task.await?;
                    let encoding = pb::ArrayEncoding {
                        array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                            pb::List {
                                offsets: Some(Box::new(page.array.encoding)),
                            },
                        ))),
                    };
                    let array = EncodedArray {
                        buffers: page.array.buffers,
                        encoding,
                    };
                    Ok(EncodedPage { array, ..page })
                }
                .boxed()
            })
            .collect::<Vec<_>>();
        Ok(wrapped)
    }
}
impl FieldEncoder for ListFieldEncoder {
    /// Splits a list array into its items and offsets and feeds each to the
    /// corresponding child encoder; offsets tasks are wrapped in a `pb::List`
    /// encoding.
    ///
    /// # Panics
    ///
    /// Panics if `array` is not a `List` or `LargeList`.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
    ) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        // Was two separate matches on the same data type (and bare panics);
        // extract items and offsets in a single match with a diagnostic panic
        let (items, offsets) = match array.data_type() {
            DataType::List(_) => {
                let list = array.as_list::<i32>();
                let offsets =
                    Arc::new(Int32Array::new(list.offsets().clone().into_inner(), None))
                        as ArrayRef;
                (list.values().clone(), offsets)
            }
            DataType::LargeList(_) => {
                let list = array.as_list::<i64>();
                let offsets =
                    Arc::new(Int64Array::new(list.offsets().clone().into_inner(), None))
                        as ArrayRef;
                (list.values().clone(), offsets)
            }
            other => panic!("ListFieldEncoder got non-list array of type {:?}", other),
        };
        let index_tasks = self.indices_encoder.maybe_encode(offsets);
        let index_tasks = Self::wrap_index_encode_tasks(index_tasks);
        let item_tasks = self.items_encoder.maybe_encode(items);
        Self::combine_index_tasks(index_tasks, item_tasks)
    }

    /// Flushes both child encoders, wrapping the offsets tasks like
    /// `maybe_encode` does.
    fn flush(&mut self) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        let index_tasks = self.indices_encoder.flush();
        let index_tasks = Self::wrap_index_encode_tasks(index_tasks);
        let item_tasks = self.items_encoder.flush();
        Self::combine_index_tasks(index_tasks, item_tasks)
    }

    /// One column for offsets plus one for items.
    fn num_columns(&self) -> u32 {
        2
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use arrow_schema::{DataType, Field};
    use crate::testing::check_round_trip_encoding;

    /// Round-trips a simple List<Int32> field through encode and decode.
    #[test_log::test(tokio::test)]
    async fn test_simple_list() {
        let list_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        check_round_trip_encoding(Field::new("", list_type, false)).await;
    }
}