use std::any::Any;
use std::fmt;
use std::sync::Arc;
use crate::source::{DataSource, DataSourceExec};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::memory::MemoryStream;
use datafusion_physical_plan::projection::{
all_alias_free_columns, new_projections_for_columns, ProjectionExec,
};
use datafusion_physical_plan::{
common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::ProjectionMapping;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
/// Execution plan over in-memory record batches, kept for backwards
/// compatibility (see the `deprecated` attribute below).
///
/// Execution, statistics, plan properties and display are forwarded to the
/// wrapped [`DataSourceExec`]; the remaining fields mirror the configuration
/// so that the accessor methods can hand out references.
#[derive(Clone)]
#[deprecated(
since = "46.0.0",
note = "use MemorySourceConfig and DataSourceExec instead"
)]
pub struct MemoryExec {
/// Wrapped plan that the `ExecutionPlan` implementation delegates to.
inner: DataSourceExec,
/// The partitions to query, as returned by `partitions()`.
partitions: Vec<Vec<RecordBatch>>,
/// Optional projection (column indices), as returned by `projection()`.
projection: Option<Vec<usize>>,
/// Sort information recorded by `try_with_sort_information`.
sort_information: Vec<LexOrdering>,
/// Mirror of the `show_sizes` display flag, as returned by `show_sizes()`.
show_sizes: bool,
}
// `deprecated` is allowed because this impl names the deprecated `MemoryExec`.
#[allow(unused, deprecated)]
impl fmt::Debug for MemoryExec {
/// Writes the wrapped plan's `DisplayFormatType::Default` rendering.
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_as(DisplayFormatType::Default, f)
}
}
// `deprecated` is allowed because this impl names the deprecated `MemoryExec`.
#[allow(unused, deprecated)]
impl DisplayAs for MemoryExec {
/// Forwards to the wrapped plan's `fmt_as` with the requested format type.
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
self.inner.fmt_as(t, f)
}
}
// `deprecated` is allowed because this impl names the deprecated `MemoryExec`.
#[allow(unused, deprecated)]
impl ExecutionPlan for MemoryExec {
    /// Static name of this operator.
    fn name(&self) -> &'static str {
        "MemoryExec"
    }

    /// Returns `self` as [`Any`] so callers can downcast to `MemoryExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Plan properties, taken from the wrapped plan.
    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    /// This plan is a leaf: it reports no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Returns `self` unchanged when `children` is empty.
    ///
    /// # Errors
    /// Returns an internal error when any child is supplied, because a leaf
    /// plan has nothing to replace.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return internal_err!("Children cannot be replaced in {self:?}");
        }
        Ok(self)
    }

    /// Executes `partition` by delegating to the wrapped plan.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    /// Statistics as reported by the wrapped plan.
    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    /// Delegates the projection swap attempt to the wrapped plan.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}
