datafusion_python/
physical_plan.rs1use std::sync::Arc;
19
20use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
21use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29
30#[pyclass(frozen, name = "ExecutionPlan", module = "datafusion", subclass)]
31#[derive(Debug, Clone)]
32pub struct PyExecutionPlan {
33 pub plan: Arc<dyn ExecutionPlan>,
34}
35
36impl PyExecutionPlan {
37 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
39 Self { plan }
40 }
41}
42
43#[pymethods]
44impl PyExecutionPlan {
45 pub fn children(&self) -> Vec<PyExecutionPlan> {
47 self.plan
48 .children()
49 .iter()
50 .map(|&p| p.to_owned().into())
51 .collect()
52 }
53
54 pub fn display(&self) -> String {
55 let d = displayable(self.plan.as_ref());
56 format!("{}", d.one_line())
57 }
58
59 pub fn display_indent(&self) -> String {
60 let d = displayable(self.plan.as_ref());
61 format!("{}", d.indent(false))
62 }
63
64 pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
65 let codec = DefaultPhysicalExtensionCodec {};
66 let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
67 self.plan.clone(),
68 &codec,
69 )?;
70
71 let bytes = proto.encode_to_vec();
72 Ok(PyBytes::new(py, &bytes))
73 }
74
75 #[staticmethod]
76 pub fn from_proto(
77 ctx: PySessionContext,
78 proto_msg: Bound<'_, PyBytes>,
79 ) -> PyDataFusionResult<Self> {
80 let bytes: &[u8] = proto_msg.extract()?;
81 let proto_plan =
82 datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
83 PyRuntimeError::new_err(format!(
84 "Unable to decode logical node from serialized bytes: {e}"
85 ))
86 })?;
87
88 let codec = DefaultPhysicalExtensionCodec {};
89 let plan = proto_plan.try_into_physical_plan(ctx.ctx.task_ctx().as_ref(), &codec)?;
90 Ok(Self::new(plan))
91 }
92
93 fn __repr__(&self) -> String {
94 self.display_indent()
95 }
96
97 #[getter]
98 pub fn partition_count(&self) -> usize {
99 self.plan.output_partitioning().partition_count()
100 }
101}
102
103impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
104 fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
105 plan.plan.clone()
106 }
107}
108
109impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
110 fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
111 PyExecutionPlan { plan: plan.clone() }
112 }
113}