datafusion_python/expr/
join.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::common::NullEquality;
21use datafusion::logical_expr::logical_plan::{Join, JoinConstraint, JoinType};
22use pyo3::prelude::*;
23use pyo3::IntoPyObjectExt;
24
25use crate::common::df_schema::PyDFSchema;
26use crate::expr::logical_node::LogicalNode;
27use crate::expr::PyExpr;
28use crate::sql::logical::PyLogicalPlan;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31#[pyclass(frozen, name = "JoinType", module = "datafusion.expr")]
32pub struct PyJoinType {
33    join_type: JoinType,
34}
35
36impl From<JoinType> for PyJoinType {
37    fn from(join_type: JoinType) -> PyJoinType {
38        PyJoinType { join_type }
39    }
40}
41
42impl From<PyJoinType> for JoinType {
43    fn from(join_type: PyJoinType) -> Self {
44        join_type.join_type
45    }
46}
47
48#[pymethods]
49impl PyJoinType {
50    pub fn is_outer(&self) -> bool {
51        self.join_type.is_outer()
52    }
53
54    fn __repr__(&self) -> PyResult<String> {
55        Ok(format!("{}", self.join_type))
56    }
57}
58
59impl Display for PyJoinType {
60    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
61        write!(f, "{}", self.join_type)
62    }
63}
64
65#[derive(Debug, Clone, Copy)]
66#[pyclass(frozen, name = "JoinConstraint", module = "datafusion.expr")]
67pub struct PyJoinConstraint {
68    join_constraint: JoinConstraint,
69}
70
71impl From<JoinConstraint> for PyJoinConstraint {
72    fn from(join_constraint: JoinConstraint) -> PyJoinConstraint {
73        PyJoinConstraint { join_constraint }
74    }
75}
76
77impl From<PyJoinConstraint> for JoinConstraint {
78    fn from(join_constraint: PyJoinConstraint) -> Self {
79        join_constraint.join_constraint
80    }
81}
82
83#[pymethods]
84impl PyJoinConstraint {
85    fn __repr__(&self) -> PyResult<String> {
86        match self.join_constraint {
87            JoinConstraint::On => Ok("On".to_string()),
88            JoinConstraint::Using => Ok("Using".to_string()),
89        }
90    }
91}
92
93#[pyclass(frozen, name = "Join", module = "datafusion.expr", subclass)]
94#[derive(Clone)]
95pub struct PyJoin {
96    join: Join,
97}
98
99impl From<Join> for PyJoin {
100    fn from(join: Join) -> PyJoin {
101        PyJoin { join }
102    }
103}
104
105impl From<PyJoin> for Join {
106    fn from(join: PyJoin) -> Self {
107        join.join
108    }
109}
110
111impl Display for PyJoin {
112    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
113        write!(
114            f,
115            "Join
116            Left: {:?}
117            Right: {:?}
118            On: {:?}
119            Filter: {:?}
120            JoinType: {:?}
121            JoinConstraint: {:?}
122            Schema: {:?}
123            NullEquality: {:?}",
124            &self.join.left,
125            &self.join.right,
126            &self.join.on,
127            &self.join.filter,
128            &self.join.join_type,
129            &self.join.join_constraint,
130            &self.join.schema,
131            &self.join.null_equality,
132        )
133    }
134}
135
136#[pymethods]
137impl PyJoin {
138    /// Retrieves the left input `LogicalPlan` to this `Join` node
139    fn left(&self) -> PyResult<PyLogicalPlan> {
140        Ok(self.join.left.as_ref().clone().into())
141    }
142
143    /// Retrieves the right input `LogicalPlan` to this `Join` node
144    fn right(&self) -> PyResult<PyLogicalPlan> {
145        Ok(self.join.right.as_ref().clone().into())
146    }
147
148    /// Retrieves the right input `LogicalPlan` to this `Join` node
149    fn on(&self) -> PyResult<Vec<(PyExpr, PyExpr)>> {
150        Ok(self
151            .join
152            .on
153            .iter()
154            .map(|(l, r)| (PyExpr::from(l.clone()), PyExpr::from(r.clone())))
155            .collect())
156    }
157
158    /// Retrieves the filter `Option<PyExpr>` of this `Join` node
159    fn filter(&self) -> PyResult<Option<PyExpr>> {
160        Ok(self.join.filter.clone().map(Into::into))
161    }
162
163    /// Retrieves the `JoinType` to this `Join` node
164    fn join_type(&self) -> PyResult<PyJoinType> {
165        Ok(self.join.join_type.into())
166    }
167
168    /// Retrieves the `JoinConstraint` to this `Join` node
169    fn join_constraint(&self) -> PyResult<PyJoinConstraint> {
170        Ok(self.join.join_constraint.into())
171    }
172
173    /// Resulting Schema for this `Join` node instance
174    fn schema(&self) -> PyResult<PyDFSchema> {
175        Ok(self.join.schema.as_ref().clone().into())
176    }
177
178    /// If null_equals_null is true, null == null else null != null
179    fn null_equals_null(&self) -> PyResult<bool> {
180        match self.join.null_equality {
181            NullEquality::NullEqualsNothing => Ok(false),
182            NullEquality::NullEqualsNull => Ok(true),
183        }
184    }
185
186    fn __repr__(&self) -> PyResult<String> {
187        Ok(format!("Join({self})"))
188    }
189
190    fn __name__(&self) -> PyResult<String> {
191        Ok("Join".to_string())
192    }
193}
194
195impl LogicalNode for PyJoin {
196    fn inputs(&self) -> Vec<PyLogicalPlan> {
197        vec![
198            PyLogicalPlan::from((*self.join.left).clone()),
199            PyLogicalPlan::from((*self.join.right).clone()),
200        ]
201    }
202
203    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
204        self.clone().into_bound_py_any(py)
205    }
206}