Skip to main content

datafusion_python/expr/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23    Deallocate, Execute, Prepare, ResetVariable, SetVariable, TransactionAccessMode,
24    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::IntoPyObjectExt;
27use pyo3::prelude::*;
28
29use super::PyExpr;
30use super::logical_node::LogicalNode;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34    from_py_object,
35    frozen,
36    name = "TransactionStart",
37    module = "datafusion.expr",
38    subclass
39)]
40#[derive(Clone)]
41pub struct PyTransactionStart {
42    transaction_start: TransactionStart,
43}
44
45impl From<TransactionStart> for PyTransactionStart {
46    fn from(transaction_start: TransactionStart) -> PyTransactionStart {
47        PyTransactionStart { transaction_start }
48    }
49}
50
51impl TryFrom<PyTransactionStart> for TransactionStart {
52    type Error = PyErr;
53
54    fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
55        Ok(py.transaction_start)
56    }
57}
58
59impl LogicalNode for PyTransactionStart {
60    fn inputs(&self) -> Vec<PyLogicalPlan> {
61        vec![]
62    }
63
64    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
65        self.clone().into_bound_py_any(py)
66    }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
70#[pyclass(
71    from_py_object,
72    frozen,
73    eq,
74    eq_int,
75    name = "TransactionAccessMode",
76    module = "datafusion.expr"
77)]
78pub enum PyTransactionAccessMode {
79    ReadOnly,
80    ReadWrite,
81}
82
83impl From<TransactionAccessMode> for PyTransactionAccessMode {
84    fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
85        match access_mode {
86            TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
87            TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
88        }
89    }
90}
91
92impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
93    type Error = PyErr;
94
95    fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
96        match py {
97            PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
98            PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
99        }
100    }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
104#[pyclass(
105    from_py_object,
106    frozen,
107    eq,
108    eq_int,
109    name = "TransactionIsolationLevel",
110    module = "datafusion.expr"
111)]
112pub enum PyTransactionIsolationLevel {
113    ReadUncommitted,
114    ReadCommitted,
115    RepeatableRead,
116    Serializable,
117    Snapshot,
118}
119
120impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
121    fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
122        match isolation_level {
123            TransactionIsolationLevel::ReadUncommitted => {
124                PyTransactionIsolationLevel::ReadUncommitted
125            }
126            TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
127            TransactionIsolationLevel::RepeatableRead => {
128                PyTransactionIsolationLevel::RepeatableRead
129            }
130            TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
131            TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
132        }
133    }
134}
135
136impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
137    type Error = PyErr;
138
139    fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
140        match value {
141            PyTransactionIsolationLevel::ReadUncommitted => {
142                Ok(TransactionIsolationLevel::ReadUncommitted)
143            }
144            PyTransactionIsolationLevel::ReadCommitted => {
145                Ok(TransactionIsolationLevel::ReadCommitted)
146            }
147            PyTransactionIsolationLevel::RepeatableRead => {
148                Ok(TransactionIsolationLevel::RepeatableRead)
149            }
150            PyTransactionIsolationLevel::Serializable => {
151                Ok(TransactionIsolationLevel::Serializable)
152            }
153            PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
154        }
155    }
156}
157
158#[pymethods]
159impl PyTransactionStart {
160    #[new]
161    pub fn new(
162        access_mode: PyTransactionAccessMode,
163        isolation_level: PyTransactionIsolationLevel,
164    ) -> PyResult<Self> {
165        let access_mode = access_mode.try_into()?;
166        let isolation_level = isolation_level.try_into()?;
167        Ok(PyTransactionStart {
168            transaction_start: TransactionStart {
169                access_mode,
170                isolation_level,
171            },
172        })
173    }
174
175    pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
176        Ok(self.transaction_start.access_mode.clone().into())
177    }
178
179    pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
180        Ok(self.transaction_start.isolation_level.clone().into())
181    }
182}
183
184#[pyclass(
185    from_py_object,
186    frozen,
187    name = "TransactionEnd",
188    module = "datafusion.expr",
189    subclass
190)]
191#[derive(Clone)]
192pub struct PyTransactionEnd {
193    transaction_end: TransactionEnd,
194}
195
196impl From<TransactionEnd> for PyTransactionEnd {
197    fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
198        PyTransactionEnd { transaction_end }
199    }
200}
201
202impl TryFrom<PyTransactionEnd> for TransactionEnd {
203    type Error = PyErr;
204
205    fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
206        Ok(py.transaction_end)
207    }
208}
209
210impl LogicalNode for PyTransactionEnd {
211    fn inputs(&self) -> Vec<PyLogicalPlan> {
212        vec![]
213    }
214
215    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
216        self.clone().into_bound_py_any(py)
217    }
218}
219
220#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
221#[pyclass(
222    from_py_object,
223    frozen,
224    eq,
225    eq_int,
226    name = "TransactionConclusion",
227    module = "datafusion.expr"
228)]
229pub enum PyTransactionConclusion {
230    Commit,
231    Rollback,
232}
233
234impl From<TransactionConclusion> for PyTransactionConclusion {
235    fn from(value: TransactionConclusion) -> Self {
236        match value {
237            TransactionConclusion::Commit => PyTransactionConclusion::Commit,
238            TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
239        }
240    }
241}
242
243impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
244    type Error = PyErr;
245
246    fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
247        match value {
248            PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
249            PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
250        }
251    }
252}
253#[pymethods]
254impl PyTransactionEnd {
255    #[new]
256    pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
257        let conclusion = conclusion.try_into()?;
258        Ok(PyTransactionEnd {
259            transaction_end: TransactionEnd { conclusion, chain },
260        })
261    }
262
263    pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
264        Ok(self.transaction_end.conclusion.clone().into())
265    }
266
267    pub fn chain(&self) -> bool {
268        self.transaction_end.chain
269    }
270}
271
272#[pyclass(
273    from_py_object,
274    frozen,
275    name = "ResetVariable",
276    module = "datafusion.expr",
277    subclass
278)]
279#[derive(Clone)]
280pub struct PyResetVariable {
281    reset_variable: ResetVariable,
282}
283
284impl From<ResetVariable> for PyResetVariable {
285    fn from(reset_variable: ResetVariable) -> PyResetVariable {
286        PyResetVariable { reset_variable }
287    }
288}
289
290impl TryFrom<PyResetVariable> for ResetVariable {
291    type Error = PyErr;
292
293    fn try_from(py: PyResetVariable) -> Result<Self, Self::Error> {
294        Ok(py.reset_variable)
295    }
296}
297
298impl LogicalNode for PyResetVariable {
299    fn inputs(&self) -> Vec<PyLogicalPlan> {
300        vec![]
301    }
302
303    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304        self.clone().into_bound_py_any(py)
305    }
306}
307
308#[pymethods]
309impl PyResetVariable {
310    #[new]
311    pub fn new(variable: String) -> Self {
312        PyResetVariable {
313            reset_variable: ResetVariable { variable },
314        }
315    }
316
317    pub fn variable(&self) -> String {
318        self.reset_variable.variable.clone()
319    }
320}
321
322#[pyclass(
323    from_py_object,
324    frozen,
325    name = "SetVariable",
326    module = "datafusion.expr",
327    subclass
328)]
329#[derive(Clone)]
330pub struct PySetVariable {
331    set_variable: SetVariable,
332}
333
334impl From<SetVariable> for PySetVariable {
335    fn from(set_variable: SetVariable) -> PySetVariable {
336        PySetVariable { set_variable }
337    }
338}
339
340impl TryFrom<PySetVariable> for SetVariable {
341    type Error = PyErr;
342
343    fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
344        Ok(py.set_variable)
345    }
346}
347
348impl LogicalNode for PySetVariable {
349    fn inputs(&self) -> Vec<PyLogicalPlan> {
350        vec![]
351    }
352
353    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
354        self.clone().into_bound_py_any(py)
355    }
356}
357
358#[pymethods]
359impl PySetVariable {
360    #[new]
361    pub fn new(variable: String, value: String) -> Self {
362        PySetVariable {
363            set_variable: SetVariable { variable, value },
364        }
365    }
366
367    pub fn variable(&self) -> String {
368        self.set_variable.variable.clone()
369    }
370
371    pub fn value(&self) -> String {
372        self.set_variable.value.clone()
373    }
374}
375
376#[pyclass(
377    from_py_object,
378    frozen,
379    name = "Prepare",
380    module = "datafusion.expr",
381    subclass
382)]
383#[derive(Clone)]
384pub struct PyPrepare {
385    prepare: Prepare,
386}
387
388impl From<Prepare> for PyPrepare {
389    fn from(prepare: Prepare) -> PyPrepare {
390        PyPrepare { prepare }
391    }
392}
393
394impl TryFrom<PyPrepare> for Prepare {
395    type Error = PyErr;
396
397    fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
398        Ok(py.prepare)
399    }
400}
401
402impl LogicalNode for PyPrepare {
403    fn inputs(&self) -> Vec<PyLogicalPlan> {
404        vec![PyLogicalPlan::from((*self.prepare.input).clone())]
405    }
406
407    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
408        self.clone().into_bound_py_any(py)
409    }
410}
411
412#[pymethods]
413impl PyPrepare {
414    #[new]
415    pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
416        let input = input.plan().clone();
417        let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
418        PyPrepare {
419            prepare: Prepare {
420                name,
421                fields,
422                input,
423            },
424        }
425    }
426
427    pub fn name(&self) -> String {
428        self.prepare.name.clone()
429    }
430
431    pub fn fields(&self) -> Vec<PyArrowType<Field>> {
432        self.prepare
433            .fields
434            .clone()
435            .into_iter()
436            .map(|f| f.as_ref().clone().into())
437            .collect()
438    }
439
440    pub fn input(&self) -> PyLogicalPlan {
441        PyLogicalPlan {
442            plan: self.prepare.input.clone(),
443        }
444    }
445}
446
447#[pyclass(
448    from_py_object,
449    frozen,
450    name = "Execute",
451    module = "datafusion.expr",
452    subclass
453)]
454#[derive(Clone)]
455pub struct PyExecute {
456    execute: Execute,
457}
458
459impl From<Execute> for PyExecute {
460    fn from(execute: Execute) -> PyExecute {
461        PyExecute { execute }
462    }
463}
464
465impl TryFrom<PyExecute> for Execute {
466    type Error = PyErr;
467
468    fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
469        Ok(py.execute)
470    }
471}
472
473impl LogicalNode for PyExecute {
474    fn inputs(&self) -> Vec<PyLogicalPlan> {
475        vec![]
476    }
477
478    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
479        self.clone().into_bound_py_any(py)
480    }
481}
482
483#[pymethods]
484impl PyExecute {
485    #[new]
486    pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
487        let parameters = parameters
488            .into_iter()
489            .map(|parameter| parameter.into())
490            .collect();
491        PyExecute {
492            execute: Execute { name, parameters },
493        }
494    }
495
496    pub fn name(&self) -> String {
497        self.execute.name.clone()
498    }
499
500    pub fn parameters(&self) -> Vec<PyExpr> {
501        self.execute
502            .parameters
503            .clone()
504            .into_iter()
505            .map(|t| t.into())
506            .collect()
507    }
508}
509
510#[pyclass(
511    from_py_object,
512    frozen,
513    name = "Deallocate",
514    module = "datafusion.expr",
515    subclass
516)]
517#[derive(Clone)]
518pub struct PyDeallocate {
519    deallocate: Deallocate,
520}
521
522impl From<Deallocate> for PyDeallocate {
523    fn from(deallocate: Deallocate) -> PyDeallocate {
524        PyDeallocate { deallocate }
525    }
526}
527
528impl TryFrom<PyDeallocate> for Deallocate {
529    type Error = PyErr;
530
531    fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
532        Ok(py.deallocate)
533    }
534}
535
536impl LogicalNode for PyDeallocate {
537    fn inputs(&self) -> Vec<PyLogicalPlan> {
538        vec![]
539    }
540
541    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
542        self.clone().into_bound_py_any(py)
543    }
544}
545
546#[pymethods]
547impl PyDeallocate {
548    #[new]
549    pub fn new(name: String) -> Self {
550        PyDeallocate {
551            deallocate: Deallocate { name },
552        }
553    }
554
555    pub fn name(&self) -> String {
556        self.deallocate.name.clone()
557    }
558}