datafusion_python/sql/
logical.rs1use std::sync::Arc;
19
20use crate::context::PySessionContext;
21use crate::errors::PyDataFusionResult;
22use crate::expr::aggregate::PyAggregate;
23use crate::expr::analyze::PyAnalyze;
24use crate::expr::copy_to::PyCopyTo;
25use crate::expr::create_catalog::PyCreateCatalog;
26use crate::expr::create_catalog_schema::PyCreateCatalogSchema;
27use crate::expr::create_external_table::PyCreateExternalTable;
28use crate::expr::create_function::PyCreateFunction;
29use crate::expr::create_index::PyCreateIndex;
30use crate::expr::create_memory_table::PyCreateMemoryTable;
31use crate::expr::create_view::PyCreateView;
32use crate::expr::describe_table::PyDescribeTable;
33use crate::expr::distinct::PyDistinct;
34use crate::expr::dml::PyDmlStatement;
35use crate::expr::drop_catalog_schema::PyDropCatalogSchema;
36use crate::expr::drop_function::PyDropFunction;
37use crate::expr::drop_table::PyDropTable;
38use crate::expr::drop_view::PyDropView;
39use crate::expr::empty_relation::PyEmptyRelation;
40use crate::expr::explain::PyExplain;
41use crate::expr::extension::PyExtension;
42use crate::expr::filter::PyFilter;
43use crate::expr::join::PyJoin;
44use crate::expr::limit::PyLimit;
45use crate::expr::projection::PyProjection;
46use crate::expr::recursive_query::PyRecursiveQuery;
47use crate::expr::repartition::PyRepartition;
48use crate::expr::sort::PySort;
49use crate::expr::statement::{
50 PyDeallocate, PyExecute, PyPrepare, PySetVariable, PyTransactionEnd, PyTransactionStart,
51};
52use crate::expr::subquery::PySubquery;
53use crate::expr::subquery_alias::PySubqueryAlias;
54use crate::expr::table_scan::PyTableScan;
55use crate::expr::union::PyUnion;
56use crate::expr::unnest::PyUnnest;
57use crate::expr::values::PyValues;
58use crate::expr::window::PyWindowExpr;
59use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement};
60use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec};
61use prost::Message;
62use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes};
63
64use crate::expr::logical_node::LogicalNode;
65
66#[pyclass(name = "LogicalPlan", module = "datafusion", subclass)]
67#[derive(Debug, Clone)]
68pub struct PyLogicalPlan {
69 pub(crate) plan: Arc<LogicalPlan>,
70}
71
72impl PyLogicalPlan {
73 pub fn new(plan: LogicalPlan) -> Self {
75 Self {
76 plan: Arc::new(plan),
77 }
78 }
79
80 pub fn plan(&self) -> Arc<LogicalPlan> {
81 self.plan.clone()
82 }
83}
84
85#[pymethods]
86impl PyLogicalPlan {
87 pub fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
89 match self.plan.as_ref() {
90 LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py),
91 LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py),
92 LogicalPlan::Distinct(plan) => PyDistinct::from(plan.clone()).to_variant(py),
93 LogicalPlan::EmptyRelation(plan) => PyEmptyRelation::from(plan.clone()).to_variant(py),
94 LogicalPlan::Explain(plan) => PyExplain::from(plan.clone()).to_variant(py),
95 LogicalPlan::Extension(plan) => PyExtension::from(plan.clone()).to_variant(py),
96 LogicalPlan::Filter(plan) => PyFilter::from(plan.clone()).to_variant(py),
97 LogicalPlan::Join(plan) => PyJoin::from(plan.clone()).to_variant(py),
98 LogicalPlan::Limit(plan) => PyLimit::from(plan.clone()).to_variant(py),
99 LogicalPlan::Projection(plan) => PyProjection::from(plan.clone()).to_variant(py),
100 LogicalPlan::Sort(plan) => PySort::from(plan.clone()).to_variant(py),
101 LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py),
102 LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
103 LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
104 LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
105 LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py),
106 LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py),
107 LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py),
108 LogicalPlan::Statement(plan) => match plan {
109 Statement::TransactionStart(plan) => {
110 PyTransactionStart::from(plan.clone()).to_variant(py)
111 }
112 Statement::TransactionEnd(plan) => {
113 PyTransactionEnd::from(plan.clone()).to_variant(py)
114 }
115 Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py),
116 Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py),
117 Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py),
118 Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py),
119 },
120 LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py),
121 LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py),
122 LogicalPlan::Ddl(plan) => match plan {
123 DdlStatement::CreateExternalTable(plan) => {
124 PyCreateExternalTable::from(plan.clone()).to_variant(py)
125 }
126 DdlStatement::CreateMemoryTable(plan) => {
127 PyCreateMemoryTable::from(plan.clone()).to_variant(py)
128 }
129 DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py),
130 DdlStatement::CreateCatalogSchema(plan) => {
131 PyCreateCatalogSchema::from(plan.clone()).to_variant(py)
132 }
133 DdlStatement::CreateCatalog(plan) => {
134 PyCreateCatalog::from(plan.clone()).to_variant(py)
135 }
136 DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py),
137 DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py),
138 DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py),
139 DdlStatement::DropCatalogSchema(plan) => {
140 PyDropCatalogSchema::from(plan.clone()).to_variant(py)
141 }
142 DdlStatement::CreateFunction(plan) => {
143 PyCreateFunction::from(plan.clone()).to_variant(py)
144 }
145 DdlStatement::DropFunction(plan) => {
146 PyDropFunction::from(plan.clone()).to_variant(py)
147 }
148 },
149 LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py),
150 LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py),
151 LogicalPlan::RecursiveQuery(plan) => {
152 PyRecursiveQuery::from(plan.clone()).to_variant(py)
153 }
154 }
155 }
156
157 fn inputs(&self) -> Vec<PyLogicalPlan> {
159 let mut inputs = vec![];
160 for input in self.plan.inputs() {
161 inputs.push(input.to_owned().into());
162 }
163 inputs
164 }
165
166 fn __repr__(&self) -> PyResult<String> {
167 Ok(format!("{:?}", self.plan))
168 }
169
170 fn display(&self) -> String {
171 format!("{}", self.plan.display())
172 }
173
174 fn display_indent(&self) -> String {
175 format!("{}", self.plan.display_indent())
176 }
177
178 fn display_indent_schema(&self) -> String {
179 format!("{}", self.plan.display_indent_schema())
180 }
181
182 fn display_graphviz(&self) -> String {
183 format!("{}", self.plan.display_graphviz())
184 }
185
186 pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
187 let codec = DefaultLogicalExtensionCodec {};
188 let proto =
189 datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?;
190
191 let bytes = proto.encode_to_vec();
192 Ok(PyBytes::new(py, &bytes))
193 }
194
195 #[staticmethod]
196 pub fn from_proto(
197 ctx: PySessionContext,
198 proto_msg: Bound<'_, PyBytes>,
199 ) -> PyDataFusionResult<Self> {
200 let bytes: &[u8] = proto_msg.extract()?;
201 let proto_plan =
202 datafusion_proto::protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
203 PyRuntimeError::new_err(format!(
204 "Unable to decode logical node from serialized bytes: {}",
205 e
206 ))
207 })?;
208
209 let codec = DefaultLogicalExtensionCodec {};
210 let plan = proto_plan.try_into_logical_plan(&ctx.ctx, &codec)?;
211 Ok(Self::new(plan))
212 }
213}
214
215impl From<PyLogicalPlan> for LogicalPlan {
216 fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
217 logical_plan.plan.as_ref().clone()
218 }
219}
220
221impl From<LogicalPlan> for PyLogicalPlan {
222 fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
223 PyLogicalPlan {
224 plan: Arc::new(logical_plan),
225 }
226 }
227}