Skip to main content

datafusion_python/expr/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23    Deallocate, Execute, Prepare, ResetVariable, SetVariable, TransactionAccessMode,
24    TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::IntoPyObjectExt;
27use pyo3::prelude::*;
28
29use super::PyExpr;
30use super::logical_node::LogicalNode;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34    frozen,
35    name = "TransactionStart",
36    module = "datafusion.expr",
37    subclass
38)]
39#[derive(Clone)]
40pub struct PyTransactionStart {
41    transaction_start: TransactionStart,
42}
43
44impl From<TransactionStart> for PyTransactionStart {
45    fn from(transaction_start: TransactionStart) -> PyTransactionStart {
46        PyTransactionStart { transaction_start }
47    }
48}
49
50impl TryFrom<PyTransactionStart> for TransactionStart {
51    type Error = PyErr;
52
53    fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
54        Ok(py.transaction_start)
55    }
56}
57
58impl LogicalNode for PyTransactionStart {
59    fn inputs(&self) -> Vec<PyLogicalPlan> {
60        vec![]
61    }
62
63    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
64        self.clone().into_bound_py_any(py)
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
69#[pyclass(
70    frozen,
71    eq,
72    eq_int,
73    name = "TransactionAccessMode",
74    module = "datafusion.expr"
75)]
76pub enum PyTransactionAccessMode {
77    ReadOnly,
78    ReadWrite,
79}
80
81impl From<TransactionAccessMode> for PyTransactionAccessMode {
82    fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
83        match access_mode {
84            TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
85            TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
86        }
87    }
88}
89
90impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
91    type Error = PyErr;
92
93    fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
94        match py {
95            PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
96            PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
97        }
98    }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
102#[pyclass(
103    frozen,
104    eq,
105    eq_int,
106    name = "TransactionIsolationLevel",
107    module = "datafusion.expr"
108)]
109pub enum PyTransactionIsolationLevel {
110    ReadUncommitted,
111    ReadCommitted,
112    RepeatableRead,
113    Serializable,
114    Snapshot,
115}
116
117impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
118    fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
119        match isolation_level {
120            TransactionIsolationLevel::ReadUncommitted => {
121                PyTransactionIsolationLevel::ReadUncommitted
122            }
123            TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
124            TransactionIsolationLevel::RepeatableRead => {
125                PyTransactionIsolationLevel::RepeatableRead
126            }
127            TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
128            TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
129        }
130    }
131}
132
133impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
134    type Error = PyErr;
135
136    fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
137        match value {
138            PyTransactionIsolationLevel::ReadUncommitted => {
139                Ok(TransactionIsolationLevel::ReadUncommitted)
140            }
141            PyTransactionIsolationLevel::ReadCommitted => {
142                Ok(TransactionIsolationLevel::ReadCommitted)
143            }
144            PyTransactionIsolationLevel::RepeatableRead => {
145                Ok(TransactionIsolationLevel::RepeatableRead)
146            }
147            PyTransactionIsolationLevel::Serializable => {
148                Ok(TransactionIsolationLevel::Serializable)
149            }
150            PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
151        }
152    }
153}
154
155#[pymethods]
156impl PyTransactionStart {
157    #[new]
158    pub fn new(
159        access_mode: PyTransactionAccessMode,
160        isolation_level: PyTransactionIsolationLevel,
161    ) -> PyResult<Self> {
162        let access_mode = access_mode.try_into()?;
163        let isolation_level = isolation_level.try_into()?;
164        Ok(PyTransactionStart {
165            transaction_start: TransactionStart {
166                access_mode,
167                isolation_level,
168            },
169        })
170    }
171
172    pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
173        Ok(self.transaction_start.access_mode.clone().into())
174    }
175
176    pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
177        Ok(self.transaction_start.isolation_level.clone().into())
178    }
179}
180
181#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
182#[derive(Clone)]
183pub struct PyTransactionEnd {
184    transaction_end: TransactionEnd,
185}
186
187impl From<TransactionEnd> for PyTransactionEnd {
188    fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
189        PyTransactionEnd { transaction_end }
190    }
191}
192
193impl TryFrom<PyTransactionEnd> for TransactionEnd {
194    type Error = PyErr;
195
196    fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
197        Ok(py.transaction_end)
198    }
199}
200
201impl LogicalNode for PyTransactionEnd {
202    fn inputs(&self) -> Vec<PyLogicalPlan> {
203        vec![]
204    }
205
206    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207        self.clone().into_bound_py_any(py)
208    }
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
212#[pyclass(
213    frozen,
214    eq,
215    eq_int,
216    name = "TransactionConclusion",
217    module = "datafusion.expr"
218)]
219pub enum PyTransactionConclusion {
220    Commit,
221    Rollback,
222}
223
224impl From<TransactionConclusion> for PyTransactionConclusion {
225    fn from(value: TransactionConclusion) -> Self {
226        match value {
227            TransactionConclusion::Commit => PyTransactionConclusion::Commit,
228            TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
229        }
230    }
231}
232
233impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
234    type Error = PyErr;
235
236    fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
237        match value {
238            PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
239            PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
240        }
241    }
242}
243#[pymethods]
244impl PyTransactionEnd {
245    #[new]
246    pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
247        let conclusion = conclusion.try_into()?;
248        Ok(PyTransactionEnd {
249            transaction_end: TransactionEnd { conclusion, chain },
250        })
251    }
252
253    pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
254        Ok(self.transaction_end.conclusion.clone().into())
255    }
256
257    pub fn chain(&self) -> bool {
258        self.transaction_end.chain
259    }
260}
261
262#[pyclass(frozen, name = "ResetVariable", module = "datafusion.expr", subclass)]
263#[derive(Clone)]
264pub struct PyResetVariable {
265    reset_variable: ResetVariable,
266}
267
268impl From<ResetVariable> for PyResetVariable {
269    fn from(reset_variable: ResetVariable) -> PyResetVariable {
270        PyResetVariable { reset_variable }
271    }
272}
273
274impl TryFrom<PyResetVariable> for ResetVariable {
275    type Error = PyErr;
276
277    fn try_from(py: PyResetVariable) -> Result<Self, Self::Error> {
278        Ok(py.reset_variable)
279    }
280}
281
282impl LogicalNode for PyResetVariable {
283    fn inputs(&self) -> Vec<PyLogicalPlan> {
284        vec![]
285    }
286
287    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288        self.clone().into_bound_py_any(py)
289    }
290}
291
292#[pymethods]
293impl PyResetVariable {
294    #[new]
295    pub fn new(variable: String) -> Self {
296        PyResetVariable {
297            reset_variable: ResetVariable { variable },
298        }
299    }
300
301    pub fn variable(&self) -> String {
302        self.reset_variable.variable.clone()
303    }
304}
305
306#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
307#[derive(Clone)]
308pub struct PySetVariable {
309    set_variable: SetVariable,
310}
311
312impl From<SetVariable> for PySetVariable {
313    fn from(set_variable: SetVariable) -> PySetVariable {
314        PySetVariable { set_variable }
315    }
316}
317
318impl TryFrom<PySetVariable> for SetVariable {
319    type Error = PyErr;
320
321    fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
322        Ok(py.set_variable)
323    }
324}
325
326impl LogicalNode for PySetVariable {
327    fn inputs(&self) -> Vec<PyLogicalPlan> {
328        vec![]
329    }
330
331    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
332        self.clone().into_bound_py_any(py)
333    }
334}
335
336#[pymethods]
337impl PySetVariable {
338    #[new]
339    pub fn new(variable: String, value: String) -> Self {
340        PySetVariable {
341            set_variable: SetVariable { variable, value },
342        }
343    }
344
345    pub fn variable(&self) -> String {
346        self.set_variable.variable.clone()
347    }
348
349    pub fn value(&self) -> String {
350        self.set_variable.value.clone()
351    }
352}
353
354#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
355#[derive(Clone)]
356pub struct PyPrepare {
357    prepare: Prepare,
358}
359
360impl From<Prepare> for PyPrepare {
361    fn from(prepare: Prepare) -> PyPrepare {
362        PyPrepare { prepare }
363    }
364}
365
366impl TryFrom<PyPrepare> for Prepare {
367    type Error = PyErr;
368
369    fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
370        Ok(py.prepare)
371    }
372}
373
374impl LogicalNode for PyPrepare {
375    fn inputs(&self) -> Vec<PyLogicalPlan> {
376        vec![PyLogicalPlan::from((*self.prepare.input).clone())]
377    }
378
379    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
380        self.clone().into_bound_py_any(py)
381    }
382}
383
384#[pymethods]
385impl PyPrepare {
386    #[new]
387    pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
388        let input = input.plan().clone();
389        let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
390        PyPrepare {
391            prepare: Prepare {
392                name,
393                fields,
394                input,
395            },
396        }
397    }
398
399    pub fn name(&self) -> String {
400        self.prepare.name.clone()
401    }
402
403    pub fn fields(&self) -> Vec<PyArrowType<Field>> {
404        self.prepare
405            .fields
406            .clone()
407            .into_iter()
408            .map(|f| f.as_ref().clone().into())
409            .collect()
410    }
411
412    pub fn input(&self) -> PyLogicalPlan {
413        PyLogicalPlan {
414            plan: self.prepare.input.clone(),
415        }
416    }
417}
418
419#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
420#[derive(Clone)]
421pub struct PyExecute {
422    execute: Execute,
423}
424
425impl From<Execute> for PyExecute {
426    fn from(execute: Execute) -> PyExecute {
427        PyExecute { execute }
428    }
429}
430
431impl TryFrom<PyExecute> for Execute {
432    type Error = PyErr;
433
434    fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
435        Ok(py.execute)
436    }
437}
438
439impl LogicalNode for PyExecute {
440    fn inputs(&self) -> Vec<PyLogicalPlan> {
441        vec![]
442    }
443
444    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
445        self.clone().into_bound_py_any(py)
446    }
447}
448
449#[pymethods]
450impl PyExecute {
451    #[new]
452    pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
453        let parameters = parameters
454            .into_iter()
455            .map(|parameter| parameter.into())
456            .collect();
457        PyExecute {
458            execute: Execute { name, parameters },
459        }
460    }
461
462    pub fn name(&self) -> String {
463        self.execute.name.clone()
464    }
465
466    pub fn parameters(&self) -> Vec<PyExpr> {
467        self.execute
468            .parameters
469            .clone()
470            .into_iter()
471            .map(|t| t.into())
472            .collect()
473    }
474}
475
476#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
477#[derive(Clone)]
478pub struct PyDeallocate {
479    deallocate: Deallocate,
480}
481
482impl From<Deallocate> for PyDeallocate {
483    fn from(deallocate: Deallocate) -> PyDeallocate {
484        PyDeallocate { deallocate }
485    }
486}
487
488impl TryFrom<PyDeallocate> for Deallocate {
489    type Error = PyErr;
490
491    fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
492        Ok(py.deallocate)
493    }
494}
495
496impl LogicalNode for PyDeallocate {
497    fn inputs(&self) -> Vec<PyLogicalPlan> {
498        vec![]
499    }
500
501    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
502        self.clone().into_bound_py_any(py)
503    }
504}
505
506#[pymethods]
507impl PyDeallocate {
508    #[new]
509    pub fn new(name: String) -> Self {
510        PyDeallocate {
511            deallocate: Deallocate { name },
512        }
513    }
514
515    pub fn name(&self) -> String {
516        self.deallocate.name.clone()
517    }
518}