pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
use crate::sort_pushdown::SortOrderPushdownResult;
pub use crate::stream::EmptyRecordBatchStream;
use arrow_schema::Schema;
pub use datafusion_common::hash_utils;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
pub use datafusion_common::utils::project_schema;
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
pub use datafusion_expr::{Accumulator, ColumnarValue};
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{
Distribution, Partitioning, PhysicalExpr, expressions,
};
use std::any::Any;
use std::fmt::Debug;
use std::sync::{Arc, LazyLock};
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricsSet;
use crate::projection::ProjectionExec;
use crate::stream::RecordBatchStreamAdapter;
use arrow::array::{Array, RecordBatch};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{
Constraints, DataFusionError, Result, assert_eq_or_internal_err,
assert_or_internal_err, exec_err,
};
use datafusion_common_runtime::JoinSet;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr_common::sort_expr::{
LexOrdering, OrderingRequirements, PhysicalSortExpr,
};
use futures::stream::{StreamExt, TryStreamExt};
/// A node in a physical query plan: when executed, produces partitioned
/// streams of record batches. Implementations must be `Send + Sync` so plans
/// can be shared across threads, and `DisplayAs` so they can be rendered in
/// EXPLAIN-style output.
pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
    /// Short human-readable operator name, used for display and diagnostics.
    fn name(&self) -> &str;
    /// Default name derived from the implementing type: the segment of
    /// `std::any::type_name` after the last `:`, or `"UNKNOWN"` when no `:`
    /// is present.
    /// NOTE(review): for generic types, `rfind(':')` can match a `:` inside
    /// the type parameters, yielding a truncated name — confirm acceptable.
    fn static_name() -> &'static str
    where
        Self: Sized,
    {
        let full_name = std::any::type_name::<Self>();
        let maybe_start_idx = full_name.rfind(':');
        match maybe_start_idx {
            Some(start_idx) => &full_name[start_idx + 1..],
            None => "UNKNOWN",
        }
    }
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Schema of the record batches this node produces (taken from its
    /// cached [`PlanProperties`]).
    fn schema(&self) -> SchemaRef {
        Arc::clone(self.properties().schema())
    }
    /// Cached properties of this node: equivalence properties, partitioning,
    /// emission type, boundedness, and scheduling/evaluation hints.
    fn properties(&self) -> &Arc<PlanProperties>;
    /// Checks implementation invariants; the default validates that each
    /// per-child `Vec` returned by this trait has one entry per child.
    fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
        check_default_invariants(self, check)
    }
    /// Required input data distribution, one entry per child; defaults to
    /// `UnspecifiedDistribution` for every child.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution; self.children().len()]
    }
    /// Required input sort order, one entry per child; `None` (the default
    /// for all children) means no ordering requirement.
    fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>> {
        vec![None; self.children().len()]
    }
    /// Whether this node preserves each child's input ordering in its output;
    /// defaults to `false` for every child.
    fn maintains_input_order(&self) -> Vec<bool> {
        vec![false; self.children().len()]
    }
    /// Whether each child would benefit from increased partitioning; defaults
    /// to `true` except for children that must be a single partition.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        self.required_input_distribution()
            .into_iter()
            .map(|dist| !matches!(dist, Distribution::SinglePartition))
            .collect()
    }
    /// The inputs of this node, in order.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>;
    /// Returns a copy of this node with the given children substituted.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>>;
    /// Returns a copy of this node with any internal state reset; the default
    /// rebuilds the node from clones of its current children.
    fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
        let children = self.children().into_iter().cloned().collect();
        self.with_new_children(children)
    }
    /// Optionally returns a version of this node repartitioned to
    /// `target_partitions`; `Ok(None)` (the default) means the node cannot
    /// be repartitioned.
    fn repartitioned(
        &self,
        _target_partitions: usize,
        _config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
    /// Begins execution of the given partition, returning a stream of
    /// record batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream>;
    /// Runtime metrics collected by this node, if any (default: `None`).
    fn metrics(&self) -> Option<MetricsSet> {
        None
    }
    /// Statistics for one partition (`Some(idx)`) or for the whole node
    /// (`None`). The default validates the partition index against the
    /// partition count and returns unknown statistics for this schema.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        if let Some(idx) = partition {
            let partition_count = self.properties().partitioning.partition_count();
            assert_or_internal_err!(
                idx < partition_count,
                "Invalid partition index: {}, the partition count is {}",
                idx,
                partition_count
            );
        }
        Ok(Statistics::new_unknown(&self.schema()))
    }
    /// Whether a `LIMIT` can be pushed down through this node (default: no).
    fn supports_limit_pushdown(&self) -> bool {
        false
    }
    /// Returns a fetch-limited copy of this node, or `None` if unsupported.
    fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
    /// The current fetch (row) limit of this node, if one is set.
    fn fetch(&self) -> Option<usize> {
        None
    }
    /// How this node changes its input row count (default: unknown).
    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Unknown
    }
    /// Attempts to push `projection` down through this node; `Ok(None)`
    /// (the default) leaves the projection where it is.
    fn try_swapping_with_projection(
        &self,
        _projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        Ok(None)
    }
    /// Describes which parent filters can be pushed into this node's
    /// children; the default marks all of them as unsupported.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        Ok(FilterDescription::all_unsupported(
            &parent_filters,
            &self.children(),
        ))
    }
    /// Reacts to the result of pushing filters into the children; the
    /// default simply propagates the children's combined result.
    fn handle_child_pushdown_result(
        &self,
        _phase: FilterPushdownPhase,
        child_pushdown_result: ChildPushdownResult,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
        Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
    }
    /// Returns a copy of this node with new opaque state attached, or `None`
    /// (the default) if this node carries no such state.
    fn with_new_state(
        &self,
        _state: Arc<dyn Any + Send + Sync>,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
    /// Attempts to push a sort order down into this node; the default
    /// reports the pushdown as unsupported.
    fn try_pushdown_sort(
        &self,
        _order: &[PhysicalSortExpr],
    ) -> Result<SortOrderPushdownResult<Arc<dyn ExecutionPlan>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }
    /// Returns an order-preserving (or non-preserving) variant of this node,
    /// or `None` (the default) if the node cannot change that behavior.
    fn with_preserve_order(
        &self,
        _preserve_order: bool,
    ) -> Option<Arc<dyn ExecutionPlan>> {
        None
    }
}
/// How strictly plan invariants should be checked.
#[derive(Clone, Copy)]
pub enum InvariantLevel {
    /// Invariants expected to hold for any plan.
    Always,
    /// Invariants expected to hold for plans that are ready to execute.
    Executable,
}
/// Convenience accessors over a plan's cached [`PlanProperties`].
pub trait ExecutionPlanProperties {
    /// Output partitioning of the plan.
    fn output_partitioning(&self) -> &Partitioning;
    /// Output sort order of the plan, if any.
    fn output_ordering(&self) -> Option<&LexOrdering>;
    /// Whether the plan's output is bounded or unbounded.
    fn boundedness(&self) -> Boundedness;
    /// How the plan emits results (see [`EmissionType`]).
    fn pipeline_behavior(&self) -> EmissionType;
    /// Equivalence properties of the plan's output.
    fn equivalence_properties(&self) -> &EquivalenceProperties;
}
// Forward all property accessors to the plan's cached `PlanProperties`.
impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }
    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }
    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }
    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }
    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
