datafusion_physical_expr/expressions/
lambda_variable.rs1use std::hash::Hash;
21use std::sync::Arc;
22
23use crate::physical_expr::PhysicalExpr;
24use arrow::datatypes::FieldRef;
25use arrow::{
26 datatypes::{DataType, Schema},
27 record_batch::RecordBatch,
28};
29
30use datafusion_common::{Result, exec_err, internal_err};
31use datafusion_expr::ColumnarValue;
32
33#[derive(Debug, Clone)]
35pub struct LambdaVariable {
36 index: usize,
37 field: FieldRef,
38}
39
40impl Eq for LambdaVariable {}
41
42impl PartialEq for LambdaVariable {
43 fn eq(&self, other: &Self) -> bool {
44 self.index == other.index && self.field == other.field
45 }
46}
47
48impl Hash for LambdaVariable {
49 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
50 self.index.hash(state);
51 self.field.hash(state);
52 }
53}
54
55impl LambdaVariable {
56 pub fn new(index: usize, field: FieldRef) -> Self {
58 Self { index, field }
59 }
60
61 pub fn name(&self) -> &str {
63 self.field.name()
64 }
65
66 pub fn index(&self) -> usize {
68 self.index
69 }
70
71 pub fn field(&self) -> &FieldRef {
73 &self.field
74 }
75}
76
77impl std::fmt::Display for LambdaVariable {
78 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
79 write!(f, "{}@{}", self.name(), self.index)
80 }
81}
82
83impl PhysicalExpr for LambdaVariable {
84 fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
85 Ok(self.field.data_type().clone())
86 }
87
88 fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
89 Ok(self.field.is_nullable())
90 }
91
92 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
93 if self.index >= batch.num_columns() {
94 return internal_err!(
95 "PhysicalExpr LambdaVariable references column '{}' at index {} (zero-based) but batch only has {} columns: {:?}",
96 self.name(),
97 self.index,
98 batch.num_columns(),
99 batch
100 .schema_ref()
101 .fields()
102 .iter()
103 .map(|f| f.name())
104 .collect::<Vec<_>>()
105 );
106 }
107
108 if self.field.as_ref() != batch.schema_ref().field(self.index) {
109 return exec_err!(
110 "Field of physical LambdaVariable with index {} doesn't match batch field during evaluation {} != {}",
111 self.index,
112 self.field,
113 batch.schema_ref().field(self.index)
114 );
115 }
116
117 Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
118 }
119
120 fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
121 Ok(Arc::clone(&self.field))
122 }
123
124 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
125 vec![]
126 }
127
128 fn with_new_children(
129 self: Arc<Self>,
130 _children: Vec<Arc<dyn PhysicalExpr>>,
131 ) -> Result<Arc<dyn PhysicalExpr>> {
132 Ok(self)
133 }
134
135 fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 write!(f, "{}@{}", self.name(), self.index)
137 }
138}
139
140pub fn lambda_variable(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
142 let index = schema.index_of(name)?;
143 let field = Arc::clone(&schema.fields()[index]);
144
145 Ok(Arc::new(LambdaVariable::new(index, field)))
146}