Skip to main content

datafusion_physical_expr/expressions/
lambda_variable.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Physical lambda variable reference: [`LambdaVariable`]
19
20use std::hash::Hash;
21use std::sync::Arc;
22
23use crate::physical_expr::PhysicalExpr;
24use arrow::datatypes::FieldRef;
25use arrow::{
26    datatypes::{DataType, Schema},
27    record_batch::RecordBatch,
28};
29
30use datafusion_common::{Result, exec_err, internal_err};
31use datafusion_expr::ColumnarValue;
32
33/// Represents the lambda variable with a given index and field
34#[derive(Debug, Clone)]
35pub struct LambdaVariable {
36    index: usize,
37    field: FieldRef,
38}
39
40impl Eq for LambdaVariable {}
41
42impl PartialEq for LambdaVariable {
43    fn eq(&self, other: &Self) -> bool {
44        self.index == other.index && self.field == other.field
45    }
46}
47
48impl Hash for LambdaVariable {
49    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
50        self.index.hash(state);
51        self.field.hash(state);
52    }
53}
54
55impl LambdaVariable {
56    /// Create a new lambda variable expression
57    pub fn new(index: usize, field: FieldRef) -> Self {
58        Self { index, field }
59    }
60
61    /// Get the variable's name
62    pub fn name(&self) -> &str {
63        self.field.name()
64    }
65
66    /// Get the variable's index
67    pub fn index(&self) -> usize {
68        self.index
69    }
70
71    /// Get the variable's field
72    pub fn field(&self) -> &FieldRef {
73        &self.field
74    }
75}
76
77impl std::fmt::Display for LambdaVariable {
78    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
79        write!(f, "{}@{}", self.name(), self.index)
80    }
81}
82
83impl PhysicalExpr for LambdaVariable {
84    fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
85        Ok(self.field.data_type().clone())
86    }
87
88    fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
89        Ok(self.field.is_nullable())
90    }
91
92    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
93        if self.index >= batch.num_columns() {
94            return internal_err!(
95                "PhysicalExpr LambdaVariable references column '{}' at index {} (zero-based) but batch only has {} columns: {:?}",
96                self.name(),
97                self.index,
98                batch.num_columns(),
99                batch
100                    .schema_ref()
101                    .fields()
102                    .iter()
103                    .map(|f| f.name())
104                    .collect::<Vec<_>>()
105            );
106        }
107
108        if self.field.as_ref() != batch.schema_ref().field(self.index) {
109            return exec_err!(
110                "Field of physical LambdaVariable with index {} doesn't match batch field during evaluation {} != {}",
111                self.index,
112                self.field,
113                batch.schema_ref().field(self.index)
114            );
115        }
116
117        Ok(ColumnarValue::Array(Arc::clone(batch.column(self.index))))
118    }
119
120    fn return_field(&self, _input_schema: &Schema) -> Result<FieldRef> {
121        Ok(Arc::clone(&self.field))
122    }
123
124    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
125        vec![]
126    }
127
128    fn with_new_children(
129        self: Arc<Self>,
130        _children: Vec<Arc<dyn PhysicalExpr>>,
131    ) -> Result<Arc<dyn PhysicalExpr>> {
132        Ok(self)
133    }
134
135    fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        write!(f, "{}@{}", self.name(), self.index)
137    }
138}
139
140/// Create a lambda variable expression
141pub fn lambda_variable(name: &str, schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
142    let index = schema.index_of(name)?;
143    let field = Arc::clone(&schema.fields()[index]);
144
145    Ok(Arc::new(LambdaVariable::new(index, field)))
146}