use crate::SendableRecordBatchStream;
use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues};
use crate::{PhysicalExpr, PhysicalSortExpr};
use arrow::array::Array;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::{Result, internal_datafusion_err};
use datafusion_execution::memory_pool::MemoryReservation;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll, ready};
/// A set of independently pollable input streams ("partitions"), addressed by
/// index, each of which yields items of type [`Self::Output`].
pub trait PartitionedStream: std::fmt::Debug + Send {
    /// The item produced when a single partition is polled.
    type Output;

    /// Returns how many partitions this stream exposes.
    fn partitions(&self) -> usize;

    /// Polls the partition at `stream_idx` for its next item.
    ///
    /// The implementations in this file return `Poll::Pending` while the
    /// underlying input is not ready, `Poll::Ready(Some(_))` for each item, and
    /// `Poll::Ready(None)` once that partition is exhausted.
    fn poll_next(
        &mut self,
        cx: &mut Context<'_>,
        stream_idx: usize,
    ) -> Poll<Option<Self::Output>>;
}
/// The input record batch streams of a merge, each wrapped in [`Fuse`] so that
/// polling an exhausted stream again keeps returning `None`.
struct FusedStreams(Vec<Fuse<SendableRecordBatchStream>>);

impl std::fmt::Debug for FusedStreams {
    /// Prints only how many streams are wrapped, not the streams themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream_count = self.0.len();
        let mut builder = f.debug_struct("FusedStreams");
        builder.field("num_streams", &stream_count);
        builder.finish()
    }
}
impl FusedStreams {
fn poll_next(
&mut self,
cx: &mut Context<'_>,
stream_idx: usize,
) -> Poll<Option<Result<RecordBatch>>> {
loop {
match ready!(self.0[stream_idx].poll_next_unpin(cx)) {
Some(Ok(b)) if b.num_rows() == 0 => continue,
r => return Poll::Ready(r),
}
}
}
}
/// Double-buffered [`Rows`] allocations, one pair per input partition, that
/// let [`RowCursorStream`] reuse row memory instead of allocating per batch.
///
/// For each partition, slot `0` holds the most recently saved rows and slot
/// `1` holds the rows saved before that. Slot `1` is what the next
/// [`ReusableRows::take_next`] hands out for refilling.
#[derive(Debug)]
struct ReusableRows {
    inner: Vec<[Option<Arc<Rows>>; 2]>,
}

impl ReusableRows {
    /// Takes ownership of the older [`Rows`] buffer of partition `stream_idx`
    /// so it can be cleared and refilled.
    ///
    /// # Errors
    /// * Returns an internal error if the buffer is still shared, i.e. some
    ///   other `Arc` to it (such as one held by a consumer's cursor) is still
    ///   alive. The buffer is put back in its slot so a later call can retry.
    /// * Returns an internal error if the slot is empty, for example because an
    ///   earlier conversion for this partition failed after taking the buffer
    ///   and before saving a new one.
    ///
    /// # Panics
    /// Panics if `stream_idx` is out of bounds.
    fn take_next(&mut self, stream_idx: usize) -> Result<Rows> {
        let slot = &mut self.inner[stream_idx][1];
        let shared = slot.take().ok_or_else(|| {
            internal_datafusion_err!(
                "Reusable Rows for RowCursorStream partition {stream_idx} are missing"
            )
        })?;
        Arc::try_unwrap(shared).map_err(|still_shared| {
            // Restore the slot. Otherwise the next call would find it empty even
            // though the buffer becomes reusable once the consumer drops it.
            *slot = Some(still_shared);
            internal_datafusion_err!(
                "Rows from RowCursorStream is still in use by consumer"
            )
        })
    }

