datafusion_python/expr/
repartition.rs1use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::logical_plan::Repartition;
21use datafusion::logical_expr::{Expr, Partitioning};
22use pyo3::prelude::*;
23use pyo3::IntoPyObjectExt;
24
25use super::logical_node::LogicalNode;
26use super::PyExpr;
27use crate::errors::py_type_err;
28use crate::sql::logical::PyLogicalPlan;
29
30#[pyclass(frozen, name = "Repartition", module = "datafusion.expr", subclass)]
31#[derive(Clone)]
32pub struct PyRepartition {
33 repartition: Repartition,
34}
35
36#[pyclass(frozen, name = "Partitioning", module = "datafusion.expr", subclass)]
37#[derive(Clone)]
38pub struct PyPartitioning {
39 partitioning: Partitioning,
40}
41
42impl From<PyPartitioning> for Partitioning {
43 fn from(partitioning: PyPartitioning) -> Self {
44 partitioning.partitioning
45 }
46}
47
48impl From<Partitioning> for PyPartitioning {
49 fn from(partitioning: Partitioning) -> Self {
50 PyPartitioning { partitioning }
51 }
52}
53
54impl From<PyRepartition> for Repartition {
55 fn from(repartition: PyRepartition) -> Self {
56 repartition.repartition
57 }
58}
59
60impl From<Repartition> for PyRepartition {
61 fn from(repartition: Repartition) -> PyRepartition {
62 PyRepartition { repartition }
63 }
64}
65
66impl Display for PyRepartition {
67 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
68 write!(
69 f,
70 "Repartition
71 input: {:?}
72 partitioning_scheme: {:?}",
73 &self.repartition.input, &self.repartition.partitioning_scheme,
74 )
75 }
76}
77
78#[pymethods]
79impl PyRepartition {
80 fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
81 Ok(Self::inputs(self))
82 }
83
84 fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
85 Ok(PyPartitioning {
86 partitioning: self.repartition.partitioning_scheme.clone(),
87 })
88 }
89
90 fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
91 match &self.repartition.partitioning_scheme {
92 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
93 .iter()
94 .map(|e| PyExpr::from(e.clone()))
95 .collect()),
96 _ => Err(py_type_err("unexpected repartition strategy")),
97 }
98 }
99
100 fn distribute_columns(&self) -> PyResult<String> {
101 match &self.repartition.partitioning_scheme {
102 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
103 .iter()
104 .map(|e| match &e {
105 Expr::Column(column) => column.name.clone(),
106 _ => panic!("Encountered a type other than Expr::Column"),
107 })
108 .collect()),
109 _ => Err(py_type_err("unexpected repartition strategy")),
110 }
111 }
112
113 fn __repr__(&self) -> PyResult<String> {
114 Ok(format!("Repartition({self})"))
115 }
116
117 fn __name__(&self) -> PyResult<String> {
118 Ok("Repartition".to_string())
119 }
120}
121
122impl LogicalNode for PyRepartition {
123 fn inputs(&self) -> Vec<PyLogicalPlan> {
124 vec![PyLogicalPlan::from((*self.repartition.input).clone())]
125 }
126
127 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
128 self.clone().into_bound_py_any(py)
129 }
130}