use std::any::Any;
use std::hash::Hash;
use std::sync::Arc;
use crate::physical_expr::PhysicalExpr;
use arrow::datatypes::FieldRef;
use arrow::{
datatypes::{DataType, Schema, SchemaRef},
record_batch::RecordBatch,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, internal_err, plan_err};
use datafusion_expr::ColumnarValue;
use datafusion_expr_common::placement::ExpressionPlacement;
/// Physical expression that references one column of its input by position.
///
/// Evaluation only uses `index`; `name` is carried along for display
/// (`name@index` via `Display`, bare `name` via `fmt_sql`).
///
/// NOTE: field order matters for the derived `Debug` and `Hash` output.
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct Column {
    /// Column name, used for display only.
    name: String,
    /// Zero-based position of the column in the input schema / record batch.
    index: usize,
}
impl Column {
    /// Creates a column expression called `name` that reads input column `index`.
    ///
    /// The index is not validated against any schema here.
    pub fn new(name: &str, index: usize) -> Self {
        let name = String::from(name);
        Self { name, index }
    }

    /// Creates a column expression for the field called `name` in `schema`,
    /// using that field's position as the index.
    ///
    /// # Errors
    /// Returns an error if `schema` has no field named `name`.
    pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
        let index = schema.index_of(name)?;
        Ok(Self::new(name, index))
    }

    /// Returns the column name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the zero-based position of the column in the input schema.
    pub fn index(&self) -> usize {
        self.index
    }
}
impl std::fmt::Display for Column {
    /// Formats the column as `name@index`, e.g. `foo@0`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { name, index } = self;
        write!(f, "{name}@{index}")
    }
}
impl PhysicalExpr for Column {
    /// Returns `self` as [`Any`] so callers can downcast to [`Column`].
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the data type of the referenced field of `input_schema`.
    ///
    /// # Errors
    /// Returns an internal error if the index is out of range for `input_schema`.
    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
        self.bounds_check(input_schema)?;
        Ok(input_schema.field(self.index).data_type().clone())
    }

    /// Returns whether the referenced field of `input_schema` is nullable.
    ///
    /// # Errors
    /// Returns an internal error if the index is out of range for `input_schema`.
    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
        self.bounds_check(input_schema)?;
        Ok(input_schema.field(self.index).is_nullable())
    }

    /// Returns the referenced column of `batch`.
    ///
    /// # Errors
    /// Returns an internal error if the index is out of range for the batch schema.
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        self.bounds_check(batch.schema().as_ref())?;
        // Clones the `Arc` handle to the array, not the array data.
        Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
    }

    /// Returns the referenced field of `input_schema`.
    ///
    /// # Errors
    /// Returns an internal error if the index is out of range for `input_schema`,
    /// matching `data_type` / `nullable` instead of panicking while indexing.
    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
        self.bounds_check(input_schema)?;
        // Share the schema's existing `FieldRef` rather than deep-cloning the
        // `Field` (name, data type, metadata) and wrapping it in a new `Arc`.
        Ok(Arc::clone(&input_schema.fields()[self.index]))
    }

    /// A column is a leaf expression and has no children.
    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
        vec![]
    }

    /// Leaf expression: the supplied children are ignored and `self` is returned.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn PhysicalExpr>>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        Ok(self)
    }

    /// Renders the column as its bare name (without the `@index` suffix).
    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name)
    }

    fn placement(&self) -> ExpressionPlacement {
        ExpressionPlacement::Column
    }
}
impl Column {
    /// Ensures `self.index` addresses a field of `input_schema`.
    ///
    /// # Errors
    /// Returns an internal error naming the column, its index, the schema's
    /// column count and the schema's field names when the index is out of range.
    fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
        let column_count = input_schema.fields.len();
        if self.index >= column_count {
            // Field names are only collected on the error path.
            let field_names: Vec<_> = input_schema
                .fields()
                .iter()
                .map(|f| f.name())
                .collect();
            return internal_err!(
                "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
                self.name,
                self.index,
                column_count,
                field_names
            );
        }
        Ok(())
    }
}
/// Builds a [`Column`] expression for the field named `name` in `schema`.
///
/// # Errors
/// Returns an error if `schema` has no field named `name`.
pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
    let column = Column::new_with_schema(name, schema)?;
    Ok(Arc::new(column))
}
/// Rewrites every [`Column`] in `expr` so its name matches the field at the
/// same index in `schema`; indices are left unchanged.
///
/// # Errors
/// Returns a planning error if any column's index is not a valid position in
/// `schema`.
pub fn with_new_schema(
    expr: Arc<dyn PhysicalExpr>,
    schema: &SchemaRef,
) -> Result<Arc<dyn PhysicalExpr>> {
    let rewritten = expr.transform_up(|node| {
        // Anything that is not a `Column` passes through unchanged.
        let Some(column) = node.as_any().downcast_ref::<Column>() else {
            return Ok(Transformed::no(node));
        };
        let position = column.index();
        match schema.fields().get(position) {
            Some(field) => {
                let renamed: Arc<dyn PhysicalExpr> =
                    Arc::new(Column::new(field.name(), position));
                Ok(Transformed::yes(renamed))
            }
            None => plan_err!("New schema has fewer columns than original schema"),
        }
    })?;
    Ok(rewritten.data)
}
#[cfg(test)]
mod test {
    use super::{Column, with_new_schema};
    use crate::physical_expr::PhysicalExpr;
    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    /// Start of the error produced for column `id@9` against the schema `[foo]`,
    /// excluding the generic "This issue was likely caused by a bug..." suffix.
    const EXPECTED_PREFIX: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
        but input schema only has 1 columns: [\"foo\"]";

    /// Asserts that `error` is the out-of-bounds error for `id@9`.
    ///
    /// The check is `error.starts_with(expected)`: the reverse direction
    /// (`expected.starts_with(error)`) would accept an empty or truncated error.
    fn assert_out_of_bounds(error: &str) {
        assert!(
            error.starts_with(EXPECTED_PREFIX),
            "unexpected error message: {error}"
        );
    }

    /// Schema with a single nullable Utf8 column named `foo`.
    fn single_column_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    #[test]
    fn out_of_bounds_data_type() {
        let col = Column::new("id", 9);
        let error = col
            .data_type(&single_column_schema())
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let col = Column::new("id", 9);
        let error = col
            .nullable(&single_column_schema())
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_evaluate() {
        let data: StringArray = vec!["data"].into();
        let batch =
            RecordBatch::try_new(Arc::new(single_column_schema()), vec![Arc::new(data)])
                .unwrap();
        let col = Column::new("id", 9);
        let error = col.evaluate(&batch).expect_err("error").strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn in_bounds_data_type_and_nullable() {
        let schema = single_column_schema();
        let col = Column::new("foo", 0);
        assert_eq!(col.data_type(&schema).unwrap(), DataType::Utf8);
        assert!(col.nullable(&schema).unwrap());
    }

    #[test]
    fn new_with_schema_resolves_index() {
        let schema = single_column_schema();
        assert_eq!(
            Column::new_with_schema("foo", &schema).unwrap(),
            Column::new("foo", 0)
        );
        assert!(Column::new_with_schema("missing", &schema).is_err());
    }

    #[test]
    fn with_new_schema_renames_by_index() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("old", 0));
        let schema = Arc::new(single_column_schema());
        let rewritten = with_new_schema(expr, &schema).unwrap();
        let column = rewritten
            .as_any()
            .downcast_ref::<Column>()
            .expect("still a Column");
        assert_eq!(column, &Column::new("foo", 0));
    }

    #[test]
    fn with_new_schema_rejects_missing_index() {
        let expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new("id", 9));
        let schema = Arc::new(single_column_schema());
        assert!(with_new_schema(expr, &schema).is_err());
    }
}