    /// Stores `rows` as the newest buffer of partition `stream_idx`.
    ///
    /// A clone of the `Arc` is placed in slot `1` and the two slots are then
    /// swapped. `rows` ends up in slot `0`, and the previously newest buffer
    /// moves to slot `1`, ready for the next [`ReusableRows::take_next`].
    ///
    /// # Panics
    /// Panics if `stream_idx` is out of bounds.
    fn save(&mut self, stream_idx: usize, rows: &Arc<Rows>) {
        let [newest, older] = &mut self.inner[stream_idx];
        *older = Some(Arc::clone(rows));
        std::mem::swap(newest, older);
    }
}
#[derive(Debug)]
pub struct RowCursorStream {
converter: RowConverter,
column_expressions: Vec<Arc<dyn PhysicalExpr>>,
streams: FusedStreams,
reservation: MemoryReservation,
rows: ReusableRows,
}
impl RowCursorStream {
pub fn try_new(
schema: &Schema,
expressions: &LexOrdering,
streams: Vec<SendableRecordBatchStream>,
reservation: MemoryReservation,
) -> Result<Self> {
let sort_fields = expressions
.iter()
.map(|expr| {
let data_type = expr.expr.data_type(schema)?;
Ok(SortField::new_with_options(data_type, expr.options))
})
.collect::<Result<Vec<_>>>()?;
let streams: Vec<_> = streams.into_iter().map(|s| s.fuse()).collect();
let converter = RowConverter::new(sort_fields)?;
let mut rows = Vec::with_capacity(streams.len());
for _ in &streams {
rows.push([
Some(Arc::new(converter.empty_rows(0, 0))),
Some(Arc::new(converter.empty_rows(0, 0))),
]);
}
Ok(Self {
converter,
reservation,
column_expressions: expressions.iter().map(|x| Arc::clone(&x.expr)).collect(),
streams: FusedStreams(streams),
rows: ReusableRows { inner: rows },
})
}
fn convert_batch(
&mut self,
batch: &RecordBatch,
stream_idx: usize,
) -> Result<RowValues> {
let cols = evaluate_expressions_to_arrays(&self.column_expressions, batch)?;
let mut rows = self.rows.take_next(stream_idx)?;
rows.clear();
self.converter.append(&mut rows, &cols)?;
self.reservation.try_resize(self.converter.size())?;
let rows = Arc::new(rows);
self.rows.save(stream_idx, &rows);
let mut rows_reservation = self.reservation.new_empty();
rows_reservation.try_grow(rows.size())?;
Ok(RowValues::new(rows, rows_reservation))
}
}
impl PartitionedStream for RowCursorStream {
type Output = Result<(RowValues, RecordBatch)>;
fn partitions(&self) -> usize {
self.streams.0.len()
}
fn poll_next(
&mut self,
cx: &mut Context<'_>,
stream_idx: usize,
) -> Poll<Option<Self::Output>> {
Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
r.and_then(|batch| {
let cursor = self.convert_batch(&batch, stream_idx)?;
Ok((cursor, batch))
})
}))
}
}
/// A [`PartitionedStream`] for a single sort expression. The expression's
/// evaluated values are downcast to the array type `T` and yielded as
/// [`ArrayValues`] cursors.
pub struct FieldCursorStream<T: CursorArray> {
    /// The single sort expression together with its sort options.
    sort: PhysicalSortExpr,
    /// The input partitions.
    streams: FusedStreams,
    /// Parent reservation from which each cursor's reservation is created.
    reservation: MemoryReservation,
    /// Ties the stream to `T` without storing a value of it.
    phantom: PhantomData<fn(T) -> T>,
}

impl<T: CursorArray> std::fmt::Debug for FieldCursorStream<T> {
    /// Prints the type name and the number of input streams. The label matches
    /// the actual type, and `num_streams` is a count, consistent with
    /// `FusedStreams`' own `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FieldCursorStream")
            .field("num_streams", &self.streams.0.len())
            .finish()
    }
}
impl<T: CursorArray> FieldCursorStream<T> {
pub fn new(
sort: PhysicalSortExpr,
streams: Vec<SendableRecordBatchStream>,
reservation: MemoryReservation,
) -> Self {
let streams = streams.into_iter().map(|s| s.fuse()).collect();
Self {
sort,
streams: FusedStreams(streams),
reservation,
phantom: Default::default(),
}
}
fn convert_batch(&mut self, batch: &RecordBatch) -> Result<ArrayValues<T::Values>> {
let value = self.sort.expr.evaluate(batch)?;
let array = value.into_array(batch.num_rows())?;
let size_in_mem = array.get_buffer_memory_size();
let array = array.as_any().downcast_ref::<T>().expect("field values");
let mut array_reservation = self.reservation.new_empty();
array_reservation.try_grow(size_in_mem)?;
Ok(ArrayValues::new(
self.sort.options,
array,
array_reservation,
))
}
}
impl<T: CursorArray> PartitionedStream for FieldCursorStream<T> {
type Output = Result<(ArrayValues<T::Values>, RecordBatch)>;
fn partitions(&self) -> usize {
self.streams.0.len()
}
fn poll_next(
&mut self,
cx: &mut Context<'_>,
stream_idx: usize,
) -> Poll<Option<Self::Output>> {
Poll::Ready(ready!(self.streams.poll_next(cx, stream_idx)).map(|r| {
r.and_then(|batch| {
let cursor = self.convert_batch(&batch)?;
Ok((cursor, batch))
})
}))
}
}