use std::sync::Arc;
use arrow_array::{
new_null_array,
types::{
ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
DurationSecondType, Float16Type, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
Int8Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
ArrayRef, BooleanArray, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use bytes::BytesMut;
use futures::{future::BoxFuture, FutureExt};
use log::trace;
use snafu::{location, Location};
use lance_core::{Error, Result};
use tokio::sync::mpsc;
use crate::{
decoder::{
DecodeArrayTask, LogicalPageDecoder, LogicalPageScheduler, NextDecodeTask, PageInfo,
PhysicalPageDecoder, PhysicalPageScheduler,
},
encoder::{ArrayEncoder, EncodedPage, FieldEncoder},
encodings::physical::{
basic::BasicEncoder, decoder_from_array_encoding, fixed_size_list::FslEncoder,
value::ValueEncoder, ColumnBuffers, PageBuffers,
},
EncodingsIo,
};
/// Logical page scheduler for one page of a primitive-encoded column.
///
/// Pairs the column's Arrow data type with the physical scheduler that was
/// built from the page's encoding description.
#[derive(Debug)]
pub struct PrimitivePageScheduler {
/// Arrow type cloned into every logical decoder this scheduler emits.
data_type: DataType,
/// Physical scheduler for the page. NOTE: despite the field name this is a
/// `PhysicalPageScheduler`; the physical *decoder* is only produced when
/// ranges are scheduled.
physical_decoder: Box<dyn PhysicalPageScheduler>,
/// Number of rows in the page, copied from the page's `PageInfo`.
num_rows: u32,
}
impl PrimitivePageScheduler {
    /// Creates a scheduler for a single page of a primitive column.
    ///
    /// The physical scheduler is derived from the page's encoding description,
    /// using the column-level `buffers` together with the page's own buffer
    /// offsets. The row count is taken from `page`.
    pub fn new(data_type: DataType, page: Arc<PageInfo>, buffers: ColumnBuffers) -> Self {
        let num_rows = page.num_rows;
        let buffers_for_page = PageBuffers {
            column_buffers: buffers,
            positions: &page.buffer_offsets,
        };
        let physical_decoder = decoder_from_array_encoding(&page.encoding, &buffers_for_page);
        Self {
            data_type,
            physical_decoder,
            num_rows,
        }
    }
}
impl LogicalPageScheduler for PrimitivePageScheduler {
fn num_rows(&self) -> u32 {
self.num_rows
}
fn schedule_ranges(
&self,
ranges: &[std::ops::Range<u32>],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
) -> Result<()> {
let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
trace!("Scheduling ranges {:?} from physical page", ranges);
let physical_decoder = self
.physical_decoder
.schedule_ranges(ranges, scheduler.as_ref());
let logical_decoder = PrimitiveFieldDecoder {
data_type: self.data_type.clone(),
unloaded_physical_decoder: Some(physical_decoder),
physical_decoder: None,
rows_drained: 0,
num_rows,
};
sink.send(Box::new(logical_decoder)).unwrap();
Ok(())
}
fn schedule_take(
&self,
indices: &[u32],
scheduler: &Arc<dyn EncodingsIo>,
sink: &mpsc::UnboundedSender<Box<dyn LogicalPageDecoder>>,
) -> Result<()> {
trace!(
"Scheduling take of {} indices from physical page",
indices.len()
);
self.schedule_ranges(
&indices
.iter()
.map(|&idx| idx..(idx + 1))
.collect::<Vec<_>>(),
scheduler,
sink,
)
}
}
/// Logical decoder for the rows scheduled from one primitive page.
///
/// It starts with a pending future for the physical decoder; once that future
/// has been awaited the resolved decoder is shared with the decode tasks that
/// are handed out as rows are drained.
struct PrimitiveFieldDecoder {
/// Arrow type of the arrays that the decode tasks produce.
data_type: DataType,
/// Pending physical decoder; taken (and left as `None`) when it is awaited.
unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PhysicalPageDecoder>>>>,
/// Resolved physical decoder, `None` until the pending future completes.
physical_decoder: Option<Arc<dyn PhysicalPageDecoder>>,
/// Total number of rows this decoder was scheduled for.
num_rows: u32,
/// Number of rows already handed out to decode tasks.
rows_drained: u32,
}
/// A unit of decode work: turns a window of rows from a loaded physical
/// decoder into an Arrow array of `data_type`.
struct PrimitiveFieldDecodeTask {
/// Number of rows to skip in the physical decoder before the window starts.
rows_to_skip: u32,
/// Number of rows in the window, i.e. the length of the produced array.
rows_to_take: u32,
/// Shared handle to the loaded physical decoder.
physical_decoder: Arc<dyn PhysicalPageDecoder>,
/// Arrow type of the array to build.
data_type: DataType,
}
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes this task's row window into an Arrow array.
    ///
    /// The physical decoder is first asked for the capacity of each buffer and
    /// whether the window is entirely null. An all-null window short-circuits
    /// to a null array; otherwise the buffers are allocated, filled by the
    /// physical decoder and assembled into an array of the task's data type.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let Self {
            rows_to_skip,
            rows_to_take,
            physical_decoder,
            data_type,
        } = *self;

        let mut capacities = vec![(0, false); physical_decoder.num_buffers() as usize];
        let mut all_null = false;
        physical_decoder.update_capacity(
            rows_to_skip,
            rows_to_take,
            &mut capacities,
            &mut all_null,
        );
        if all_null {
            return Ok(new_null_array(&data_type, rows_to_take as usize));
        }

        // Buffers that are not needed stay empty (no allocation).
        let mut bufs = Vec::with_capacity(capacities.len());
        for (num_bytes, is_needed) in capacities {
            let buf = if is_needed {
                BytesMut::with_capacity(num_bytes as usize)
            } else {
                BytesMut::default()
            };
            bufs.push(buf);
        }

        physical_decoder.decode_into(rows_to_skip, rows_to_take, &mut bufs);
        Self::primitive_array_from_buffers(&data_type, bufs, rows_to_take)
    }
}
impl PrimitiveFieldDecodeTask {
    /// Builds a `PrimitiveArray<T>` from decoded buffers.
    ///
    /// `buffers[0]` is the validity bitmap (empty means "no nulls") and
    /// `buffers[1]` holds the values. The resulting array is tagged with
    /// `data_type` so parameterized types (decimals, timestamps, ...) keep
    /// their parameters.
    ///
    /// # Panics
    ///
    /// Panics if fewer than two buffers are supplied.
    fn new_primitive_array<T: ArrowPrimitiveType>(
        buffers: Vec<BytesMut>,
        num_rows: u32,
        data_type: &DataType,
    ) -> ArrayRef {
        let mut buffer_iter = buffers.into_iter();
        // Share the validity handling with the boolean / fixed-size-list paths.
        let validity = Self::bytes_to_validity(buffer_iter.next().unwrap(), num_rows);
        let values = Buffer::from(buffer_iter.next().unwrap().freeze());
        let values = ScalarBuffer::<T::Native>::new(values, 0, num_rows as usize);
        Arc::new(PrimitiveArray::<T>::new(values, validity).with_data_type(data_type.clone()))
    }