#[allow(unused, deprecated)]
impl MemoryExec {
/// Creates a plan that reads the given in-memory `partitions`.
///
/// `schema` is the schema of the batches before `projection` is applied;
/// `projection` optionally selects columns by index.
///
/// # Errors
/// Propagates any error from `MemorySourceConfig::try_new`.
pub fn try_new(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Self> {
    let config = MemorySourceConfig::try_new(partitions, schema, projection.clone())?;
    let inner = DataSourceExec::new(Arc::new(config));
    let exec = Self {
        inner,
        partitions: partitions.to_vec(),
        projection,
        sort_information: Vec::new(),
        show_sizes: true,
    };
    Ok(exec)
}
/// Creates a plan from a list of constant rows, as produced by a `VALUES`
/// clause: `data[i][j]` is the expression for row `i`, column `j`.
///
/// Each expression is evaluated against a placeholder batch that has one row
/// and no columns, and the results are assembled into a single
/// [`RecordBatch`] with the given `schema`.
///
/// # Errors
/// Returns a plan error when `data` is empty, when a row holds fewer
/// expressions than `schema` has columns, or when an expression evaluates to
/// an array whose length is not 1. Errors from expression evaluation and from
/// batch construction are propagated.
pub fn try_new_as_values(
    schema: SchemaRef,
    data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
) -> Result<Self> {
    if data.is_empty() {
        return plan_err!("Values list cannot be empty");
    }

    let n_row = data.len();
    let n_col = schema.fields().len();

    // Reject ragged input up front: indexing a short row below would panic.
    // Rows with surplus expressions are still accepted and the surplus ignored.
    if let Some((row_idx, row)) = data
        .iter()
        .enumerate()
        .find(|(_, row)| row.len() < n_col)
    {
        return plan_err!(
            "Values list row {} has {} expressions but the schema has {} columns",
            row_idx,
            row.len(),
            n_col
        );
    }

    // The expressions are evaluated against a single-row batch without columns.
    let placeholder_schema = Arc::new(Schema::empty());
    let placeholder_batch = RecordBatch::try_new_with_options(
        Arc::clone(&placeholder_schema),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(1)),
    )?;

    // Build one array per column by evaluating that column's expression in every row.
    let arrays = (0..n_col)
        .map(|j| {
            data.iter()
                .map(|row| {
                    let result = row[j].evaluate(&placeholder_batch)?;
                    match result {
                        ColumnarValue::Scalar(scalar) => Ok(scalar),
                        ColumnarValue::Array(array) if array.len() == 1 => {
                            ScalarValue::try_from_array(&array, 0)
                        }
                        ColumnarValue::Array(_) => {
                            plan_err!("Cannot have array values in a values list")
                        }
                    }
                })
                .collect::<Result<Vec<_>>>()
                .and_then(ScalarValue::iter_to_array)
        })
        .collect::<Result<Vec<_>>>()?;

    // The explicit row count keeps the batch well-formed when there are no columns.
    let batch = RecordBatch::try_new_with_options(
        Arc::clone(&schema),
        arrays,
        &RecordBatchOptions::new().with_row_count(Some(n_row)),
    )?;

    let partitions = vec![batch];
    Self::try_new_from_batches(Arc::clone(&schema), partitions)
}
/// Creates a single-partition plan from `batches`, all of which must have
/// exactly the given `schema`.
///
/// # Errors
/// Returns a plan error when `batches` is empty or when any batch has a
/// schema different from `schema`.
pub fn try_new_from_batches(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
) -> Result<Self> {
    if batches.is_empty() {
        return plan_err!("Values list cannot be empty");
    }

    // Report the first batch whose schema deviates from the expected one.
    let mismatch = batches
        .iter()
        .map(|batch| batch.schema())
        .find(|batch_schema| *batch_schema != schema);
    if let Some(batch_schema) = mismatch {
        return plan_err!(
            "Batch has invalid schema. Expected: {}, got: {}",
            schema,
            batch_schema
        );
    }

    // All batches go into one partition; no projection is applied, so the
    // projected schema equals the input schema.
    let partitions = vec![batches];
    let source = MemorySourceConfig {
        partitions: partitions.clone(),
        schema: Arc::clone(&schema),
        projected_schema: schema,
        projection: None,
        sort_information: Vec::new(),
        show_sizes: true,
        fetch: None,
    };

    Ok(Self {
        inner: DataSourceExec::new(Arc::new(source)),
        partitions,
        projection: None,
        sort_information: Vec::new(),
        show_sizes: true,
    })
}
/// Returns a clone of the `MemorySourceConfig` held by the wrapped plan.
///
/// # Panics
/// Panics (via `unwrap`) if the wrapped plan's data source cannot be
/// downcast to `MemorySourceConfig`.
fn memory_source_config(&self) -> MemorySourceConfig {
self.inner
.data_source()
.as_any()
.downcast_ref::<MemorySourceConfig>()
.unwrap()
.clone()
}
/// Sets `constraints` on the wrapped plan and returns the updated `MemoryExec`.
pub fn with_constraints(mut self, constraints: Constraints) -> Self {
self.inner = self.inner.with_constraints(constraints);
self
}
/// Sets whether the partition sizes are shown when this plan is displayed.
///
/// Constraints previously attached with `with_constraints` are kept, so the
/// order of the builder calls does not matter.
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
    // The wrapped plan is rebuilt from the source configuration, which holds
    // no constraints (see `MemorySourceConfig::eq_properties`). Capture them
    // first and re-attach them to the new plan.
    let constraints = self.constraints().clone();
    let memory_source = self.memory_source_config().with_show_sizes(show_sizes);
    self.show_sizes = show_sizes;
    self.inner =
        DataSourceExec::new(Arc::new(memory_source)).with_constraints(constraints);
    self
}
/// Returns the constraints stored in this plan's equivalence properties.
pub fn constraints(&self) -> &Constraints {
self.properties().equivalence_properties().constraints()
}
/// Returns the partitions (one `Vec<RecordBatch>` each) held by this plan.
pub fn partitions(&self) -> &[Vec<RecordBatch>] {
&self.partitions
}
/// Returns the optional projection (column indices) this plan was built with.
pub fn projection(&self) -> &Option<Vec<usize>> {
&self.projection
}
/// Returns the `show_sizes` display flag.
pub fn show_sizes(&self) -> bool {
self.show_sizes
}
/// Returns the sort information recorded on this plan.
pub fn sort_information(&self) -> &[LexOrdering] {
&self.sort_information
}
/// Declares the orderings that the in-memory data already satisfies.
///
/// Validation and projection of the orderings are delegated to
/// `MemorySourceConfig::try_with_sort_information`. The orderings stored on
/// this plan afterwards are the ones the configuration accepted, so
/// `sort_information()` agrees with what the wrapped plan advertises.
/// Constraints previously attached with `with_constraints` are kept.
///
/// # Errors
/// Propagates any error from `MemorySourceConfig::try_with_sort_information`.
pub fn try_with_sort_information(
    mut self,
    sort_information: Vec<LexOrdering>,
) -> Result<Self> {
    // The wrapped plan is rebuilt from the source configuration, which holds
    // no constraints. Capture them first and re-attach them to the new plan.
    let constraints = self.constraints().clone();
    let memory_source = self
        .memory_source_config()
        .try_with_sort_information(sort_information)?;

    // Mirror the orderings as stored by the configuration, which may have
    // been rewritten in terms of the projected schema.
    self.sort_information = memory_source.sort_information().to_vec();
    self.inner =
        DataSourceExec::new(Arc::new(memory_source)).with_constraints(constraints);
    Ok(self)
}
pub fn original_schema(&self) -> SchemaRef {
Arc::clone(&self.inner.schema())
}
/// Builds plan properties for in-memory data: equivalence properties from
/// `schema`, `orderings` and `constraints`, unknown partitioning with one
/// output partition per entry of `partitions`, incremental emission and
/// bounded input.
fn compute_properties(
    schema: SchemaRef,
    orderings: &[LexOrdering],
    constraints: Constraints,
    partitions: &[Vec<RecordBatch>],
) -> PlanProperties {
    let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
        .with_constraints(constraints);
    let partitioning = Partitioning::UnknownPartitioning(partitions.len());
    PlanProperties::new(
        eq_properties,
        partitioning,
        EmissionType::Incremental,
        Boundedness::Bounded,
    )
}
}
/// Data source configuration for reading in-memory batches of data.
#[derive(Clone, Debug)]
pub struct MemorySourceConfig {
/// The partitions to query; `open` streams the batches of one entry.
partitions: Vec<Vec<RecordBatch>>,
/// Schema of the data before projection, as returned by `original_schema()`.
schema: SchemaRef,
/// Schema after applying `projection`; used for the output stream and the
/// equivalence properties.
projected_schema: SchemaRef,
/// Optional projection (column indices into `schema`).
projection: Option<Vec<usize>>,
/// Orderings advertised through `eq_properties`.
sort_information: Vec<LexOrdering>,
/// Whether `fmt_as` prints the `partition_sizes` list.
show_sizes: bool,
/// Optional row limit handed to the stream created by `open`.
fetch: Option<usize>,
}
impl DataSource for MemorySourceConfig {
fn open(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(
MemoryStream::try_new(
self.partitions[partition].clone(),
Arc::clone(&self.projected_schema),
self.projection.clone(),
)?
.with_fetch(self.fetch),
))
}
/// Returns `self` as [`Any`] so callers can downcast to `MemorySourceConfig`.
fn as_any(&self) -> &dyn Any {
self
}
/// Writes a one-line description of this source.
///
/// The output always starts with the partition count. It is followed, in
/// this order and only when applicable, by the per-partition batch counts
/// (`show_sizes`), the fetch limit, the first known output ordering and the
/// constraints.
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
    match t {
        DisplayFormatType::Default | DisplayFormatType::Verbose => {
            // Number of batches held by each partition.
            let batch_counts = self.partitions.iter().map(Vec::len).collect::<Vec<_>>();

            write!(f, "partitions={}", batch_counts.len())?;
            if self.show_sizes {
                write!(f, ", partition_sizes={batch_counts:?}")?;
            }
            if let Some(fetch) = self.fetch {
                write!(f, ", fetch={fetch}")?;
            }
            if let Some(ordering) = self.sort_information.first() {
                write!(f, ", output_ordering={ordering}")?;
            }

            let eq_properties = self.eq_properties();
            let constraints = eq_properties.constraints();
            if !constraints.is_empty() {
                write!(f, ", {constraints}")?;
            }
            Ok(())
        }
    }
}
/// Reports unknown partitioning with one output partition per stored partition.
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.partitions.len())
}
/// Builds equivalence properties from the projected schema and the stored
/// sort information.
fn eq_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new_with_orderings(
Arc::clone(&self.projected_schema),
self.sort_information.as_slice(),
)
}
/// Computes statistics from the stored batches, passing the un-projected
/// schema and the projection to `compute_record_batch_statistics`.
fn statistics(&self) -> Result<Statistics> {
Ok(common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
))
}
/// Returns a copy of this source with its fetch limit replaced by `limit`.
/// Always returns `Some`.
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
    let limited = self.clone().with_limit(limit);
    Some(Arc::new(limited))
}
/// Returns the configured fetch limit, if any.
fn fetch(&self) -> Option<usize> {
self.fetch
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
all_alias_free_columns(projection.expr())
.then(|| {
let all_projections = (0..self.schema.fields().len()).collect();
let new_projections = new_projections_for_columns(
projection,
self.projection().as_ref().unwrap_or(&all_projections),
);
MemorySourceConfig::try_new_exec(
self.partitions(),
self.original_schema(),
Some(new_projections),
)
.map(|e| e as _)
})
.transpose()
}
}
impl MemorySourceConfig {
/// Creates a configuration for reading the given in-memory `partitions`.
///
/// `schema` is the schema of the batches before `projection` is applied. The
/// projected schema is derived from both with `project_schema`. The new
/// configuration has no sort information and no fetch limit, and shows the
/// partition sizes when displayed.
///
/// # Errors
/// Propagates any error from `project_schema`.
pub fn try_new(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Self> {
    project_schema(&schema, projection.as_ref()).map(|projected_schema| Self {
        partitions: partitions.to_vec(),
        schema,
        projected_schema,
        projection,
        sort_information: Vec::new(),
        show_sizes: true,
        fetch: None,
    })
}
/// Creates a configuration with `try_new` and wraps it in a
/// [`DataSourceExec`].
///
/// # Errors
/// Propagates any error from `try_new`.
pub fn try_new_exec(
    partitions: &[Vec<RecordBatch>],
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
) -> Result<Arc<DataSourceExec>> {
    Self::try_new(partitions, schema, projection)
        .map(|source| Arc::new(DataSourceExec::new(Arc::new(source))))
}
/// Creates a plan from a list of constant rows, as produced by a `VALUES`
/// clause: `data[i][j]` is the expression for row `i`, column `j`.
///
/// Each expression is evaluated against a placeholder batch that has one row
/// and no columns, and the results are assembled into a single
/// [`RecordBatch`] with the given `schema`.
///
/// # Errors
/// Returns a plan error when `data` is empty, when a row holds fewer
/// expressions than `schema` has columns, or when an expression evaluates to
/// an array whose length is not 1. Errors from expression evaluation and from
/// batch construction are propagated.
pub fn try_new_as_values(
    schema: SchemaRef,
    data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
) -> Result<Arc<DataSourceExec>> {
    if data.is_empty() {
        return plan_err!("Values list cannot be empty");
    }

    let n_row = data.len();
    let n_col = schema.fields().len();

    // Reject ragged input up front: indexing a short row below would panic.
    // Rows with surplus expressions are still accepted and the surplus ignored.
    if let Some((row_idx, row)) = data
        .iter()
        .enumerate()
        .find(|(_, row)| row.len() < n_col)
    {
        return plan_err!(
            "Values list row {} has {} expressions but the schema has {} columns",
            row_idx,
            row.len(),
            n_col
        );
    }

    // The expressions are evaluated against a single-row batch without columns.
    let placeholder_schema = Arc::new(Schema::empty());
    let placeholder_batch = RecordBatch::try_new_with_options(
        Arc::clone(&placeholder_schema),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(1)),
    )?;

    // Build one array per column by evaluating that column's expression in every row.
    let arrays = (0..n_col)
        .map(|j| {
            data.iter()
                .map(|row| {
                    let result = row[j].evaluate(&placeholder_batch)?;
                    match result {
                        ColumnarValue::Scalar(scalar) => Ok(scalar),
                        ColumnarValue::Array(array) if array.len() == 1 => {
                            ScalarValue::try_from_array(&array, 0)
                        }
                        ColumnarValue::Array(_) => {
                            plan_err!("Cannot have array values in a values list")
                        }
                    }
                })
                .collect::<Result<Vec<_>>>()
                .and_then(ScalarValue::iter_to_array)
        })
        .collect::<Result<Vec<_>>>()?;

    // The explicit row count keeps the batch well-formed when there are no columns.
    let batch = RecordBatch::try_new_with_options(
        Arc::clone(&schema),
        arrays,
        &RecordBatchOptions::new().with_row_count(Some(n_row)),
    )?;

    let partitions = vec![batch];
    Self::try_new_from_batches(Arc::clone(&schema), partitions)
}
/// Creates a single-partition [`DataSourceExec`] from `batches`, all of
/// which must have exactly the given `schema`.
///
/// # Errors
/// Returns a plan error when `batches` is empty or when any batch has a
/// schema different from `schema`.
pub fn try_new_from_batches(
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
) -> Result<Arc<DataSourceExec>> {
    if batches.is_empty() {
        return plan_err!("Values list cannot be empty");
    }

    // Report the first batch whose schema deviates from the expected one.
    let mismatch = batches
        .iter()
        .map(|batch| batch.schema())
        .find(|batch_schema| *batch_schema != schema);
    if let Some(batch_schema) = mismatch {
        return plan_err!(
            "Batch has invalid schema. Expected: {}, got: {}",
            schema,
            batch_schema
        );
    }

    // All batches go into one partition; no projection is applied, so the
    // projected schema equals the input schema.
    let source = Self {
        partitions: vec![batches],
        schema: Arc::clone(&schema),
        projected_schema: schema,
        projection: None,
        sort_information: Vec::new(),
        show_sizes: true,
        fetch: None,
    };
    Ok(Arc::new(DataSourceExec::new(Arc::new(source))))
}
/// Sets the fetch limit (`None` removes it) and returns the updated configuration.
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.fetch = limit;
self
}
/// Sets whether `fmt_as` prints the partition sizes and returns the updated
/// configuration.
pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
self.show_sizes = show_sizes;
self
}
/// Returns the partitions (one `Vec<RecordBatch>` each) held by this configuration.
pub fn partitions(&self) -> &[Vec<RecordBatch>] {
&self.partitions
}
/// Returns the optional projection (column indices into the original schema).
pub fn projection(&self) -> &Option<Vec<usize>> {
&self.projection
}
/// Returns whether `fmt_as` prints the partition sizes.
pub fn show_sizes(&self) -> bool {
self.show_sizes
}
/// Returns the stored sort information.
pub fn sort_information(&self) -> &[LexOrdering] {
&self.sort_information
}
pub fn try_with_sort_information(
mut self,
mut sort_information: Vec<LexOrdering>,
) -> Result<Self> {
let fields = self.schema.fields();
let ambiguous_column = sort_information
.iter()
.flat_map(|ordering| ordering.clone())
.flat_map(|expr| collect_columns(&expr.expr))
.find(|col| {
fields
.get(col.index())
.map(|field| field.name() != col.name())
.unwrap_or(true)
});
if let Some(col) = ambiguous_column {
return internal_err!(
"Column {:?} is not found in the original schema of the MemorySourceConfig",
col
);
}
if let Some(projection) = &self.projection {
let base_eqp = EquivalenceProperties::new_with_orderings(
self.original_schema(),
&sort_information,
);
let proj_exprs = projection
.iter()
.map(|idx| {
let base_schema = self.original_schema();
let name = base_schema.field(*idx).name();
(Arc::new(Column::new(name, *idx)) as _, name.to_string())
})
.collect::<Vec<_>>();
let projection_mapping =
ProjectionMapping::try_new(&proj_exprs, &self.original_schema())?;
sort_information = base_eqp
.project(&projection_mapping, Arc::clone(&self.projected_schema))
.into_oeq_class()
.into_inner();
}
self.sort_information = sort_information;
Ok(self)
}
/// Returns a new `Arc` handle to the schema of the data before projection.
pub fn original_schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
}
#[cfg(test)]
mod memory_source_tests {
    use std::sync::Arc;

