datafusion_python/expr/
repartition.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::{logical_plan::Repartition, Expr, Partitioning};
21use pyo3::{prelude::*, IntoPyObjectExt};
22
23use crate::{errors::py_type_err, sql::logical::PyLogicalPlan};
24
25use super::{logical_node::LogicalNode, PyExpr};
26
27#[pyclass(name = "Repartition", module = "datafusion.expr", subclass)]
28#[derive(Clone)]
29pub struct PyRepartition {
30    repartition: Repartition,
31}
32
33#[pyclass(name = "Partitioning", module = "datafusion.expr", subclass)]
34#[derive(Clone)]
35pub struct PyPartitioning {
36    partitioning: Partitioning,
37}
38
39impl From<PyPartitioning> for Partitioning {
40    fn from(partitioning: PyPartitioning) -> Self {
41        partitioning.partitioning
42    }
43}
44
45impl From<Partitioning> for PyPartitioning {
46    fn from(partitioning: Partitioning) -> Self {
47        PyPartitioning { partitioning }
48    }
49}
50
51impl From<PyRepartition> for Repartition {
52    fn from(repartition: PyRepartition) -> Self {
53        repartition.repartition
54    }
55}
56
57impl From<Repartition> for PyRepartition {
58    fn from(repartition: Repartition) -> PyRepartition {
59        PyRepartition { repartition }
60    }
61}
62
63impl Display for PyRepartition {
64    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
65        write!(
66            f,
67            "Repartition
68            input: {:?}
69            partitioning_scheme: {:?}",
70            &self.repartition.input, &self.repartition.partitioning_scheme,
71        )
72    }
73}
74
75#[pymethods]
76impl PyRepartition {
77    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
78        Ok(Self::inputs(self))
79    }
80
81    fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
82        Ok(PyPartitioning {
83            partitioning: self.repartition.partitioning_scheme.clone(),
84        })
85    }
86
87    fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
88        match &self.repartition.partitioning_scheme {
89            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
90                .iter()
91                .map(|e| PyExpr::from(e.clone()))
92                .collect()),
93            _ => Err(py_type_err("unexpected repartition strategy")),
94        }
95    }
96
97    fn distribute_columns(&self) -> PyResult<String> {
98        match &self.repartition.partitioning_scheme {
99            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
100                .iter()
101                .map(|e| match &e {
102                    Expr::Column(column) => column.name.clone(),
103                    _ => panic!("Encountered a type other than Expr::Column"),
104                })
105                .collect()),
106            _ => Err(py_type_err("unexpected repartition strategy")),
107        }
108    }
109
110    fn __repr__(&self) -> PyResult<String> {
111        Ok(format!("Repartition({})", self))
112    }
113
114    fn __name__(&self) -> PyResult<String> {
115        Ok("Repartition".to_string())
116    }
117}
118
119impl LogicalNode for PyRepartition {
120    fn inputs(&self) -> Vec<PyLogicalPlan> {
121        vec![PyLogicalPlan::from((*self.repartition.input).clone())]
122    }
123
124    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
125        self.clone().into_bound_py_any(py)
126    }
127}