    /// Converts a decoded validity buffer into an optional `NullBuffer`.
    ///
    /// An empty buffer yields `None`. Otherwise the bytes are wrapped as a
    /// bitmap of `num_rows` bits starting at bit offset 0.
    fn bytes_to_validity(bytes: BytesMut, num_rows: u32) -> Option<NullBuffer> {
        if bytes.is_empty() {
            None
        } else {
            let null_buffer = bytes.freeze().into();
            Some(NullBuffer::new(BooleanBuffer::new(
                Buffer::from_bytes(null_buffer),
                0,
                num_rows as usize,
            )))
        }
    }

    /// Assembles an Arrow array of `data_type` with `num_rows` rows from
    /// decoded buffers.
    ///
    /// For booleans and primitive types the buffers are `[validity, values]`.
    /// For fixed-size lists the first buffer is the list validity and the
    /// remaining buffers are handed to a recursive call for the item type.
    ///
    /// # Errors
    ///
    /// Returns `Error::IO` when
    /// * a `Time32` / `Time64` type carries a unit that is invalid for its width,
    /// * the item count of a fixed-size list (`num_rows * dimension`) does not
    ///   fit in a `u32` or the dimension is negative,
    /// * the data type is not supported by the primitive encoding.
    ///
    /// # Panics
    ///
    /// Panics if fewer buffers are supplied than the data type requires.
    fn primitive_array_from_buffers(
        data_type: &DataType,
        buffers: Vec<BytesMut>,
        num_rows: u32,
    ) -> Result<ArrayRef> {
        match data_type {
            DataType::Boolean => {
                let mut buffer_iter = buffers.into_iter();
                let null_buffer = buffer_iter.next().unwrap();
                let null_buffer = Self::bytes_to_validity(null_buffer, num_rows);
                let data_buffer = buffer_iter.next().unwrap().freeze();
                let data_buffer = Buffer::from(data_buffer);
                let data_buffer = BooleanBuffer::new(data_buffer, 0, num_rows as usize);
                Ok(Arc::new(BooleanArray::new(data_buffer, null_buffer)))
            }
            DataType::Date32 => Ok(Self::new_primitive_array::<Date32Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Date64 => Ok(Self::new_primitive_array::<Date64Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Decimal128(_, _) => Ok(Self::new_primitive_array::<Decimal128Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Decimal256(_, _) => Ok(Self::new_primitive_array::<Decimal256Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Duration(units) => Ok(match units {
                TimeUnit::Second => {
                    Self::new_primitive_array::<DurationSecondType>(buffers, num_rows, data_type)
                }
                TimeUnit::Microsecond => Self::new_primitive_array::<DurationMicrosecondType>(
                    buffers, num_rows, data_type,
                ),
                TimeUnit::Millisecond => Self::new_primitive_array::<DurationMillisecondType>(
                    buffers, num_rows, data_type,
                ),
                TimeUnit::Nanosecond => Self::new_primitive_array::<DurationNanosecondType>(
                    buffers, num_rows, data_type,
                ),
            }),
            DataType::Float16 => Ok(Self::new_primitive_array::<Float16Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Float32 => Ok(Self::new_primitive_array::<Float32Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Float64 => Ok(Self::new_primitive_array::<Float64Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Int16 => Ok(Self::new_primitive_array::<Int16Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Int32 => Ok(Self::new_primitive_array::<Int32Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Int64 => Ok(Self::new_primitive_array::<Int64Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Int8 => Ok(Self::new_primitive_array::<Int8Type>(
                buffers, num_rows, data_type,
            )),
            DataType::Interval(unit) => Ok(match unit {
                IntervalUnit::DayTime => {
                    Self::new_primitive_array::<IntervalDayTimeType>(buffers, num_rows, data_type)
                }
                IntervalUnit::MonthDayNano => {
                    Self::new_primitive_array::<IntervalMonthDayNanoType>(
                        buffers, num_rows, data_type,
                    )
                }
                IntervalUnit::YearMonth => {
                    Self::new_primitive_array::<IntervalYearMonthType>(buffers, num_rows, data_type)
                }
            }),
            DataType::Null => Ok(new_null_array(data_type, num_rows as usize)),
            DataType::Time32(unit) => match unit {
                TimeUnit::Millisecond => Ok(Self::new_primitive_array::<Time32MillisecondType>(
                    buffers, num_rows, data_type,
                )),
                TimeUnit::Second => Ok(Self::new_primitive_array::<Time32SecondType>(
                    buffers, num_rows, data_type,
                )),
                _ => Err(Error::IO {
                    message: format!("invalid time unit {:?} for 32-bit time type", unit),
                    location: location!(),
                }),
            },
            DataType::Time64(unit) => match unit {
                TimeUnit::Microsecond => Ok(Self::new_primitive_array::<Time64MicrosecondType>(
                    buffers, num_rows, data_type,
                )),
                TimeUnit::Nanosecond => Ok(Self::new_primitive_array::<Time64NanosecondType>(
                    buffers, num_rows, data_type,
                )),
                _ => Err(Error::IO {
                    message: format!("invalid time unit {:?} for 64-bit time type", unit),
                    location: location!(),
                }),
            },
            DataType::Timestamp(unit, _) => Ok(match unit {
                TimeUnit::Microsecond => Self::new_primitive_array::<TimestampMicrosecondType>(
                    buffers, num_rows, data_type,
                ),
                TimeUnit::Millisecond => Self::new_primitive_array::<TimestampMillisecondType>(
                    buffers, num_rows, data_type,
                ),
                TimeUnit::Nanosecond => Self::new_primitive_array::<TimestampNanosecondType>(
                    buffers, num_rows, data_type,
                ),
                TimeUnit::Second => {
                    Self::new_primitive_array::<TimestampSecondType>(buffers, num_rows, data_type)
                }
            }),
            DataType::UInt16 => Ok(Self::new_primitive_array::<UInt16Type>(
                buffers, num_rows, data_type,
            )),
            DataType::UInt32 => Ok(Self::new_primitive_array::<UInt32Type>(
                buffers, num_rows, data_type,
            )),
            DataType::UInt64 => Ok(Self::new_primitive_array::<UInt64Type>(
                buffers, num_rows, data_type,
            )),
            DataType::UInt8 => Ok(Self::new_primitive_array::<UInt8Type>(
                buffers, num_rows, data_type,
            )),
            DataType::FixedSizeList(items, dimension) => {
                // The item count is `num_rows * dimension`; guard against a
                // negative dimension and against `u32` overflow, which would
                // otherwise silently wrap in release builds.
                let num_items = match num_rows.checked_mul(*dimension as u32) {
                    Some(count) if *dimension >= 0 => count,
                    _ => {
                        return Err(Error::IO {
                            message: format!(
                                "cannot decode {} rows of a fixed size list with dimension {}: item count is out of range",
                                num_rows, dimension
                            ),
                            location: location!(),
                        })
                    }
                };
                let mut buffers_iter = buffers.into_iter();
                let fsl_validity = buffers_iter.next().unwrap();
                let fsl_nulls = Self::bytes_to_validity(fsl_validity, num_rows);
                let remaining_buffers = buffers_iter.collect::<Vec<_>>();
                let items_array = Self::primitive_array_from_buffers(
                    items.data_type(),
                    remaining_buffers,
                    num_items,
                )?;
                Ok(Arc::new(FixedSizeListArray::new(
                    items.clone(),
                    *dimension,
                    items_array,
                    fsl_nulls,
                )))
            }
            _ => Err(Error::IO {
                message: format!(
                    "The data type {} cannot be decoded from a primitive encoding",
                    data_type
                ),
                location: location!(),
            }),
        }
    }
}
impl LogicalPageDecoder for PrimitiveFieldDecoder {
fn wait<'a>(
&'a mut self,
_: u32,
_: &'a mut mpsc::UnboundedReceiver<Box<dyn LogicalPageDecoder>>,
) -> BoxFuture<'a, Result<()>> {
async move {
let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
self.physical_decoder = Some(Arc::from(physical_decoder));
Ok(())
}
.boxed()
}
fn drain(&mut self, num_rows: u32) -> Result<NextDecodeTask> {
let rows_to_skip = self.rows_drained;
let rows_to_take = num_rows;
self.rows_drained += rows_to_take;
let task = Box::new(PrimitiveFieldDecodeTask {
rows_to_skip,
rows_to_take,
physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
data_type: self.data_type.clone(),
});
Ok(NextDecodeTask {
task,
num_rows: rows_to_take,
has_more: self.rows_drained != self.num_rows,
})
}
fn unawaited(&self) -> u32 {
if self.unloaded_physical_decoder.is_some() {
self.num_rows
} else {
0
}
}
fn avail(&self) -> u32 {
if self.unloaded_physical_decoder.is_some() {
0
} else {
self.num_rows - self.rows_drained
}
}
}
/// Field encoder for primitive columns.
///
/// Incoming arrays are buffered until their accumulated memory size exceeds
/// `cache_bytes`, at which point they are encoded together into one page.
pub struct PrimitiveFieldEncoder {
/// Buffered-size threshold (in bytes) above which a page is flushed.
cache_bytes: u64,
/// Arrays accumulated since the last flush.
buffered_arrays: Vec<ArrayRef>,
/// Sum of `get_array_memory_size()` over `buffered_arrays`.
current_bytes: u64,
/// Array encoder shared with the spawned encoding tasks.
encoder: Arc<dyn ArrayEncoder>,
/// Column index stamped onto every encoded page.
column_index: u32,
}
impl PrimitiveFieldEncoder {
    /// Picks the array encoder for `data_type`.
    ///
    /// Fixed-size lists wrap the (recursively chosen) item encoder in an
    /// `FslEncoder`; every other type uses a `ValueEncoder`. Either way the
    /// result is wrapped in a `BasicEncoder`.
    fn array_encoder_from_data_type(data_type: &DataType) -> Result<Box<dyn ArrayEncoder>> {
        if let DataType::FixedSizeList(inner, dimension) = data_type {
            let items_encoder = Self::array_encoder_from_data_type(inner.data_type())?;
            let fsl_encoder = FslEncoder::new(items_encoder, *dimension as u32);
            Ok(Box::new(BasicEncoder::new(Box::new(fsl_encoder))))
        } else {
            let value_encoder = ValueEncoder::try_new(data_type)?;
            Ok(Box::new(BasicEncoder::new(Box::new(value_encoder))))
        }
    }

    /// Creates an encoder for column `column_index` that flushes a page once
    /// more than `cache_bytes` bytes of arrays have been buffered.
    ///
    /// Fails if no array encoder can be built for `data_type`.
    pub fn try_new(cache_bytes: u64, data_type: &DataType, column_index: u32) -> Result<Self> {
        let encoder: Arc<dyn ArrayEncoder> =
            Arc::from(Self::array_encoder_from_data_type(data_type)?);
        Ok(Self {
            cache_bytes,
            column_index,
            buffered_arrays: Vec::with_capacity(8),
            current_bytes: 0,
            encoder,
        })
    }

    /// Hands all buffered arrays to a spawned task that encodes them into a
    /// single page, and resets the buffering state.
    ///
    /// The returned future resolves to the encoded page; it panics if the
    /// spawned task itself failed to complete.
    fn do_flush(&mut self) -> BoxFuture<'static, Result<EncodedPage>> {
        let pending_arrays = std::mem::take(&mut self.buffered_arrays);
        self.current_bytes = 0;
        let page_encoder = Arc::clone(&self.encoder);
        let column_idx = self.column_index;
        tokio::task::spawn(async move {
            let num_rows = pending_arrays.iter().map(|arr| arr.len() as u32).sum();
            let mut buffer_index = 0;
            let array = page_encoder.encode(&pending_arrays, &mut buffer_index)?;
            Ok(EncodedPage {
                array,
                num_rows,
                column_idx,
            })
        })
        .map(|join_result| join_result.unwrap())
        .boxed()
    }
}
impl FieldEncoder for PrimitiveFieldEncoder {
    /// Buffers `array` and, once the buffered size exceeds the configured
    /// threshold, returns a single encoding task for everything buffered.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
    ) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        self.current_bytes += array.get_array_memory_size() as u64;
        self.buffered_arrays.push(array);
        if self.current_bytes <= self.cache_bytes {
            // Still under the threshold: keep buffering.
            return Ok(Vec::new());
        }
        Ok(vec![self.do_flush()])
    }

    /// Returns an encoding task for whatever is still buffered, or nothing
    /// when the buffered byte count is zero.
    fn flush(&mut self) -> Result<Vec<BoxFuture<'static, Result<EncodedPage>>>> {
        if self.current_bytes == 0 {
            return Ok(Vec::new());
        }
        Ok(vec![self.do_flush()])
    }

    /// A primitive field always maps to exactly one column.
    fn num_columns(&self) -> u32 {
        1
    }
}