datafusion_python/expr/
repartition.rs1use std::fmt::{self, Display, Formatter};
19
20use datafusion::logical_expr::{logical_plan::Repartition, Expr, Partitioning};
21use pyo3::{prelude::*, IntoPyObjectExt};
22
23use crate::{errors::py_type_err, sql::logical::PyLogicalPlan};
24
25use super::{logical_node::LogicalNode, PyExpr};
26
27#[pyclass(name = "Repartition", module = "datafusion.expr", subclass)]
28#[derive(Clone)]
29pub struct PyRepartition {
30 repartition: Repartition,
31}
32
33#[pyclass(name = "Partitioning", module = "datafusion.expr", subclass)]
34#[derive(Clone)]
35pub struct PyPartitioning {
36 partitioning: Partitioning,
37}
38
39impl From<PyPartitioning> for Partitioning {
40 fn from(partitioning: PyPartitioning) -> Self {
41 partitioning.partitioning
42 }
43}
44
45impl From<Partitioning> for PyPartitioning {
46 fn from(partitioning: Partitioning) -> Self {
47 PyPartitioning { partitioning }
48 }
49}
50
51impl From<PyRepartition> for Repartition {
52 fn from(repartition: PyRepartition) -> Self {
53 repartition.repartition
54 }
55}
56
57impl From<Repartition> for PyRepartition {
58 fn from(repartition: Repartition) -> PyRepartition {
59 PyRepartition { repartition }
60 }
61}
62
63impl Display for PyRepartition {
64 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
65 write!(
66 f,
67 "Repartition
68 input: {:?}
69 partitioning_scheme: {:?}",
70 &self.repartition.input, &self.repartition.partitioning_scheme,
71 )
72 }
73}
74
75#[pymethods]
76impl PyRepartition {
77 fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
78 Ok(Self::inputs(self))
79 }
80
81 fn partitioning_scheme(&self) -> PyResult<PyPartitioning> {
82 Ok(PyPartitioning {
83 partitioning: self.repartition.partitioning_scheme.clone(),
84 })
85 }
86
87 fn distribute_list(&self) -> PyResult<Vec<PyExpr>> {
88 match &self.repartition.partitioning_scheme {
89 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
90 .iter()
91 .map(|e| PyExpr::from(e.clone()))
92 .collect()),
93 _ => Err(py_type_err("unexpected repartition strategy")),
94 }
95 }
96
97 fn distribute_columns(&self) -> PyResult<String> {
98 match &self.repartition.partitioning_scheme {
99 Partitioning::DistributeBy(distribute_list) => Ok(distribute_list
100 .iter()
101 .map(|e| match &e {
102 Expr::Column(column) => column.name.clone(),
103 _ => panic!("Encountered a type other than Expr::Column"),
104 })
105 .collect()),
106 _ => Err(py_type_err("unexpected repartition strategy")),
107 }
108 }
109
110 fn __repr__(&self) -> PyResult<String> {
111 Ok(format!("Repartition({})", self))
112 }
113
114 fn __name__(&self) -> PyResult<String> {
115 Ok("Repartition".to_string())
116 }
117}
118
119impl LogicalNode for PyRepartition {
120 fn inputs(&self) -> Vec<PyLogicalPlan> {
121 vec![PyLogicalPlan::from((*self.repartition.input).clone())]
122 }
123
124 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
125 self.clone().into_bound_py_any(py)
126 }
127}