// datafusion_python/expr/statement.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23    Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
24    TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::prelude::*;
27use pyo3::IntoPyObjectExt;
28
29use super::logical_node::LogicalNode;
30use super::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34    frozen,
35    name = "TransactionStart",
36    module = "datafusion.expr",
37    subclass
38)]
39#[derive(Clone)]
40pub struct PyTransactionStart {
41    transaction_start: TransactionStart,
42}
43
44impl From<TransactionStart> for PyTransactionStart {
45    fn from(transaction_start: TransactionStart) -> PyTransactionStart {
46        PyTransactionStart { transaction_start }
47    }
48}
49
50impl TryFrom<PyTransactionStart> for TransactionStart {
51    type Error = PyErr;
52
53    fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
54        Ok(py.transaction_start)
55    }
56}
57
58impl LogicalNode for PyTransactionStart {
59    fn inputs(&self) -> Vec<PyLogicalPlan> {
60        vec![]
61    }
62
63    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
64        self.clone().into_bound_py_any(py)
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
69#[pyclass(
70    frozen,
71    eq,
72    eq_int,
73    name = "TransactionAccessMode",
74    module = "datafusion.expr"
75)]
76pub enum PyTransactionAccessMode {
77    ReadOnly,
78    ReadWrite,
79}
80
81impl From<TransactionAccessMode> for PyTransactionAccessMode {
82    fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
83        match access_mode {
84            TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
85            TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
86        }
87    }
88}
89
90impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
91    type Error = PyErr;
92
93    fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
94        match py {
95            PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
96            PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
97        }
98    }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
102#[pyclass(
103    frozen,
104    eq,
105    eq_int,
106    name = "TransactionIsolationLevel",
107    module = "datafusion.expr"
108)]
109pub enum PyTransactionIsolationLevel {
110    ReadUncommitted,
111    ReadCommitted,
112    RepeatableRead,
113    Serializable,
114    Snapshot,
115}
116
117impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
118    fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
119        match isolation_level {
120            TransactionIsolationLevel::ReadUncommitted => {
121                PyTransactionIsolationLevel::ReadUncommitted
122            }
123            TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
124            TransactionIsolationLevel::RepeatableRead => {
125                PyTransactionIsolationLevel::RepeatableRead
126            }
127            TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
128            TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
129        }
130    }
131}
132
133impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
134    type Error = PyErr;
135
136    fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
137        match value {
138            PyTransactionIsolationLevel::ReadUncommitted => {
139                Ok(TransactionIsolationLevel::ReadUncommitted)
140            }
141            PyTransactionIsolationLevel::ReadCommitted => {
142                Ok(TransactionIsolationLevel::ReadCommitted)
143            }
144            PyTransactionIsolationLevel::RepeatableRead => {
145                Ok(TransactionIsolationLevel::RepeatableRead)
146            }
147            PyTransactionIsolationLevel::Serializable => {
148                Ok(TransactionIsolationLevel::Serializable)
149            }
150            PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
151        }
152    }
153}
154
155#[pymethods]
156impl PyTransactionStart {
157    #[new]
158    pub fn new(
159        access_mode: PyTransactionAccessMode,
160        isolation_level: PyTransactionIsolationLevel,
161    ) -> PyResult<Self> {
162        let access_mode = access_mode.try_into()?;
163        let isolation_level = isolation_level.try_into()?;
164        Ok(PyTransactionStart {
165            transaction_start: TransactionStart {
166                access_mode,
167                isolation_level,
168            },
169        })
170    }
171
172    pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
173        Ok(self.transaction_start.access_mode.clone().into())
174    }
175
176    pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
177        Ok(self.transaction_start.isolation_level.clone().into())
178    }
179}
180
181#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
182#[derive(Clone)]
183pub struct PyTransactionEnd {
184    transaction_end: TransactionEnd,
185}
186
187impl From<TransactionEnd> for PyTransactionEnd {
188    fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
189        PyTransactionEnd { transaction_end }
190    }
191}
192
193impl TryFrom<PyTransactionEnd> for TransactionEnd {
194    type Error = PyErr;
195
196    fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
197        Ok(py.transaction_end)
198    }
199}
200
201impl LogicalNode for PyTransactionEnd {
202    fn inputs(&self) -> Vec<PyLogicalPlan> {
203        vec![]
204    }
205
206    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207        self.clone().into_bound_py_any(py)
208    }
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
212#[pyclass(
213    frozen,
214    eq,
215    eq_int,
216    name = "TransactionConclusion",
217    module = "datafusion.expr"
218)]
219pub enum PyTransactionConclusion {
220    Commit,
221    Rollback,
222}
223
224impl From<TransactionConclusion> for PyTransactionConclusion {
225    fn from(value: TransactionConclusion) -> Self {
226        match value {
227            TransactionConclusion::Commit => PyTransactionConclusion::Commit,
228            TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
229        }
230    }
231}
232
233impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
234    type Error = PyErr;
235
236    fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
237        match value {
238            PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
239            PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
240        }
241    }
242}
243#[pymethods]
244impl PyTransactionEnd {
245    #[new]
246    pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
247        let conclusion = conclusion.try_into()?;
248        Ok(PyTransactionEnd {
249            transaction_end: TransactionEnd { conclusion, chain },
250        })
251    }
252
253    pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
254        Ok(self.transaction_end.conclusion.clone().into())
255    }
256
257    pub fn chain(&self) -> bool {
258        self.transaction_end.chain
259    }
260}
261
262#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
263#[derive(Clone)]
264pub struct PySetVariable {
265    set_variable: SetVariable,
266}
267
268impl From<SetVariable> for PySetVariable {
269    fn from(set_variable: SetVariable) -> PySetVariable {
270        PySetVariable { set_variable }
271    }
272}
273
274impl TryFrom<PySetVariable> for SetVariable {
275    type Error = PyErr;
276
277    fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
278        Ok(py.set_variable)
279    }
280}
281
282impl LogicalNode for PySetVariable {
283    fn inputs(&self) -> Vec<PyLogicalPlan> {
284        vec![]
285    }
286
287    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288        self.clone().into_bound_py_any(py)
289    }
290}
291
292#[pymethods]
293impl PySetVariable {
294    #[new]
295    pub fn new(variable: String, value: String) -> Self {
296        PySetVariable {
297            set_variable: SetVariable { variable, value },
298        }
299    }
300
301    pub fn variable(&self) -> String {
302        self.set_variable.variable.clone()
303    }
304
305    pub fn value(&self) -> String {
306        self.set_variable.value.clone()
307    }
308}
309
310#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
311#[derive(Clone)]
312pub struct PyPrepare {
313    prepare: Prepare,
314}
315
316impl From<Prepare> for PyPrepare {
317    fn from(prepare: Prepare) -> PyPrepare {
318        PyPrepare { prepare }
319    }
320}
321
322impl TryFrom<PyPrepare> for Prepare {
323    type Error = PyErr;
324
325    fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
326        Ok(py.prepare)
327    }
328}
329
330impl LogicalNode for PyPrepare {
331    fn inputs(&self) -> Vec<PyLogicalPlan> {
332        vec![PyLogicalPlan::from((*self.prepare.input).clone())]
333    }
334
335    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
336        self.clone().into_bound_py_any(py)
337    }
338}
339
340#[pymethods]
341impl PyPrepare {
342    #[new]
343    pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
344        let input = input.plan().clone();
345        let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
346        PyPrepare {
347            prepare: Prepare {
348                name,
349                fields,
350                input,
351            },
352        }
353    }
354
355    pub fn name(&self) -> String {
356        self.prepare.name.clone()
357    }
358
359    pub fn fields(&self) -> Vec<PyArrowType<Field>> {
360        self.prepare
361            .fields
362            .clone()
363            .into_iter()
364            .map(|f| f.as_ref().clone().into())
365            .collect()
366    }
367
368    pub fn input(&self) -> PyLogicalPlan {
369        PyLogicalPlan {
370            plan: self.prepare.input.clone(),
371        }
372    }
373}
374
375#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
376#[derive(Clone)]
377pub struct PyExecute {
378    execute: Execute,
379}
380
381impl From<Execute> for PyExecute {
382    fn from(execute: Execute) -> PyExecute {
383        PyExecute { execute }
384    }
385}
386
387impl TryFrom<PyExecute> for Execute {
388    type Error = PyErr;
389
390    fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
391        Ok(py.execute)
392    }
393}
394
395impl LogicalNode for PyExecute {
396    fn inputs(&self) -> Vec<PyLogicalPlan> {
397        vec![]
398    }
399
400    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
401        self.clone().into_bound_py_any(py)
402    }
403}
404
405#[pymethods]
406impl PyExecute {
407    #[new]
408    pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
409        let parameters = parameters
410            .into_iter()
411            .map(|parameter| parameter.into())
412            .collect();
413        PyExecute {
414            execute: Execute { name, parameters },
415        }
416    }
417
418    pub fn name(&self) -> String {
419        self.execute.name.clone()
420    }
421
422    pub fn parameters(&self) -> Vec<PyExpr> {
423        self.execute
424            .parameters
425            .clone()
426            .into_iter()
427            .map(|t| t.into())
428            .collect()
429    }
430}
431
432#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
433#[derive(Clone)]
434pub struct PyDeallocate {
435    deallocate: Deallocate,
436}
437
438impl From<Deallocate> for PyDeallocate {
439    fn from(deallocate: Deallocate) -> PyDeallocate {
440        PyDeallocate { deallocate }
441    }
442}
443
444impl TryFrom<PyDeallocate> for Deallocate {
445    type Error = PyErr;
446
447    fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
448        Ok(py.deallocate)
449    }
450}
451
452impl LogicalNode for PyDeallocate {
453    fn inputs(&self) -> Vec<PyLogicalPlan> {
454        vec![]
455    }
456
457    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
458        self.clone().into_bound_py_any(py)
459    }
460}
461
462#[pymethods]
463impl PyDeallocate {
464    #[new]
465    pub fn new(name: String) -> Self {
466        PyDeallocate {
467            deallocate: Deallocate { name },
468        }
469    }
470
471    pub fn name(&self) -> String {
472        self.deallocate.name.clone()
473    }
474}