    use crate::memory::MemorySourceConfig;
    use crate::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlan;

    use arrow::compute::SortOptions;
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_physical_expr::expressions::col;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;

    /// Builds a sort expression on column `name` with default sort options.
    fn sort_on(name: &str, schema: &Schema) -> datafusion_common::Result<PhysicalSortExpr> {
        Ok(PhysicalSortExpr {
            expr: col(name, schema)?,
            options: SortOptions::default(),
        })
    }

    /// Two orderings given to a memory source must both be present in the
    /// plan's ordering equivalence class, and the reported output ordering
    /// must be their concatenation.
    #[test]
    fn test_memory_order_eq() -> datafusion_common::Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Int64, false),
            Field::new("c", DataType::Int64, false),
        ]));

        let sort1 = LexOrdering::new(vec![sort_on("a", &schema)?, sort_on("b", &schema)?]);
        let sort2 = LexOrdering::new(vec![sort_on("c", &schema)?]);

        let mut expected_output_order = LexOrdering::default();
        expected_output_order.extend(sort1.clone());
        expected_output_order.extend(sort2.clone());

        let sort_information = vec![sort1.clone(), sort2.clone()];
        let source = MemorySourceConfig::try_new(&[vec![]], schema, None)?
            .try_with_sort_information(sort_information)?;
        let mem_exec = Arc::new(DataSourceExec::new(Arc::new(source)));

        assert_eq!(
            mem_exec.properties().output_ordering().unwrap(),
            &expected_output_order
        );

        let eq_properties = mem_exec.properties().equivalence_properties();
        assert!(eq_properties.oeq_class().contains(&sort1));
        assert!(eq_properties.oeq_class().contains(&sort2));
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use crate::tests::{aggr_test_schema, make_partition};

    use super::*;

    use datafusion_physical_plan::expressions::lit;

    use arrow::datatypes::{DataType, Field};
    use datafusion_common::assert_batches_eq;
    use datafusion_common::stats::{ColumnStatistics, Precision};
    use futures::StreamExt;

    /// A fetch limit set on the plan must cap the rows read from a partition.
    #[tokio::test]
    async fn exec_with_limit() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];

        let exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
        assert_eq!(exec.fetch(), None);

        let exec = exec.with_fetch(Some(4)).unwrap();
        assert_eq!(exec.fetch(), Some(4));

        let mut stream = exec.execute(0, task_ctx)?;
        let mut results = vec![];
        while let Some(next) = stream.next().await {
            results.push(next?);
        }

        let expected = [
            "+---+", "| i |", "+---+", "| 0 |", "| 1 |", "| 2 |", "| 3 |", "+---+",
        ];
        assert_batches_eq!(expected, &results);
        Ok(())
    }

    /// An empty values list is rejected.
    #[tokio::test]
    async fn values_empty_case() -> Result<()> {
        let schema = aggr_test_schema();
        let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
        assert!(empty.is_err());
        Ok(())
    }

    /// Batches that match the schema are accepted.
    #[test]
    fn new_exec_with_batches() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let batches = vec![batch.clone(), batch];
        let _exec = MemorySourceConfig::try_new_from_batches(schema, batches).unwrap();
    }

    /// An empty batch list is rejected.
    #[test]
    fn new_exec_with_batches_empty() {
        let batch = make_partition(7);
        let schema = batch.schema();
        let _ = MemorySourceConfig::try_new_from_batches(schema, Vec::new()).unwrap_err();
    }

    /// Batches whose schema differs from the requested one are rejected.
    #[test]
    fn new_exec_with_batches_invalid_schema() {
        let batch = make_partition(7);
        let batches = vec![batch.clone(), batch];

        let invalid_schema = Arc::new(Schema::new(vec![
            Field::new("col0", DataType::UInt32, false),
            Field::new("col1", DataType::Utf8, false),
        ]));
        let _ = MemorySourceConfig::try_new_from_batches(invalid_schema, batches)
            .unwrap_err();
    }

    /// A non-nullable column accepts a non-null literal and rejects a null one.
    #[test]
    fn new_exec_with_non_nullable_schema() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "col0",
            DataType::UInt32,
            false,
        )]));

        let _ = MemorySourceConfig::try_new_as_values(
            Arc::clone(&schema),
            vec![vec![lit(1u32)]],
        )
        .unwrap();

        let _ = MemorySourceConfig::try_new_as_values(
            schema,
            vec![vec![lit(ScalarValue::UInt32(None))]],
        )
        .unwrap_err();
    }

    /// Statistics of a values list that contains only nulls.
    #[test]
    fn values_stats_with_nulls_only() -> Result<()> {
        let data = vec![
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
            vec![lit(ScalarValue::Null)],
        ];
        let rows = data.len();
        let values = MemorySourceConfig::try_new_as_values(
            Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])),
            data,
        )?;

        let expected = Statistics {
            num_rows: Precision::Exact(rows),
            total_byte_size: Precision::Exact(8),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(rows),
                distinct_count: Precision::Absent,
                max_value: Precision::Absent,
                min_value: Precision::Absent,
                sum_value: Precision::Absent,
            }],
        };
        assert_eq!(values.statistics()?, expected);
        Ok(())
    }
}