datafusion_python/
physical_plan.rs1use std::sync::Arc;
19
20use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
21use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29
30#[pyclass(
31 from_py_object,
32 frozen,
33 name = "ExecutionPlan",
34 module = "datafusion",
35 subclass
36)]
37#[derive(Debug, Clone)]
38pub struct PyExecutionPlan {
39 pub plan: Arc<dyn ExecutionPlan>,
40}
41
42impl PyExecutionPlan {
43 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
45 Self { plan }
46 }
47}
48
49#[pymethods]
50impl PyExecutionPlan {
51 pub fn children(&self) -> Vec<PyExecutionPlan> {
53 self.plan
54 .children()
55 .iter()
56 .map(|&p| p.to_owned().into())
57 .collect()
58 }
59
60 pub fn display(&self) -> String {
61 let d = displayable(self.plan.as_ref());
62 format!("{}", d.one_line())
63 }
64
65 pub fn display_indent(&self) -> String {
66 let d = displayable(self.plan.as_ref());
67 format!("{}", d.indent(false))
68 }
69
70 pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
71 let codec = DefaultPhysicalExtensionCodec {};
72 let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
73 self.plan.clone(),
74 &codec,
75 )?;
76
77 let bytes = proto.encode_to_vec();
78 Ok(PyBytes::new(py, &bytes))
79 }
80
81 #[staticmethod]
82 pub fn from_proto(
83 ctx: PySessionContext,
84 proto_msg: Bound<'_, PyBytes>,
85 ) -> PyDataFusionResult<Self> {
86 let bytes: &[u8] = proto_msg.extract().map_err(Into::<PyErr>::into)?;
87 let proto_plan =
88 datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
89 PyRuntimeError::new_err(format!(
90 "Unable to decode logical node from serialized bytes: {e}"
91 ))
92 })?;
93
94 let codec = DefaultPhysicalExtensionCodec {};
95 let plan = proto_plan.try_into_physical_plan(ctx.ctx.task_ctx().as_ref(), &codec)?;
96 Ok(Self::new(plan))
97 }
98
99 fn __repr__(&self) -> String {
100 self.display_indent()
101 }
102
103 #[getter]
104 pub fn partition_count(&self) -> usize {
105 self.plan.output_partitioning().partition_count()
106 }
107}
108
109impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
110 fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
111 plan.plan.clone()
112 }
113}
114
115impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
116 fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
117 PyExecutionPlan { plan: plan.clone() }
118 }
119}