datafusion_physical_expr/expressions/
column.rs1use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use arrow::datatypes::FieldRef;
26use arrow::{
27 datatypes::{DataType, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use datafusion_common::tree_node::{Transformed, TreeNode};
31use datafusion_common::{internal_err, plan_err, Result};
32use datafusion_expr::ColumnarValue;
33
/// Physical expression that references one column of its input by position.
///
/// Lookups into a schema or record batch use only `index` (zero-based).
/// `name` appears in `Display`/SQL output and in out-of-bounds error
/// messages, but is not used to locate the column.
///
/// Note that the derived `PartialEq`/`Hash` compare both `name` and `index`.
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct Column {
    /// Column name, used for display and error messages.
    name: String,
    /// Zero-based position of the column in the input schema.
    index: usize,
}
73
74impl Column {
75 pub fn new(name: &str, index: usize) -> Self {
78 Self {
79 name: name.to_owned(),
80 index,
81 }
82 }
83
84 pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
87 Ok(Column::new(name, schema.index_of(name)?))
88 }
89
90 pub fn name(&self) -> &str {
92 &self.name
93 }
94
95 pub fn index(&self) -> usize {
97 self.index
98 }
99}
100
101impl std::fmt::Display for Column {
102 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
103 write!(f, "{}@{}", self.name, self.index)
104 }
105}
106
107impl PhysicalExpr for Column {
108 fn as_any(&self) -> &dyn Any {
110 self
111 }
112
113 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
115 self.bounds_check(input_schema)?;
116 Ok(input_schema.field(self.index).data_type().clone())
117 }
118
119 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
121 self.bounds_check(input_schema)?;
122 Ok(input_schema.field(self.index).is_nullable())
123 }
124
125 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
127 self.bounds_check(batch.schema().as_ref())?;
128 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
129 }
130
131 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
132 Ok(input_schema.field(self.index).clone().into())
133 }
134
135 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
136 vec![]
137 }
138
139 fn with_new_children(
140 self: Arc<Self>,
141 _children: Vec<Arc<dyn PhysicalExpr>>,
142 ) -> Result<Arc<dyn PhysicalExpr>> {
143 Ok(self)
144 }
145
146 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 write!(f, "{}", self.name)
148 }
149}
150
151impl Column {
152 fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
153 if self.index < input_schema.fields.len() {
154 Ok(())
155 } else {
156 internal_err!(
157 "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
158 self.name,
159 self.index,
160 input_schema.fields.len(),
161 input_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
162 )
163 }
164 }
165}
166
167pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
169 Ok(Arc::new(Column::new_with_schema(name, schema)?))
170}
171
172pub fn with_new_schema(
178 expr: Arc<dyn PhysicalExpr>,
179 schema: &SchemaRef,
180) -> Result<Arc<dyn PhysicalExpr>> {
181 Ok(expr
182 .transform_up(|expr| {
183 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
184 let idx = col.index();
185 let Some(field) = schema.fields().get(idx) else {
186 return plan_err!(
187 "New schema has fewer columns than original schema"
188 );
189 };
190 let new_col = Column::new(field.name(), idx);
191 Ok(Transformed::yes(Arc::new(new_col) as _))
192 } else {
193 Ok(Transformed::no(expr))
194 }
195 })?
196 .data)
197}
198
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use std::sync::Arc;

    /// Message every out-of-bounds error below must start with.
    ///
    /// The check is `actual.starts_with(EXPECTED)`, not the reverse. The
    /// reverse (`EXPECTED.starts_with(actual)`) would also accept an empty
    /// or truncated error string.
    const OUT_OF_BOUNDS_PREFIX: &str = "Internal error: PhysicalExpr Column references column 'id' \
        at index 9 (zero-based) but input schema only has 1 columns: [\"foo\"]";

    /// One nullable Utf8 field named `foo`.
    fn single_column_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    /// A column reference that is out of range for `single_column_schema`.
    fn out_of_range_column() -> Column {
        Column::new("id", 9)
    }

    /// Asserts that `error` is the bounds-check error for `out_of_range_column`.
    fn assert_out_of_bounds(error: &str) {
        assert!(
            error.starts_with(OUT_OF_BOUNDS_PREFIX),
            "unexpected error message: {error}"
        );
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_column_schema();
        let error = out_of_range_column()
            .data_type(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_column_schema();
        let error = out_of_range_column()
            .nullable(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds(&error);
    }

    #[test]
    fn out_of_bounds_evaluate() {
        let data: StringArray = vec!["data"].into();
        let batch =
            RecordBatch::try_new(Arc::new(single_column_schema()), vec![Arc::new(data)])
                .unwrap();
        let error = out_of_range_column()
            .evaluate(&batch)
            .expect_err("error")
            .strip_backtrace();
        assert_out_of_bounds(&error);
    }
}