datafusion_python/expr/
repartition.rs1use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::logical_plan::Repartition;
21use datafusion::logical_expr::{Expr, Partitioning};
22use pyo3::IntoPyObjectExt;
23use pyo3::prelude::*;
24
25use super::PyExpr;
26use super::logical_node::LogicalNode;
27use crate::errors::py_type_err;
28use crate::sql::logical::PyLogicalPlan;
29
30#[pyclass(
31 from_py_object,
32 frozen,
33 name = "Repartition",
34 module = "datafusion.expr",
35 subclass
36)]
37#[derive(Clone)]
38pub struct PyRepartition {
39 repartition: Repartition,
40}
41
42#[pyclass(
43 from_py_object,
44 frozen,
45 name = "Partitioning",
46 module = "datafusion.expr",
47 subclass
48)]
49#[derive(Clone)]
50pub struct PyPartitioning {
51 partitioning: Partitioning,
52}
53
54impl From<PyPartitioning> for Partitioning {
55 fn from(partitioning: PyPartitioning) -> Self {
56 partitioning.partitioning
57 }
58}
59
60impl From<Partitioning> for PyPartitioning {
61 fn from(partitioning: Partitioning) -> Self {
62 PyPartitioning { partitioning }
63 }
64}
65
66impl From<PyRepartition> for Repartition {
67 fn from(repartition: PyRepartition) -> Self {
68 repartition.repartition
69 }
70}
71
72impl From<Repartition> for PyRepartition {
73 fn from(repartition: Repartition) -> PyRepartition {
74 PyRepartition { repartition }
75 }
76}
77
78impl Display for PyRepartition {
79 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
80 write!(
81 f,
82 "Repartition
83 input: {:?}
84 partitioning_scheme: {:?}",
85 &self.repartition.input, &self.repartition.partitioning_scheme,
86 )
87 }
88}
89
90#[pymethods]
91impl PyRepartition {
92 fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
93 Ok(Self::inputs(self))
94 }
95
96 fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
97 Ok(PyPartitioning {
98 partitioning: self.repartition.partitioning_scheme.clone(),
99 })
100 }
101
102 fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
103 match &self.repartition.partitioning_scheme {
104 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
105 .iter()
106 .map(|e| PyExpr::from(e.clone()))
107 .collect()),
108 _ => Err(py_type_err("unexpected repartition strategy")),
109 }
110 }
111
112 fn distribute_columns(&self) -> PyResult<String> {
113 match &self.repartition.partitioning_scheme {
114 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
115 .iter()
116 .map(|e| match &e {
117 Expr::Column(column) => column.name.clone(),
118 _ => panic!("Encountered a type other than Expr::Column"),
119 })
120 .collect()),
121 _ => Err(py_type_err("unexpected repartition strategy")),
122 }
123 }
124
125 fn __repr__(&self) -> PyResult<String> {
126 Ok(format!("Repartition({self})"))
127 }
128
129 fn __name__(&self) -> PyResult<String> {
130 Ok("Repartition".to_string())
131 }
132}
133
134impl LogicalNode for PyRepartition {
135 fn inputs(&self) -> Vec<PyLogicalPlan> {
136 vec![PyLogicalPlan::from((*self.repartition.input).clone())]
137 }
138
139 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
140 self.clone().into_bound_py_any(py)
141 }
142}