1use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23 Deallocate, Execute, Prepare, ResetVariable, SetVariable, TransactionAccessMode,
24 TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::IntoPyObjectExt;
27use pyo3::prelude::*;
28
29use super::PyExpr;
30use super::logical_node::LogicalNode;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34 frozen,
35 name = "TransactionStart",
36 module = "datafusion.expr",
37 subclass
38)]
39#[derive(Clone)]
40pub struct PyTransactionStart {
41 transaction_start: TransactionStart,
42}
43
44impl From<TransactionStart> for PyTransactionStart {
45 fn from(transaction_start: TransactionStart) -> PyTransactionStart {
46 PyTransactionStart { transaction_start }
47 }
48}
49
50impl TryFrom<PyTransactionStart> for TransactionStart {
51 type Error = PyErr;
52
53 fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
54 Ok(py.transaction_start)
55 }
56}
57
58impl LogicalNode for PyTransactionStart {
59 fn inputs(&self) -> Vec<PyLogicalPlan> {
60 vec![]
61 }
62
63 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
64 self.clone().into_bound_py_any(py)
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
69#[pyclass(
70 frozen,
71 eq,
72 eq_int,
73 name = "TransactionAccessMode",
74 module = "datafusion.expr"
75)]
76pub enum PyTransactionAccessMode {
77 ReadOnly,
78 ReadWrite,
79}
80
81impl From<TransactionAccessMode> for PyTransactionAccessMode {
82 fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
83 match access_mode {
84 TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
85 TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
86 }
87 }
88}
89
90impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
91 type Error = PyErr;
92
93 fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
94 match py {
95 PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
96 PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
97 }
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
102#[pyclass(
103 frozen,
104 eq,
105 eq_int,
106 name = "TransactionIsolationLevel",
107 module = "datafusion.expr"
108)]
109pub enum PyTransactionIsolationLevel {
110 ReadUncommitted,
111 ReadCommitted,
112 RepeatableRead,
113 Serializable,
114 Snapshot,
115}
116
117impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
118 fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
119 match isolation_level {
120 TransactionIsolationLevel::ReadUncommitted => {
121 PyTransactionIsolationLevel::ReadUncommitted
122 }
123 TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
124 TransactionIsolationLevel::RepeatableRead => {
125 PyTransactionIsolationLevel::RepeatableRead
126 }
127 TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
128 TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
129 }
130 }
131}
132
133impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
134 type Error = PyErr;
135
136 fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
137 match value {
138 PyTransactionIsolationLevel::ReadUncommitted => {
139 Ok(TransactionIsolationLevel::ReadUncommitted)
140 }
141 PyTransactionIsolationLevel::ReadCommitted => {
142 Ok(TransactionIsolationLevel::ReadCommitted)
143 }
144 PyTransactionIsolationLevel::RepeatableRead => {
145 Ok(TransactionIsolationLevel::RepeatableRead)
146 }
147 PyTransactionIsolationLevel::Serializable => {
148 Ok(TransactionIsolationLevel::Serializable)
149 }
150 PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
151 }
152 }
153}
154
155#[pymethods]
156impl PyTransactionStart {
157 #[new]
158 pub fn new(
159 access_mode: PyTransactionAccessMode,
160 isolation_level: PyTransactionIsolationLevel,
161 ) -> PyResult<Self> {
162 let access_mode = access_mode.try_into()?;
163 let isolation_level = isolation_level.try_into()?;
164 Ok(PyTransactionStart {
165 transaction_start: TransactionStart {
166 access_mode,
167 isolation_level,
168 },
169 })
170 }
171
172 pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
173 Ok(self.transaction_start.access_mode.clone().into())
174 }
175
176 pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
177 Ok(self.transaction_start.isolation_level.clone().into())
178 }
179}
180
181#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
182#[derive(Clone)]
183pub struct PyTransactionEnd {
184 transaction_end: TransactionEnd,
185}
186
187impl From<TransactionEnd> for PyTransactionEnd {
188 fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
189 PyTransactionEnd { transaction_end }
190 }
191}
192
193impl TryFrom<PyTransactionEnd> for TransactionEnd {
194 type Error = PyErr;
195
196 fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
197 Ok(py.transaction_end)
198 }
199}
200
201impl LogicalNode for PyTransactionEnd {
202 fn inputs(&self) -> Vec<PyLogicalPlan> {
203 vec![]
204 }
205
206 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207 self.clone().into_bound_py_any(py)
208 }
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
212#[pyclass(
213 frozen,
214 eq,
215 eq_int,
216 name = "TransactionConclusion",
217 module = "datafusion.expr"
218)]
219pub enum PyTransactionConclusion {
220 Commit,
221 Rollback,
222}
223
224impl From<TransactionConclusion> for PyTransactionConclusion {
225 fn from(value: TransactionConclusion) -> Self {
226 match value {
227 TransactionConclusion::Commit => PyTransactionConclusion::Commit,
228 TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
229 }
230 }
231}
232
233impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
234 type Error = PyErr;
235
236 fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
237 match value {
238 PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
239 PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
240 }
241 }
242}
243#[pymethods]
244impl PyTransactionEnd {
245 #[new]
246 pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
247 let conclusion = conclusion.try_into()?;
248 Ok(PyTransactionEnd {
249 transaction_end: TransactionEnd { conclusion, chain },
250 })
251 }
252
253 pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
254 Ok(self.transaction_end.conclusion.clone().into())
255 }
256
257 pub fn chain(&self) -> bool {
258 self.transaction_end.chain
259 }
260}
261
262#[pyclass(frozen, name = "ResetVariable", module = "datafusion.expr", subclass)]
263#[derive(Clone)]
264pub struct PyResetVariable {
265 reset_variable: ResetVariable,
266}
267
268impl From<ResetVariable> for PyResetVariable {
269 fn from(reset_variable: ResetVariable) -> PyResetVariable {
270 PyResetVariable { reset_variable }
271 }
272}
273
274impl TryFrom<PyResetVariable> for ResetVariable {
275 type Error = PyErr;
276
277 fn try_from(py: PyResetVariable) -> Result<Self, Self::Error> {
278 Ok(py.reset_variable)
279 }
280}
281
282impl LogicalNode for PyResetVariable {
283 fn inputs(&self) -> Vec<PyLogicalPlan> {
284 vec![]
285 }
286
287 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288 self.clone().into_bound_py_any(py)
289 }
290}
291
292#[pymethods]
293impl PyResetVariable {
294 #[new]
295 pub fn new(variable: String) -> Self {
296 PyResetVariable {
297 reset_variable: ResetVariable { variable },
298 }
299 }
300
301 pub fn variable(&self) -> String {
302 self.reset_variable.variable.clone()
303 }
304}
305
306#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
307#[derive(Clone)]
308pub struct PySetVariable {
309 set_variable: SetVariable,
310}
311
312impl From<SetVariable> for PySetVariable {
313 fn from(set_variable: SetVariable) -> PySetVariable {
314 PySetVariable { set_variable }
315 }
316}
317
318impl TryFrom<PySetVariable> for SetVariable {
319 type Error = PyErr;
320
321 fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
322 Ok(py.set_variable)
323 }
324}
325
326impl LogicalNode for PySetVariable {
327 fn inputs(&self) -> Vec<PyLogicalPlan> {
328 vec![]
329 }
330
331 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
332 self.clone().into_bound_py_any(py)
333 }
334}
335
336#[pymethods]
337impl PySetVariable {
338 #[new]
339 pub fn new(variable: String, value: String) -> Self {
340 PySetVariable {
341 set_variable: SetVariable { variable, value },
342 }
343 }
344
345 pub fn variable(&self) -> String {
346 self.set_variable.variable.clone()
347 }
348
349 pub fn value(&self) -> String {
350 self.set_variable.value.clone()
351 }
352}
353
354#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
355#[derive(Clone)]
356pub struct PyPrepare {
357 prepare: Prepare,
358}
359
360impl From<Prepare> for PyPrepare {
361 fn from(prepare: Prepare) -> PyPrepare {
362 PyPrepare { prepare }
363 }
364}
365
366impl TryFrom<PyPrepare> for Prepare {
367 type Error = PyErr;
368
369 fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
370 Ok(py.prepare)
371 }
372}
373
374impl LogicalNode for PyPrepare {
375 fn inputs(&self) -> Vec<PyLogicalPlan> {
376 vec![PyLogicalPlan::from((*self.prepare.input).clone())]
377 }
378
379 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
380 self.clone().into_bound_py_any(py)
381 }
382}
383
384#[pymethods]
385impl PyPrepare {
386 #[new]
387 pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
388 let input = input.plan().clone();
389 let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
390 PyPrepare {
391 prepare: Prepare {
392 name,
393 fields,
394 input,
395 },
396 }
397 }
398
399 pub fn name(&self) -> String {
400 self.prepare.name.clone()
401 }
402
403 pub fn fields(&self) -> Vec<PyArrowType<Field>> {
404 self.prepare
405 .fields
406 .clone()
407 .into_iter()
408 .map(|f| f.as_ref().clone().into())
409 .collect()
410 }
411
412 pub fn input(&self) -> PyLogicalPlan {
413 PyLogicalPlan {
414 plan: self.prepare.input.clone(),
415 }
416 }
417}
418
419#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
420#[derive(Clone)]
421pub struct PyExecute {
422 execute: Execute,
423}
424
425impl From<Execute> for PyExecute {
426 fn from(execute: Execute) -> PyExecute {
427 PyExecute { execute }
428 }
429}
430
431impl TryFrom<PyExecute> for Execute {
432 type Error = PyErr;
433
434 fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
435 Ok(py.execute)
436 }
437}
438
439impl LogicalNode for PyExecute {
440 fn inputs(&self) -> Vec<PyLogicalPlan> {
441 vec![]
442 }
443
444 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
445 self.clone().into_bound_py_any(py)
446 }
447}
448
449#[pymethods]
450impl PyExecute {
451 #[new]
452 pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
453 let parameters = parameters
454 .into_iter()
455 .map(|parameter| parameter.into())
456 .collect();
457 PyExecute {
458 execute: Execute { name, parameters },
459 }
460 }
461
462 pub fn name(&self) -> String {
463 self.execute.name.clone()
464 }
465
466 pub fn parameters(&self) -> Vec<PyExpr> {
467 self.execute
468 .parameters
469 .clone()
470 .into_iter()
471 .map(|t| t.into())
472 .collect()
473 }
474}
475
476#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
477#[derive(Clone)]
478pub struct PyDeallocate {
479 deallocate: Deallocate,
480}
481
482impl From<Deallocate> for PyDeallocate {
483 fn from(deallocate: Deallocate) -> PyDeallocate {
484 PyDeallocate { deallocate }
485 }
486}
487
488impl TryFrom<PyDeallocate> for Deallocate {
489 type Error = PyErr;
490
491 fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
492 Ok(py.deallocate)
493 }
494}
495
496impl LogicalNode for PyDeallocate {
497 fn inputs(&self) -> Vec<PyLogicalPlan> {
498 vec![]
499 }
500
501 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
502 self.clone().into_bound_py_any(py)
503 }
504}
505
506#[pymethods]
507impl PyDeallocate {
508 #[new]
509 pub fn new(name: String) -> Self {
510 PyDeallocate {
511 deallocate: Deallocate { name },
512 }
513 }
514
515 pub fn name(&self) -> String {
516 self.deallocate.name.clone()
517 }
518}