Skip to main content

datafusion_physical_expr/expressions/
column.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical column reference: [`Column`]
19
20use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use arrow::datatypes::FieldRef;
26use arrow::{
27    datatypes::{DataType, Schema, SchemaRef},
28    record_batch::RecordBatch,
29};
30use datafusion_common::tree_node::{Transformed, TreeNode};
31use datafusion_common::{Result, internal_err, plan_err};
32use datafusion_expr::ColumnarValue;
33use datafusion_expr_common::placement::ExpressionPlacement;
34
/// A physical expression that selects one column of a [`RecordBatch`] by position.
///
/// A `Column` pairs a name with a zero-based index into an arrow [`Schema`] /
/// [`RecordBatch`]. Lookups performed by this expression go through the index.
///
/// Unlike the [logical `Expr::Column`], this expression is always resolved by schema index,
/// even though it does have a name. This is because the physical plan is always
/// resolved to a specific schema and there is no concept of "relation".
///
/// Note that the derived `PartialEq`, `Eq` and `Hash` implementations take both
/// the name and the index into account.
///
/// # Example
///  If the schema is `a`, `b`, `c` the `Column` for `b` would be represented by
///  index 1, since `b` is the second column in the schema.
///
/// ```
/// # use datafusion_physical_expr::expressions::Column;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// // Schema with columns a, b, c
/// let schema = Schema::new(vec![
///     Field::new("a", DataType::Int32, false),
///     Field::new("b", DataType::Int32, false),
///     Field::new("c", DataType::Int32, false),
/// ]);
///
/// // reference to column b is index 1
/// let column_b = Column::new_with_schema("b", &schema).unwrap();
/// assert_eq!(column_b.index(), 1);
///
/// // reference to column c is index 2
/// let column_c = Column::new_with_schema("c", &schema).unwrap();
/// assert_eq!(column_c.index(), 2);
/// ```
/// [logical `Expr::Column`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html#variant.Column
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    /// Name of the column; written by `Display`, `fmt_sql` and error messages
    name: String,
    /// Zero-based position of the column in its schema
    index: usize,
}
74
75impl Column {
76    /// Create a new column expression which references the
77    /// column with the given index in the schema.
78    pub fn new(name: &str, index: usize) -> Self {
79        Self {
80            name: name.to_owned(),
81            index,
82        }
83    }
84
85    /// Create a new column expression which references the
86    /// column with the given name in the schema
87    pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
88        Ok(Column::new(name, schema.index_of(name)?))
89    }
90
91    /// Get the column's name
92    pub fn name(&self) -> &str {
93        &self.name
94    }
95
96    /// Get the column's schema index
97    pub fn index(&self) -> usize {
98        self.index
99    }
100}
101
102impl std::fmt::Display for Column {
103    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104        write!(f, "{}@{}", self.name, self.index)
105    }
106}
107
108impl PhysicalExpr for Column {
109    /// Return a reference to Any that can be used for downcasting
110    fn as_any(&self) -> &dyn Any {
111        self
112    }
113
114    /// Get the data type of this expression, given the schema of the input
115    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
116        self.bounds_check(input_schema)?;
117        Ok(input_schema.field(self.index).data_type().clone())
118    }
119
120    /// Decide whether this expression is nullable, given the schema of the input
121    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
122        self.bounds_check(input_schema)?;
123        Ok(input_schema.field(self.index).is_nullable())
124    }
125
126    /// Evaluate the expression
127    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
128        self.bounds_check(batch.schema().as_ref())?;
129        Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
130    }
131
132    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
133        Ok(input_schema.field(self.index).clone().into())
134    }
135
136    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
137        vec![]
138    }
139
140    fn with_new_children(
141        self: Arc<Self>,
142        _children: Vec<Arc<dyn PhysicalExpr>>,
143    ) -> Result<Arc<dyn PhysicalExpr>> {
144        Ok(self)
145    }
146
147    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148        write!(f, "{}", self.name)
149    }
150
151    fn placement(&self) -> ExpressionPlacement {
152        ExpressionPlacement::Column
153    }
154}
155
156impl Column {
157    fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
158        if self.index < input_schema.fields.len() {
159            Ok(())
160        } else {
161            internal_err!(
162                "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
163                self.name,
164                self.index,
165                input_schema.fields.len(),
166                input_schema
167                    .fields()
168                    .iter()
169                    .map(|f| f.name())
170                    .collect::<Vec<_>>()
171            )
172        }
173    }
174}
175
176/// Create a column expression
177pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
178    Ok(Arc::new(Column::new_with_schema(name, schema)?))
179}
180
181/// Rewrites an expression according to new schema; i.e. changes the columns it
182/// refers to with the column at corresponding index in the new schema. Returns
183/// an error if the given schema has fewer columns than the original schema.
184/// Note that the resulting expression may not be valid if data types in the
185/// new schema is incompatible with expression nodes.
186pub fn with_new_schema(
187    expr: Arc<dyn PhysicalExpr>,
188    schema: &SchemaRef,
189) -> Result<Arc<dyn PhysicalExpr>> {
190    Ok(expr
191        .transform_up(|expr| {
192            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
193                let idx = col.index();
194                let Some(field) = schema.fields().get(idx) else {
195                    return plan_err!(
196                        "New schema has fewer columns than original schema"
197                    );
198                };
199                let new_col = Column::new(field.name(), idx);
200                Ok(Transformed::yes(Arc::new(new_col) as _))
201            } else {
202                Ok(Transformed::no(expr))
203            }
204        })?
205        .data)
206}
207
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use std::sync::Arc;

    /// Message expected when column `id` at index 9 is resolved against the
    /// single-column schema built by [`single_column_schema`].
    const EXPECTED_MESSAGE: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
             but input schema only has 1 columns: [\"foo\"].\nThis issue was likely caused by a bug \
             in DataFusion's code. Please help us to resolve this by filing a bug report \
             in our issue tracker: https://github.com/apache/datafusion/issues";

    /// Schema with one nullable Utf8 column named `foo`
    fn single_column_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    /// Column whose index (9) lies outside of [`single_column_schema`]
    fn out_of_bounds_column() -> Column {
        Column::new("id", 9)
    }

    /// Assert that `error` is a prefix of [`EXPECTED_MESSAGE`]
    #[track_caller]
    fn assert_expected_prefix(error: &str) {
        assert!(EXPECTED_MESSAGE.starts_with(error));
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_column_schema();
        let error = out_of_bounds_column()
            .data_type(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_expected_prefix(&error);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_column_schema();
        let error = out_of_bounds_column()
            .nullable(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_expected_prefix(&error);
    }

    #[test]
    fn out_of_bounds_evaluate() {
        let data: StringArray = vec!["data"].into();
        let batch =
            RecordBatch::try_new(Arc::new(single_column_schema()), vec![Arc::new(data)])
                .unwrap();
        let error = out_of_bounds_column()
            .evaluate(&batch)
            .expect_err("error")
            .strip_backtrace();
        assert_expected_prefix(&error);
    }
}