datafusion_python/expr/
join.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::common::NullEquality;
19use datafusion::logical_expr::logical_plan::{Join, JoinConstraint, JoinType};
20use pyo3::{prelude::*, IntoPyObjectExt};
21use std::fmt::{self, Display, Formatter};
22
23use crate::common::df_schema::PyDFSchema;
24use crate::expr::{logical_node::LogicalNode, PyExpr};
25use crate::sql::logical::PyLogicalPlan;
26
27#[derive(Debug, Clone, PartialEq, Eq, Hash)]
28#[pyclass(name = "JoinType", module = "datafusion.expr")]
29pub struct PyJoinType {
30    join_type: JoinType,
31}
32
33impl From<JoinType> for PyJoinType {
34    fn from(join_type: JoinType) -> PyJoinType {
35        PyJoinType { join_type }
36    }
37}
38
39impl From<PyJoinType> for JoinType {
40    fn from(join_type: PyJoinType) -> Self {
41        join_type.join_type
42    }
43}
44
45#[pymethods]
46impl PyJoinType {
47    pub fn is_outer(&self) -> bool {
48        self.join_type.is_outer()
49    }
50
51    fn __repr__(&self) -> PyResult<String> {
52        Ok(format!("{}", self.join_type))
53    }
54}
55
56impl Display for PyJoinType {
57    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
58        write!(f, "{}", self.join_type)
59    }
60}
61
62#[derive(Debug, Clone, Copy)]
63#[pyclass(name = "JoinConstraint", module = "datafusion.expr")]
64pub struct PyJoinConstraint {
65    join_constraint: JoinConstraint,
66}
67
68impl From<JoinConstraint> for PyJoinConstraint {
69    fn from(join_constraint: JoinConstraint) -> PyJoinConstraint {
70        PyJoinConstraint { join_constraint }
71    }
72}
73
74impl From<PyJoinConstraint> for JoinConstraint {
75    fn from(join_constraint: PyJoinConstraint) -> Self {
76        join_constraint.join_constraint
77    }
78}
79
80#[pymethods]
81impl PyJoinConstraint {
82    fn __repr__(&self) -> PyResult<String> {
83        match self.join_constraint {
84            JoinConstraint::On => Ok("On".to_string()),
85            JoinConstraint::Using => Ok("Using".to_string()),
86        }
87    }
88}
89
90#[pyclass(name = "Join", module = "datafusion.expr", subclass)]
91#[derive(Clone)]
92pub struct PyJoin {
93    join: Join,
94}
95
96impl From<Join> for PyJoin {
97    fn from(join: Join) -> PyJoin {
98        PyJoin { join }
99    }
100}
101
102impl From<PyJoin> for Join {
103    fn from(join: PyJoin) -> Self {
104        join.join
105    }
106}
107
108impl Display for PyJoin {
109    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
110        write!(
111            f,
112            "Join
113            Left: {:?}
114            Right: {:?}
115            On: {:?}
116            Filter: {:?}
117            JoinType: {:?}
118            JoinConstraint: {:?}
119            Schema: {:?}
120            NullEquality: {:?}",
121            &self.join.left,
122            &self.join.right,
123            &self.join.on,
124            &self.join.filter,
125            &self.join.join_type,
126            &self.join.join_constraint,
127            &self.join.schema,
128            &self.join.null_equality,
129        )
130    }
131}
132
133#[pymethods]
134impl PyJoin {
135    /// Retrieves the left input `LogicalPlan` to this `Join` node
136    fn left(&self) -> PyResult<PyLogicalPlan> {
137        Ok(self.join.left.as_ref().clone().into())
138    }
139
140    /// Retrieves the right input `LogicalPlan` to this `Join` node
141    fn right(&self) -> PyResult<PyLogicalPlan> {
142        Ok(self.join.right.as_ref().clone().into())
143    }
144
145    /// Retrieves the right input `LogicalPlan` to this `Join` node
146    fn on(&self) -> PyResult<Vec<(PyExpr, PyExpr)>> {
147        Ok(self
148            .join
149            .on
150            .iter()
151            .map(|(l, r)| (PyExpr::from(l.clone()), PyExpr::from(r.clone())))
152            .collect())
153    }
154
155    /// Retrieves the filter `Option<PyExpr>` of this `Join` node
156    fn filter(&self) -> PyResult<Option<PyExpr>> {
157        Ok(self.join.filter.clone().map(Into::into))
158    }
159
160    /// Retrieves the `JoinType` to this `Join` node
161    fn join_type(&self) -> PyResult<PyJoinType> {
162        Ok(self.join.join_type.into())
163    }
164
165    /// Retrieves the `JoinConstraint` to this `Join` node
166    fn join_constraint(&self) -> PyResult<PyJoinConstraint> {
167        Ok(self.join.join_constraint.into())
168    }
169
170    /// Resulting Schema for this `Join` node instance
171    fn schema(&self) -> PyResult<PyDFSchema> {
172        Ok(self.join.schema.as_ref().clone().into())
173    }
174
175    /// If null_equals_null is true, null == null else null != null
176    fn null_equals_null(&self) -> PyResult<bool> {
177        match self.join.null_equality {
178            NullEquality::NullEqualsNothing => Ok(false),
179            NullEquality::NullEqualsNull => Ok(true),
180        }
181    }
182
183    fn __repr__(&self) -> PyResult<String> {
184        Ok(format!("Join({self})"))
185    }
186
187    fn __name__(&self) -> PyResult<String> {
188        Ok("Join".to_string())
189    }
190}
191
192impl LogicalNode for PyJoin {
193    fn inputs(&self) -> Vec<PyLogicalPlan> {
194        vec![
195            PyLogicalPlan::from((*self.join.left).clone()),
196            PyLogicalPlan::from((*self.join.right).clone()),
197        ]
198    }
199
200    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
201        self.clone().into_bound_py_any(py)
202    }
203}