datafusion_python/expr/
join.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::logical_plan::{Join, JoinConstraint, JoinType};
19use pyo3::{prelude::*, IntoPyObjectExt};
20use std::fmt::{self, Display, Formatter};
21
22use crate::common::df_schema::PyDFSchema;
23use crate::expr::{logical_node::LogicalNode, PyExpr};
24use crate::sql::logical::PyLogicalPlan;
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
27#[pyclass(name = "JoinType", module = "datafusion.expr")]
28pub struct PyJoinType {
29    join_type: JoinType,
30}
31
32impl From<JoinType> for PyJoinType {
33    fn from(join_type: JoinType) -> PyJoinType {
34        PyJoinType { join_type }
35    }
36}
37
38impl From<PyJoinType> for JoinType {
39    fn from(join_type: PyJoinType) -> Self {
40        join_type.join_type
41    }
42}
43
44#[pymethods]
45impl PyJoinType {
46    pub fn is_outer(&self) -> bool {
47        self.join_type.is_outer()
48    }
49
50    fn __repr__(&self) -> PyResult<String> {
51        Ok(format!("{}", self.join_type))
52    }
53}
54
55impl Display for PyJoinType {
56    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
57        write!(f, "{}", self.join_type)
58    }
59}
60
61#[derive(Debug, Clone, Copy)]
62#[pyclass(name = "JoinConstraint", module = "datafusion.expr")]
63pub struct PyJoinConstraint {
64    join_constraint: JoinConstraint,
65}
66
67impl From<JoinConstraint> for PyJoinConstraint {
68    fn from(join_constraint: JoinConstraint) -> PyJoinConstraint {
69        PyJoinConstraint { join_constraint }
70    }
71}
72
73impl From<PyJoinConstraint> for JoinConstraint {
74    fn from(join_constraint: PyJoinConstraint) -> Self {
75        join_constraint.join_constraint
76    }
77}
78
79#[pymethods]
80impl PyJoinConstraint {
81    fn __repr__(&self) -> PyResult<String> {
82        match self.join_constraint {
83            JoinConstraint::On => Ok("On".to_string()),
84            JoinConstraint::Using => Ok("Using".to_string()),
85        }
86    }
87}
88
89#[pyclass(name = "Join", module = "datafusion.expr", subclass)]
90#[derive(Clone)]
91pub struct PyJoin {
92    join: Join,
93}
94
95impl From<Join> for PyJoin {
96    fn from(join: Join) -> PyJoin {
97        PyJoin { join }
98    }
99}
100
101impl From<PyJoin> for Join {
102    fn from(join: PyJoin) -> Self {
103        join.join
104    }
105}
106
107impl Display for PyJoin {
108    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
109        write!(
110            f,
111            "Join
112            Left: {:?}
113            Right: {:?}
114            On: {:?}
115            Filter: {:?}
116            JoinType: {:?}
117            JoinConstraint: {:?}
118            Schema: {:?}
119            NullEqualsNull: {:?}",
120            &self.join.left,
121            &self.join.right,
122            &self.join.on,
123            &self.join.filter,
124            &self.join.join_type,
125            &self.join.join_constraint,
126            &self.join.schema,
127            &self.join.null_equals_null,
128        )
129    }
130}
131
132#[pymethods]
133impl PyJoin {
134    /// Retrieves the left input `LogicalPlan` to this `Join` node
135    fn left(&self) -> PyResult<PyLogicalPlan> {
136        Ok(self.join.left.as_ref().clone().into())
137    }
138
139    /// Retrieves the right input `LogicalPlan` to this `Join` node
140    fn right(&self) -> PyResult<PyLogicalPlan> {
141        Ok(self.join.right.as_ref().clone().into())
142    }
143
144    /// Retrieves the right input `LogicalPlan` to this `Join` node
145    fn on(&self) -> PyResult<Vec<(PyExpr, PyExpr)>> {
146        Ok(self
147            .join
148            .on
149            .iter()
150            .map(|(l, r)| (PyExpr::from(l.clone()), PyExpr::from(r.clone())))
151            .collect())
152    }
153
154    /// Retrieves the filter `Option<PyExpr>` of this `Join` node
155    fn filter(&self) -> PyResult<Option<PyExpr>> {
156        Ok(self.join.filter.clone().map(Into::into))
157    }
158
159    /// Retrieves the `JoinType` to this `Join` node
160    fn join_type(&self) -> PyResult<PyJoinType> {
161        Ok(self.join.join_type.into())
162    }
163
164    /// Retrieves the `JoinConstraint` to this `Join` node
165    fn join_constraint(&self) -> PyResult<PyJoinConstraint> {
166        Ok(self.join.join_constraint.into())
167    }
168
169    /// Resulting Schema for this `Join` node instance
170    fn schema(&self) -> PyResult<PyDFSchema> {
171        Ok(self.join.schema.as_ref().clone().into())
172    }
173
174    /// If null_equals_null is true, null == null else null != null
175    fn null_equals_null(&self) -> PyResult<bool> {
176        Ok(self.join.null_equals_null)
177    }
178
179    fn __repr__(&self) -> PyResult<String> {
180        Ok(format!("Join({})", self))
181    }
182
183    fn __name__(&self) -> PyResult<String> {
184        Ok("Join".to_string())
185    }
186}
187
188impl LogicalNode for PyJoin {
189    fn inputs(&self) -> Vec<PyLogicalPlan> {
190        vec![
191            PyLogicalPlan::from((*self.join.left).clone()),
192            PyLogicalPlan::from((*self.join.right).clone()),
193        ]
194    }
195
196    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
197        self.clone().into_bound_py_any(py)
198    }
199}