// Same delegation for plain trait-object references.
impl ExecutionPlanProperties for &dyn ExecutionPlan {
    fn output_partitioning(&self) -> &Partitioning {
        self.properties().output_partitioning()
    }
    fn output_ordering(&self) -> Option<&LexOrdering> {
        self.properties().output_ordering()
    }
    fn boundedness(&self) -> Boundedness {
        self.properties().boundedness
    }
    fn pipeline_behavior(&self) -> EmissionType {
        self.properties().emission_type
    }
    fn equivalence_properties(&self) -> &EquivalenceProperties {
        self.properties().equivalence_properties()
    }
}
/// Whether a node's output stream is finite (`Bounded`) or may never end
/// (`Unbounded`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Boundedness {
    Bounded,
    Unbounded {
        requires_infinite_memory: bool,
    },
}

impl Boundedness {
    /// Returns `true` for the [`Boundedness::Unbounded`] variant,
    /// regardless of its memory requirement flag.
    pub fn is_unbounded(&self) -> bool {
        match self {
            Boundedness::Bounded => false,
            Boundedness::Unbounded { .. } => true,
        }
    }
}
/// How a node emits output relative to consuming its input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmissionType {
    /// Emits output incrementally while consuming input.
    Incremental,
    /// Emits output only after consuming all input.
    Final,
    /// Emits some output incrementally and some at the end.
    Both,
}
/// Whether a node participates in cooperative scheduling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulingType {
    /// The node does not yield voluntarily to the scheduler.
    NonCooperative,
    /// The node yields voluntarily to the scheduler.
    Cooperative,
}
/// How a node's output is driven.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvaluationType {
    /// Output is produced on demand as the consumer polls.
    Lazy,
    /// Output is produced eagerly; `need_data_exchange` treats `Eager`
    /// nodes as requiring a data exchange.
    Eager,
}
/// Derives a parent node's [`Boundedness`] from its children: any child that
/// requires infinite memory dominates; otherwise any unbounded child makes
/// the parent unbounded (with finite memory); otherwise the parent is bounded.
pub(crate) fn boundedness_from_children<'a>(
    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> Boundedness {
    let mut any_unbounded = false;
    for child in children {
        if let Boundedness::Unbounded {
            requires_infinite_memory,
        } = child.boundedness()
        {
            if requires_infinite_memory {
                // Strongest case; no need to inspect remaining children.
                return Boundedness::Unbounded {
                    requires_infinite_memory: true,
                };
            }
            any_unbounded = true;
        }
    }
    if any_unbounded {
        Boundedness::Unbounded {
            requires_infinite_memory: false,
        }
    } else {
        Boundedness::Bounded
    }
}
/// Derives a parent node's [`EmissionType`] from its children: any `Final`
/// child makes the parent `Final`; otherwise any `Both` child makes the
/// parent `Both`; otherwise the parent is `Incremental`.
pub(crate) fn emission_type_from_children<'a>(
    children: impl IntoIterator<Item = &'a Arc<dyn ExecutionPlan>>,
) -> EmissionType {
    let mut saw_both = false;
    for child in children {
        let behavior = child.pipeline_behavior();
        if behavior == EmissionType::Final {
            // Dominates every other behavior; stop scanning.
            return EmissionType::Final;
        }
        saw_both |= behavior == EmissionType::Both;
    }
    if saw_both {
        EmissionType::Both
    } else {
        EmissionType::Incremental
    }
}
/// Cached properties describing an [`ExecutionPlan`]'s output.
#[derive(Debug, Clone)]
pub struct PlanProperties {
    /// Equivalence properties (including orderings and constraints).
    pub eq_properties: EquivalenceProperties,
    /// Output partitioning.
    pub partitioning: Partitioning,
    /// How results are emitted (see [`EmissionType`]).
    pub emission_type: EmissionType,
    /// Whether the output is bounded (see [`Boundedness`]).
    pub boundedness: Boundedness,
    /// How the output is driven (see [`EvaluationType`]).
    pub evaluation_type: EvaluationType,
    /// Scheduler cooperation hint (see [`SchedulingType`]).
    pub scheduling_type: SchedulingType,
    // Cached from `eq_properties`; kept in sync by `set_eq_properties`.
    output_ordering: Option<LexOrdering>,
}
impl PlanProperties {
    /// Builds properties from the given parts. `evaluation_type` defaults to
    /// `Lazy` and `scheduling_type` to `NonCooperative`; the output ordering
    /// is cached from `eq_properties`.
    pub fn new(
        eq_properties: EquivalenceProperties,
        partitioning: Partitioning,
        emission_type: EmissionType,
        boundedness: Boundedness,
    ) -> Self {
        let output_ordering = eq_properties.output_ordering();
        Self {
            eq_properties,
            partitioning,
            emission_type,
            boundedness,
            evaluation_type: EvaluationType::Lazy,
            scheduling_type: SchedulingType::NonCooperative,
            output_ordering,
        }
    }
    /// Builder-style: overwrite the partitioning.
    pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self {
        self.partitioning = partitioning;
        self
    }
    /// Replaces the equivalence properties, refreshing the cached ordering.
    pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) {
        self.output_ordering = eq_properties.output_ordering();
        self.eq_properties = eq_properties;
    }
    /// Builder-style version of [`Self::set_eq_properties`].
    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
        self.set_eq_properties(eq_properties);
        self
    }
    /// Builder-style: overwrite the boundedness.
    pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self {
        self.boundedness = boundedness;
        self
    }
    /// Builder-style: overwrite the emission type.
    pub fn with_emission_type(mut self, emission_type: EmissionType) -> Self {
        self.emission_type = emission_type;
        self
    }
    /// Builder-style: overwrite the scheduling type.
    pub fn with_scheduling_type(mut self, scheduling_type: SchedulingType) -> Self {
        self.scheduling_type = scheduling_type;
        self
    }
    /// Builder-style: overwrite the evaluation type.
    pub fn with_evaluation_type(mut self, drive_type: EvaluationType) -> Self {
        self.evaluation_type = drive_type;
        self
    }
    /// Installs constraints into the equivalence properties.
    pub fn set_constraints(&mut self, constraints: Constraints) {
        self.eq_properties.set_constraints(constraints);
    }
    /// Builder-style version of [`Self::set_constraints`].
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.set_constraints(constraints);
        self
    }
    /// Equivalence properties of the output.
    pub fn equivalence_properties(&self) -> &EquivalenceProperties {
        &self.eq_properties
    }
    /// Output partitioning.
    pub fn output_partitioning(&self) -> &Partitioning {
        &self.partitioning
    }
    /// Cached output ordering, if any.
    pub fn output_ordering(&self) -> Option<&LexOrdering> {
        self.output_ordering.as_ref()
    }
    /// Schema backing the equivalence properties.
    pub(crate) fn schema(&self) -> &SchemaRef {
        self.eq_properties.schema()
    }
}
// Asserts that `$target.$func_name()` returns a `Vec` with exactly
// `$expected_len` elements, raising an internal error that names the plan
// and the offending method otherwise. Used by `check_default_invariants`.
macro_rules! check_len {
    ($target:expr, $func_name:ident, $expected_len:expr) => {
        let actual_len = $target.$func_name().len();
        assert_eq_or_internal_err!(
            actual_len,
            $expected_len,
            "{}::{} returned Vec with incorrect size: {} != {}",
            $target.name(),
            stringify!($func_name),
            actual_len,
            $expected_len
        );
    };
}
/// Default invariant checks for any [`ExecutionPlan`]: every per-child `Vec`
/// the plan returns must have exactly one entry per child. The invariant
/// level is currently unused by these checks.
pub fn check_default_invariants<P: ExecutionPlan + ?Sized>(
    plan: &P,
    _check: InvariantLevel,
) -> Result<(), DataFusionError> {
    let children_len = plan.children().len();
    check_len!(plan, maintains_input_order, children_len);
    check_len!(plan, required_input_ordering, children_len);
    check_len!(plan, required_input_distribution, children_len);
    check_len!(plan, benefits_from_input_partitioning, children_len);
    Ok(())
}
/// Returns `true` when `plan` is driven eagerly and therefore needs a data
/// exchange between itself and its consumer.
#[expect(clippy::needless_pass_by_value)]
pub fn need_data_exchange(plan: Arc<dyn ExecutionPlan>) -> bool {
    matches!(plan.properties().evaluation_type, EvaluationType::Eager)
}
/// Rebuilds `plan` with `children` only when at least one child actually
/// changed (pointer inequality); otherwise returns the original plan
/// untouched. Errors if the child count does not match.
pub fn with_new_children_if_necessary(
    plan: Arc<dyn ExecutionPlan>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let current_children = plan.children();
    assert_eq_or_internal_err!(
        children.len(),
        current_children.len(),
        "Wrong number of children"
    );
    // A leaf plan (no children) is always rebuilt, matching historical behavior.
    let unchanged = !children.is_empty()
        && children
            .iter()
            .zip(current_children.iter())
            .all(|(new_child, old_child)| Arc::ptr_eq(new_child, old_child));
    if unchanged {
        Ok(plan)
    } else {
        plan.with_new_children(children)
    }
}
/// Wraps a plan in a [`DisplayableExecutionPlan`] for formatting.
pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> {
    DisplayableExecutionPlan::new(plan)
}
/// Executes `plan` (merging partitions if necessary) and collects every
/// resulting record batch into a single `Vec`.
pub async fn collect(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
    crate::common::collect(execute_stream(plan, context)?).await
}
/// Executes `plan` as a single output stream: zero partitions yield an empty
/// stream, one partition is executed directly, and multiple partitions are
/// merged through a [`CoalescePartitionsExec`].
#[expect(
    clippy::needless_pass_by_value,
    reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_stream(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let partition_count = plan.output_partitioning().partition_count();
    if partition_count == 0 {
        Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema())))
    } else if partition_count == 1 {
        plan.execute(0, context)
    } else {
        let merged = CoalescePartitionsExec::new(Arc::clone(&plan));
        // Coalescing must collapse the plan down to exactly one partition.
        assert_eq!(
            1,
            merged.properties().output_partitioning().partition_count()
        );
        merged.execute(0, context)
    }
}
/// Executes every partition of `plan` concurrently and collects the batches
/// of each, returning one `Vec<RecordBatch>` per partition in partition
/// order. Panics from worker tasks are propagated to the caller.
pub async fn collect_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<Vec<RecordBatch>>> {
    let streams = execute_stream_partitioned(plan, context)?;
    let mut join_set = JoinSet::new();
    // Spawn one collector task per partition, tagging each with its index so
    // results can be reordered after unordered completion.
    for (partition, stream) in streams.into_iter().enumerate() {
        join_set.spawn(async move {
            let collected: Result<Vec<RecordBatch>> = stream.try_collect().await;
            (partition, collected)
        });
    }
    let mut indexed = vec![];
    while let Some(joined) = join_set.join_next().await {
        match joined {
            Ok((partition, collected)) => indexed.push((partition, collected?)),
            Err(join_err) => {
                if join_err.is_panic() {
                    // Re-raise panics from worker tasks on this task.
                    std::panic::resume_unwind(join_err.into_panic());
                } else {
                    // JoinSet is not cancelled here, so no other error kind
                    // can occur.
                    unreachable!();
                }
            }
        }
    }
    indexed.sort_by_key(|(partition, _)| *partition);
    Ok(indexed.into_iter().map(|(_, batches)| batches).collect())
}
/// Starts execution of every partition of `plan`, returning one stream per
/// partition. Fails fast if any partition fails to start.
#[expect(
    clippy::needless_pass_by_value,
    reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_stream_partitioned(
    plan: Arc<dyn ExecutionPlan>,
    context: Arc<TaskContext>,
) -> Result<Vec<SendableRecordBatchStream>> {
    (0..plan.output_partitioning().partition_count())
        .map(|partition| plan.execute(partition, Arc::clone(&context)))
        .collect()
}
/// Executes one partition of `input` for writing into a sink with
/// `sink_schema`, inserting a runtime NOT NULL check for every column that
/// is nullable in the input but non-nullable in the sink schema.
#[expect(
    clippy::needless_pass_by_value,
    reason = "Public API that historically takes owned Arcs"
)]
pub fn execute_input_stream(
    input: Arc<dyn ExecutionPlan>,
    sink_schema: SchemaRef,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let stream = input.execute(partition, context)?;
    let input_schema = input.schema();
    debug_assert_eq!(sink_schema.fields().len(), input_schema.fields().len());
    // Columns needing a runtime null check: nullable upstream, non-nullable
    // in the sink.
    let mut risky_columns = Vec::new();
    for (idx, (sink_field, input_field)) in sink_schema
        .fields()
        .iter()
        .zip(input_schema.fields().iter())
        .enumerate()
    {
        if !sink_field.is_nullable() && input_field.is_nullable() {
            risky_columns.push(idx);
        }
    }
    if risky_columns.is_empty() {
        return Ok(stream);
    }
    Ok(Box::pin(RecordBatchStreamAdapter::new(
        sink_schema,
        stream.map(move |batch| check_not_null_constraints(batch?, &risky_columns)),
    )))
}
/// Verifies that `batch` contains no NULL values in the columns listed in
/// `column_indices`, returning the batch unchanged on success.
///
/// # Errors
/// Returns an execution error if an index is out of range for the batch, or
/// if the column at an index contains any logical null (this also catches
/// nulls reached through dictionary or run-end encodings, via
/// `logical_nulls`).
pub fn check_not_null_constraints(
    batch: RecordBatch,
    // `&[usize]` instead of `&Vec<usize>` (clippy::ptr_arg); existing callers
    // passing `&Vec<usize>` still compile via deref coercion.
    column_indices: &[usize],
) -> Result<RecordBatch> {
    for &index in column_indices {
        if batch.num_columns() <= index {
            return exec_err!(
                "Invalid batch column count {} expected > {}",
                batch.num_columns(),
                index
            );
        }
        // `logical_nulls` returns `None` when the column has no null buffer,
        // which counts as zero nulls.
        let null_count = batch
            .column(index)
            .logical_nulls()
            .map(|nulls| nulls.null_count())
            .unwrap_or_default();
        if null_count > 0 {
            return exec_err!(
                "Invalid batch column at '{}' has null but schema specifies non-nullable",
                index
            );
        }
    }
    Ok(batch)
}
/// Walks the plan bottom-up and resets the internal state of every node via
/// [`ExecutionPlan::reset_state`], returning the rebuilt plan.
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(|node| Arc::clone(&node).reset_state().map(Transformed::yes))
        .data()
}
/// Returns `true` when every proposed child shares the exact same
/// (pointer-identical) [`PlanProperties`] as the plan's current child in the
/// same position. Errors if the child count does not match.
pub fn has_same_children_properties(
    plan: &impl ExecutionPlan,
    children: &[Arc<dyn ExecutionPlan>],
) -> Result<bool> {
    let current_children = plan.children();
    assert_eq_or_internal_err!(
        children.len(),
        current_children.len(),
        "Wrong number of children"
    );
    Ok(current_children
        .iter()
        .zip(children)
        .all(|(old_child, new_child)| {
            Arc::ptr_eq(old_child.properties(), new_child.properties())
        }))
}
/// If `$children` have the same (pointer-identical) properties as `$plan`'s
/// current children, rebuilds the plan via
/// `with_new_children_and_same_properties` and returns early with the result;
/// otherwise falls through so the caller can rebuild normally.
#[macro_export]
macro_rules! check_if_same_properties {
    ($plan: expr, $children: expr) => {
        if $crate::execution_plan::has_same_children_properties(
            $plan.as_ref(),
            &$children,
        )? {
            let plan = $plan.with_new_children_and_same_properties($children);
            return Ok(::std::sync::Arc::new(plan));
        }
    };
}
/// Renders `plan` in indented display form and returns it as one `String`
/// per line, with surrounding whitespace trimmed.
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
    let rendered = displayable(plan.as_ref()).indent(true).to_string();
    rendered
        .trim()
        .lines()
        .map(|line| line.to_string())
        .collect()
}
/// How a node changes the number of rows flowing through it.
pub enum CardinalityEffect {
    /// The effect on row count is not known.
    Unknown,
    /// Output row count equals input row count.
    Equal,
    /// Output row count is at most the input row count.
    LowerEqual,
    /// Output row count is at least the input row count.
    GreaterEqual,
}
/// Returns a shared placeholder [`PlanProperties`] (empty schema, single
/// unknown partition, final emission, bounded) for use where real
/// properties are not available.
pub(crate) fn stub_properties() -> Arc<PlanProperties> {
    // Built once lazily; every caller shares the same Arc.
    static STUB_PROPERTIES: LazyLock<Arc<PlanProperties>> = LazyLock::new(|| {
        Arc::new(PlanProperties::new(
            EquivalenceProperties::new(Arc::new(Schema::empty())),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Final,
            Boundedness::Bounded,
        ))
    });
    Arc::clone(&STUB_PROPERTIES)
}
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::Arc;
    use super::*;
    use crate::{DisplayAs, DisplayFormatType, ExecutionPlan};
    use arrow::array::{DictionaryArray, Int32Array, NullArray, RunArray};
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion_common::{Result, Statistics};
    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
    // Minimal plan stub whose `name` comes from the default `static_name`.
    #[derive(Debug)]
    pub struct EmptyExec;
    impl EmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }
    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }
    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }
        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
            unimplemented!()
        }
    }
    // Like `EmptyExec`, but overrides `static_name` with a custom string.
    #[derive(Debug)]
    pub struct RenamedEmptyExec;
    impl RenamedEmptyExec {
        pub fn new(_schema: SchemaRef) -> Self {
            Self
        }
    }
    impl DisplayAs for RenamedEmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }
    impl ExecutionPlan for RenamedEmptyExec {
        fn name(&self) -> &'static str {
            Self::static_name()
        }
        fn static_name() -> &'static str
        where
            Self: Sized,
        {
            "MyRenamedEmptyExec"
        }
        fn as_any(&self) -> &dyn Any {
            self
        }
        fn properties(&self) -> &Arc<PlanProperties> {
            unimplemented!()
        }
        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            vec![]
        }
        fn with_new_children(
            self: Arc<Self>,
            _: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }
        fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
            unimplemented!()
        }
    }
    // Default `static_name` yields the type name; overrides take precedence.
    #[test]
    fn test_execution_plan_name() {
        let schema1 = Arc::new(Schema::empty());
        let default_name_exec = EmptyExec::new(schema1);
        assert_eq!(default_name_exec.name(), "EmptyExec");
        let schema2 = Arc::new(Schema::empty());
        let renamed_exec = RenamedEmptyExec::new(schema2);
        assert_eq!(renamed_exec.name(), "MyRenamedEmptyExec");
        assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
    }
    // Compile-time check that `ExecutionPlan` remains object-safe.
    #[expect(unused)]
    fn use_execution_plan_as_trait_object(plan: &dyn ExecutionPlan) {
        let _ = plan.name();
    }
    // A column with no nulls passes the NOT NULL check.
    #[test]
    fn test_check_not_null_constraints_accept_non_null() -> Result<()> {
        check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
                vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
            )?,
            &vec![0],
        )?;
        Ok(())
    }
    // A plain column containing a null is rejected.
    #[test]
    fn test_check_not_null_constraints_reject_null() -> Result<()> {
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
            )?,
            &vec![0],
        );
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().strip_backtrace(),
            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
        );
        Ok(())
    }
    // Nulls reached through run-end encoding are detected via `logical_nulls`.
    #[test]
    fn test_check_not_null_constraints_with_run_end_array() -> Result<()> {
        let run_ends = Int32Array::from(vec![1, 2, 3, 4]);
        let values = Int32Array::from(vec![Some(0), None, Some(1), None]);
        let run_end_array = RunArray::try_new(&run_ends, &values)?;
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    run_end_array.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(run_end_array)],
            )?,
            &vec![0],
        );
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().strip_backtrace(),
            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
        );
        Ok(())
    }
    // Nulls referenced by dictionary keys are detected.
    #[test]
    fn test_check_not_null_constraints_with_dictionary_array_with_null() -> Result<()> {
        let values = Arc::new(Int32Array::from(vec![Some(1), None, Some(3), Some(4)]));
        let keys = Int32Array::from(vec![0, 1, 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    dictionary.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(dictionary)],
            )?,
            &vec![0],
        );
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().strip_backtrace(),
            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
        );
        Ok(())
    }
    // A null dictionary value that no key references does not trip the check.
    #[test]
    fn test_check_not_null_constraints_with_dictionary_masking_null() -> Result<()> {
        let values = Arc::new(Int32Array::from(vec![
            Some(1),
            None, Some(3),
            Some(4),
        ]));
        let keys = Int32Array::from(vec![0, 2, 3]);
        let dictionary = DictionaryArray::new(keys, values);
        check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new(
                    "a",
                    dictionary.data_type().to_owned(),
                    true,
                )])),
                vec![Arc::new(dictionary)],
            )?,
            &vec![0],
        )?;
        Ok(())
    }
    // A `Null`-typed column is entirely null and is rejected.
    #[test]
    fn test_check_not_null_constraints_on_null_type() -> Result<()> {
        let result = check_not_null_constraints(
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![Field::new("a", DataType::Null, true)])),
                vec![Arc::new(NullArray::new(3))],
            )?,
            &vec![0],
        );
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().strip_backtrace(),
            "Execution error: Invalid batch column at '0' has null but schema specifies non-nullable",
        );
        Ok(())
    }
}