datafusion_physical_expr/expressions/
column.rs1use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use arrow::datatypes::FieldRef;
26use arrow::{
27 datatypes::{DataType, Schema, SchemaRef},
28 record_batch::RecordBatch,
29};
30use datafusion_common::tree_node::{Transformed, TreeNode};
31use datafusion_common::{Result, internal_err, plan_err};
32use datafusion_expr::ColumnarValue;
33use datafusion_expr_common::placement::ExpressionPlacement;
34
/// A physical expression that refers to a single column of its input by
/// position, carrying the column's name alongside for display purposes.
///
/// Rendered through `Display` as `name@index`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    /// Name of the referenced column.
    name: String,
    /// Zero-based position of the referenced column in the input schema.
    index: usize,
}
74
75impl Column {
76 pub fn new(name: &str, index: usize) -> Self {
79 Self {
80 name: name.to_owned(),
81 index,
82 }
83 }
84
85 pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
88 Ok(Column::new(name, schema.index_of(name)?))
89 }
90
91 pub fn name(&self) -> &str {
93 &self.name
94 }
95
96 pub fn index(&self) -> usize {
98 self.index
99 }
100}
101
102impl std::fmt::Display for Column {
103 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
104 write!(f, "{}@{}", self.name, self.index)
105 }
106}
107
108impl PhysicalExpr for Column {
109 fn as_any(&self) -> &dyn Any {
111 self
112 }
113
114 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
116 self.bounds_check(input_schema)?;
117 Ok(input_schema.field(self.index).data_type().clone())
118 }
119
120 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
122 self.bounds_check(input_schema)?;
123 Ok(input_schema.field(self.index).is_nullable())
124 }
125
126 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
128 self.bounds_check(batch.schema().as_ref())?;
129 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
130 }
131
132 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
133 Ok(input_schema.field(self.index).clone().into())
134 }
135
136 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
137 vec![]
138 }
139
140 fn with_new_children(
141 self: Arc<Self>,
142 _children: Vec<Arc<dyn PhysicalExpr>>,
143 ) -> Result<Arc<dyn PhysicalExpr>> {
144 Ok(self)
145 }
146
147 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
148 write!(f, "{}", self.name)
149 }
150
151 fn placement(&self) -> ExpressionPlacement {
152 ExpressionPlacement::Column
153 }
154}
155
156impl Column {
157 fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
158 if self.index < input_schema.fields.len() {
159 Ok(())
160 } else {
161 internal_err!(
162 "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
163 self.name,
164 self.index,
165 input_schema.fields.len(),
166 input_schema
167 .fields()
168 .iter()
169 .map(|f| f.name())
170 .collect::<Vec<_>>()
171 )
172 }
173 }
174}
175
176pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
178 Ok(Arc::new(Column::new_with_schema(name, schema)?))
179}
180
181pub fn with_new_schema(
187 expr: Arc<dyn PhysicalExpr>,
188 schema: &SchemaRef,
189) -> Result<Arc<dyn PhysicalExpr>> {
190 Ok(expr
191 .transform_up(|expr| {
192 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
193 let idx = col.index();
194 let Some(field) = schema.fields().get(idx) else {
195 return plan_err!(
196 "New schema has fewer columns than original schema"
197 );
198 };
199 let new_col = Column::new(field.name(), idx);
200 Ok(Transformed::yes(Arc::new(new_col) as _))
201 } else {
202 Ok(Transformed::no(expr))
203 }
204 })?
205 .data)
206}
207
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use std::sync::Arc;

    /// Full message expected when column `id` at index 9 is resolved against
    /// a schema whose only field is `foo`.
    const OUT_OF_BOUNDS_MESSAGE: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
         but input schema only has 1 columns: [\"foo\"].\nThis issue was likely caused by a bug \
         in DataFusion's code. Please help us to resolve this by filing a bug report \
         in our issue tracker: https://github.com/apache/datafusion/issues";

    /// Schema with a single nullable `Utf8` field named `foo`.
    fn single_field_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    /// Column whose index (9) lies outside `single_field_schema()`.
    fn out_of_range_column() -> Column {
        Column::new("id", 9)
    }

    /// Asserts that the expected message starts with `actual`, i.e. `actual`
    /// is a prefix of (or equal to) the expected text.
    fn assert_is_out_of_bounds_error(actual: &str) {
        assert!(OUT_OF_BOUNDS_MESSAGE.starts_with(actual));
    }

    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_field_schema();
        let message = out_of_range_column()
            .data_type(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_is_out_of_bounds_error(&message);
    }

    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_field_schema();
        let message = out_of_range_column()
            .nullable(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert_is_out_of_bounds_error(&message);
    }

    #[test]
    fn out_of_bounds_evaluate() {
        let values: StringArray = vec!["data"].into();
        let batch =
            RecordBatch::try_new(Arc::new(single_field_schema()), vec![Arc::new(values)])
                .unwrap();
        let message = out_of_range_column()
            .evaluate(&batch)
            .expect_err("error")
            .strip_backtrace();
        assert_is_out_of_bounds_error(&message);
    }
}