Skip to main content

datafusion_python/expr/
repartition.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::logical_plan::Repartition;
21use datafusion::logical_expr::{Expr, Partitioning};
22use pyo3::IntoPyObjectExt;
23use pyo3::prelude::*;
24
25use super::PyExpr;
26use super::logical_node::LogicalNode;
27use crate::errors::py_type_err;
28use crate::sql::logical::PyLogicalPlan;
29
30#[pyclass(
31    from_py_object,
32    frozen,
33    name = "Repartition",
34    module = "datafusion.expr",
35    subclass
36)]
37#[derive(Clone)]
38pub struct PyRepartition {
39    repartition: Repartition,
40}
41
42#[pyclass(
43    from_py_object,
44    frozen,
45    name = "Partitioning",
46    module = "datafusion.expr",
47    subclass
48)]
49#[derive(Clone)]
50pub struct PyPartitioning {
51    partitioning: Partitioning,
52}
53
54impl From<PyPartitioning> for Partitioning {
55    fn from(partitioning: PyPartitioning) -> Self {
56        partitioning.partitioning
57    }
58}
59
60impl From<Partitioning> for PyPartitioning {
61    fn from(partitioning: Partitioning) -> Self {
62        PyPartitioning { partitioning }
63    }
64}
65
66impl From<PyRepartition> for Repartition {
67    fn from(repartition: PyRepartition) -> Self {
68        repartition.repartition
69    }
70}
71
72impl From<Repartition> for PyRepartition {
73    fn from(repartition: Repartition) -> PyRepartition {
74        PyRepartition { repartition }
75    }
76}
77
78impl Display for PyRepartition {
79    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
80        write!(
81            f,
82            "Repartition
83            input: {:?}
84            partitioning_scheme: {:?}",
85            &self.repartition.input, &self.repartition.partitioning_scheme,
86        )
87    }
88}
89
90#[pymethods]
91impl PyRepartition {
92    fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
93        Ok(Self::inputs(self))
94    }
95
96    fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
97        Ok(PyPartitioning {
98            partitioning: self.repartition.partitioning_scheme.clone(),
99        })
100    }
101
102    fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
103        match &self.repartition.partitioning_scheme {
104            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
105                .iter()
106                .map(|e| PyExpr::from(e.clone()))
107                .collect()),
108            _ => Err(py_type_err("unexpected repartition strategy")),
109        }
110    }
111
112    fn distribute_columns(&self) -> PyResult<String> {
113        match &self.repartition.partitioning_scheme {
114            Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
115                .iter()
116                .map(|e| match &e {
117                    Expr::Column(column) => column.name.clone(),
118                    _ => panic!("Encountered a type other than Expr::Column"),
119                })
120                .collect()),
121            _ => Err(py_type_err("unexpected repartition strategy")),
122        }
123    }
124
125    fn __repr__(&self) -> PyResult<String> {
126        Ok(format!("Repartition({self})"))
127    }
128
129    fn __name__(&self) -> PyResult<String> {
130        Ok("Repartition".to_string())
131    }
132}
133
134impl LogicalNode for PyRepartition {
135    fn inputs(&self) -> Vec<PyLogicalPlan> {
136        vec![PyLogicalPlan::from((*self.repartition.input).clone())]
137    }
138
139    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
140        self.clone().into_bound_py_any(py)
141    }
142}