datafusion_python/expr/repartition.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::logical_plan::Repartition;
21use datafusion::logical_expr::{Expr, Partitioning};
22use pyo3::prelude::*;
23use pyo3::IntoPyObjectExt;
24
25use super::logical_node::LogicalNode;
26use super::PyExpr;
27use crate::errors::py_type_err;
28use crate::sql::logical::PyLogicalPlan;
29
30#[pyclass(frozen, name = "Repartition", module = "datafusion.expr", subclass)]
31#[derive(Clone)]
32pub struct PyRepartition {
33    repartition: Repartition,
34}
35
36#[pyclass(frozen, name = "Partitioning", module = "datafusion.expr", subclass)]
37#[derive(Clone)]
38pub struct PyPartitioning {
39    partitioning: Partitioning,
40}
41
42impl From<PyPartitioning> for Partitioning {
43    fn from(partitioning: PyPartitioning) -> Self {
44        partitioning.partitioning
45    }
46}
47
48impl From<Partitioning> for PyPartitioning {
49    fn from(partitioning: Partitioning) -> Self {
50        PyPartitioning { partitioning }
51    }
52}
53
54impl From<PyRepartition> for Repartition {
55    fn from(repartition: PyRepartition) -> Self {
56        repartition.repartition
57    }
58}
59
60impl From<Repartition> for PyRepartition {
61    fn from(repartition: Repartition) -> PyRepartition {
62        PyRepartition { repartition }
63    }
64}
65
66impl Display for PyRepartition {
67    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
68        write!(
69            f,
70            "Repartition
71            input: {:?}
72            partitioning_scheme: {:?}",
73            &self.repartition.input, &self.repartition.partitioning_scheme,
74        )
75    }
76}
77
78#[pymethods]
79impl PyRepartition {
80    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
81        Ok(Self::inputs(self))
82    }
83
84    fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
85        Ok(PyPartitioning {
86            partitioning: self.repartition.partitioning_scheme.clone(),
87        })
88    }
89
90    fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
91        match &self.repartition.partitioning_scheme {
92            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
93                .iter()
94                .map(|e| PyExpr::from(e.clone()))
95                .collect()),
96            _ => Err(py_type_err("unexpected repartition strategy")),
97        }
98    }
99
100    fn distribute_columns(&self) -> PyResult<String> {
101        match &self.repartition.partitioning_scheme {
102            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
103                .iter()
104                .map(|e| match &e {
105                    Expr::Column(column) => column.name.clone(),
106                    _ => panic!("Encountered a type other than Expr::Column"),
107                })
108                .collect()),
109            _ => Err(py_type_err("unexpected repartition strategy")),
110        }
111    }
112
113    fn __repr__(&self) -> PyResult<String> {
114        Ok(format!("Repartition({self})"))
115    }
116
117    fn __name__(&self) -> PyResult<String> {
118        Ok("Repartition".to_string())
119    }
120}
121
122impl LogicalNode for PyRepartition {
123    fn inputs(&self) -> Vec<PyLogicalPlan> {
124        vec![PyLogicalPlan::from((*self.repartition.input).clone())]
125    }
126
127    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
128        self.clone().into_bound_py_any(py)
129    }
130}