datafusion_physical_expr/expressions/
column.rs1use std::any::Any;
21use std::hash::Hash;
22use std::sync::Arc;
23
24use crate::physical_expr::PhysicalExpr;
25use arrow::{
26 datatypes::{DataType, Schema, SchemaRef},
27 record_batch::RecordBatch,
28};
29use datafusion_common::tree_node::{Transformed, TreeNode};
30use datafusion_common::{internal_err, plan_err, Result};
31use datafusion_expr::ColumnarValue;
32
/// A physical expression that refers to a single column of an input schema,
/// identified by both its name and its zero-based position.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Column {
    /// Name of the referenced column.
    name: String,
    /// Zero-based position of the referenced column in the input schema.
    index: usize,
}
72
73impl Column {
74 pub fn new(name: &str, index: usize) -> Self {
77 Self {
78 name: name.to_owned(),
79 index,
80 }
81 }
82
83 pub fn new_with_schema(name: &str, schema: &Schema) -> Result<Self> {
86 Ok(Column::new(name, schema.index_of(name)?))
87 }
88
89 pub fn name(&self) -> &str {
91 &self.name
92 }
93
94 pub fn index(&self) -> usize {
96 self.index
97 }
98}
99
100impl std::fmt::Display for Column {
101 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
102 write!(f, "{}@{}", self.name, self.index)
103 }
104}
105
106impl PhysicalExpr for Column {
107 fn as_any(&self) -> &dyn Any {
109 self
110 }
111
112 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
114 self.bounds_check(input_schema)?;
115 Ok(input_schema.field(self.index).data_type().clone())
116 }
117
118 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
120 self.bounds_check(input_schema)?;
121 Ok(input_schema.field(self.index).is_nullable())
122 }
123
124 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
126 self.bounds_check(batch.schema().as_ref())?;
127 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
128 }
129
130 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
131 vec![]
132 }
133
134 fn with_new_children(
135 self: Arc<Self>,
136 _children: Vec<Arc<dyn PhysicalExpr>>,
137 ) -> Result<Arc<dyn PhysicalExpr>> {
138 Ok(self)
139 }
140
141 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142 write!(f, "{}", self.name)
143 }
144}
145
146impl Column {
147 fn bounds_check(&self, input_schema: &Schema) -> Result<()> {
148 if self.index < input_schema.fields.len() {
149 Ok(())
150 } else {
151 internal_err!(
152 "PhysicalExpr Column references column '{}' at index {} (zero-based) but input schema only has {} columns: {:?}",
153 self.name,
154 self.index,
155 input_schema.fields.len(),
156 input_schema.fields().iter().map(|f| f.name()).collect::<Vec<_>>()
157 )
158 }
159 }
160}
161
162pub fn col(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
164 Ok(Arc::new(Column::new_with_schema(name, schema)?))
165}
166
167pub fn with_new_schema(
173 expr: Arc<dyn PhysicalExpr>,
174 schema: &SchemaRef,
175) -> Result<Arc<dyn PhysicalExpr>> {
176 Ok(expr
177 .transform_up(|expr| {
178 if let Some(col) = expr.as_any().downcast_ref::<Column>() {
179 let idx = col.index();
180 let Some(field) = schema.fields().get(idx) else {
181 return plan_err!(
182 "New schema has fewer columns than original schema"
183 );
184 };
185 let new_col = Column::new(field.name(), idx);
186 Ok(Transformed::yes(Arc::new(new_col) as _))
187 } else {
188 Ok(Transformed::no(expr))
189 }
190 })?
191 .data)
192}
193
#[cfg(test)]
mod test {
    use super::Column;
    use crate::physical_expr::PhysicalExpr;

    use arrow::array::StringArray;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;

    use std::sync::Arc;

    /// Full message expected for a column `id` at index 9 checked against a
    /// one-field schema. Each test asserts that this text starts with the
    /// error string obtained after `strip_backtrace()`.
    const EXPECTED_OUT_OF_BOUNDS: &str = "Internal error: PhysicalExpr Column references column 'id' at index 9 (zero-based) \
        but input schema only has 1 columns: [\"foo\"].\nThis was likely caused by a bug in \
        DataFusion's code and we would welcome that you file an bug report in our issue tracker";

    /// Schema with a single nullable Utf8 field named `foo`.
    fn single_field_schema() -> Schema {
        Schema::new(vec![Field::new("foo", DataType::Utf8, true)])
    }

    /// `data_type` must fail when the column index exceeds the schema.
    #[test]
    fn out_of_bounds_data_type() {
        let schema = single_field_schema();
        let column = Column::new("id", 9);
        let message = column
            .data_type(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert!(EXPECTED_OUT_OF_BOUNDS.starts_with(&message))
    }

    /// `nullable` must fail when the column index exceeds the schema.
    #[test]
    fn out_of_bounds_nullable() {
        let schema = single_field_schema();
        let column = Column::new("id", 9);
        let message = column
            .nullable(&schema)
            .expect_err("error")
            .strip_backtrace();
        assert!(EXPECTED_OUT_OF_BOUNDS.starts_with(&message))
    }

    /// `evaluate` must fail when the column index exceeds the batch schema.
    #[test]
    fn out_of_bounds_evaluate() -> Result<()> {
        let schema = single_field_schema();
        let values: StringArray = vec!["data"].into();
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(values)])?;
        let column = Column::new("id", 9);
        let message = column
            .evaluate(&batch)
            .expect_err("error")
            .strip_backtrace();
        assert!(EXPECTED_OUT_OF_BOUNDS.starts_with(&message));
        Ok(())
    }
}