1use datafusion::logical_expr::{
19 Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
20 TransactionEnd, TransactionIsolationLevel, TransactionStart,
21};
22use pyo3::{prelude::*, IntoPyObjectExt};
23
24use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
25
26use super::{logical_node::LogicalNode, PyExpr};
27
28#[pyclass(
29 frozen,
30 name = "TransactionStart",
31 module = "datafusion.expr",
32 subclass
33)]
34#[derive(Clone)]
35pub struct PyTransactionStart {
36 transaction_start: TransactionStart,
37}
38
39impl From<TransactionStart> for PyTransactionStart {
40 fn from(transaction_start: TransactionStart) -> PyTransactionStart {
41 PyTransactionStart { transaction_start }
42 }
43}
44
45impl TryFrom<PyTransactionStart> for TransactionStart {
46 type Error = PyErr;
47
48 fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
49 Ok(py.transaction_start)
50 }
51}
52
53impl LogicalNode for PyTransactionStart {
54 fn inputs(&self) -> Vec<PyLogicalPlan> {
55 vec![]
56 }
57
58 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
59 self.clone().into_bound_py_any(py)
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
64#[pyclass(
65 frozen,
66 eq,
67 eq_int,
68 name = "TransactionAccessMode",
69 module = "datafusion.expr"
70)]
71pub enum PyTransactionAccessMode {
72 ReadOnly,
73 ReadWrite,
74}
75
76impl From<TransactionAccessMode> for PyTransactionAccessMode {
77 fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
78 match access_mode {
79 TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
80 TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
81 }
82 }
83}
84
85impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
86 type Error = PyErr;
87
88 fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
89 match py {
90 PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
91 PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
92 }
93 }
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
97#[pyclass(
98 frozen,
99 eq,
100 eq_int,
101 name = "TransactionIsolationLevel",
102 module = "datafusion.expr"
103)]
104pub enum PyTransactionIsolationLevel {
105 ReadUncommitted,
106 ReadCommitted,
107 RepeatableRead,
108 Serializable,
109 Snapshot,
110}
111
112impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
113 fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
114 match isolation_level {
115 TransactionIsolationLevel::ReadUncommitted => {
116 PyTransactionIsolationLevel::ReadUncommitted
117 }
118 TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
119 TransactionIsolationLevel::RepeatableRead => {
120 PyTransactionIsolationLevel::RepeatableRead
121 }
122 TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
123 TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
124 }
125 }
126}
127
128impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
129 type Error = PyErr;
130
131 fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
132 match value {
133 PyTransactionIsolationLevel::ReadUncommitted => {
134 Ok(TransactionIsolationLevel::ReadUncommitted)
135 }
136 PyTransactionIsolationLevel::ReadCommitted => {
137 Ok(TransactionIsolationLevel::ReadCommitted)
138 }
139 PyTransactionIsolationLevel::RepeatableRead => {
140 Ok(TransactionIsolationLevel::RepeatableRead)
141 }
142 PyTransactionIsolationLevel::Serializable => {
143 Ok(TransactionIsolationLevel::Serializable)
144 }
145 PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
146 }
147 }
148}
149
150#[pymethods]
151impl PyTransactionStart {
152 #[new]
153 pub fn new(
154 access_mode: PyTransactionAccessMode,
155 isolation_level: PyTransactionIsolationLevel,
156 ) -> PyResult<Self> {
157 let access_mode = access_mode.try_into()?;
158 let isolation_level = isolation_level.try_into()?;
159 Ok(PyTransactionStart {
160 transaction_start: TransactionStart {
161 access_mode,
162 isolation_level,
163 },
164 })
165 }
166
167 pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
168 Ok(self.transaction_start.access_mode.clone().into())
169 }
170
171 pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
172 Ok(self.transaction_start.isolation_level.clone().into())
173 }
174}
175
176#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
177#[derive(Clone)]
178pub struct PyTransactionEnd {
179 transaction_end: TransactionEnd,
180}
181
182impl From<TransactionEnd> for PyTransactionEnd {
183 fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
184 PyTransactionEnd { transaction_end }
185 }
186}
187
188impl TryFrom<PyTransactionEnd> for TransactionEnd {
189 type Error = PyErr;
190
191 fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
192 Ok(py.transaction_end)
193 }
194}
195
196impl LogicalNode for PyTransactionEnd {
197 fn inputs(&self) -> Vec<PyLogicalPlan> {
198 vec![]
199 }
200
201 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
202 self.clone().into_bound_py_any(py)
203 }
204}
205
206#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
207#[pyclass(
208 frozen,
209 eq,
210 eq_int,
211 name = "TransactionConclusion",
212 module = "datafusion.expr"
213)]
214pub enum PyTransactionConclusion {
215 Commit,
216 Rollback,
217}
218
219impl From<TransactionConclusion> for PyTransactionConclusion {
220 fn from(value: TransactionConclusion) -> Self {
221 match value {
222 TransactionConclusion::Commit => PyTransactionConclusion::Commit,
223 TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
224 }
225 }
226}
227
228impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
229 type Error = PyErr;
230
231 fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
232 match value {
233 PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
234 PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
235 }
236 }
237}
238#[pymethods]
239impl PyTransactionEnd {
240 #[new]
241 pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
242 let conclusion = conclusion.try_into()?;
243 Ok(PyTransactionEnd {
244 transaction_end: TransactionEnd { conclusion, chain },
245 })
246 }
247
248 pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
249 Ok(self.transaction_end.conclusion.clone().into())
250 }
251
252 pub fn chain(&self) -> bool {
253 self.transaction_end.chain
254 }
255}
256
257#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
258#[derive(Clone)]
259pub struct PySetVariable {
260 set_variable: SetVariable,
261}
262
263impl From<SetVariable> for PySetVariable {
264 fn from(set_variable: SetVariable) -> PySetVariable {
265 PySetVariable { set_variable }
266 }
267}
268
269impl TryFrom<PySetVariable> for SetVariable {
270 type Error = PyErr;
271
272 fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
273 Ok(py.set_variable)
274 }
275}
276
277impl LogicalNode for PySetVariable {
278 fn inputs(&self) -> Vec<PyLogicalPlan> {
279 vec![]
280 }
281
282 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
283 self.clone().into_bound_py_any(py)
284 }
285}
286
287#[pymethods]
288impl PySetVariable {
289 #[new]
290 pub fn new(variable: String, value: String) -> Self {
291 PySetVariable {
292 set_variable: SetVariable { variable, value },
293 }
294 }
295
296 pub fn variable(&self) -> String {
297 self.set_variable.variable.clone()
298 }
299
300 pub fn value(&self) -> String {
301 self.set_variable.value.clone()
302 }
303}
304
305#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
306#[derive(Clone)]
307pub struct PyPrepare {
308 prepare: Prepare,
309}
310
311impl From<Prepare> for PyPrepare {
312 fn from(prepare: Prepare) -> PyPrepare {
313 PyPrepare { prepare }
314 }
315}
316
317impl TryFrom<PyPrepare> for Prepare {
318 type Error = PyErr;
319
320 fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
321 Ok(py.prepare)
322 }
323}
324
325impl LogicalNode for PyPrepare {
326 fn inputs(&self) -> Vec<PyLogicalPlan> {
327 vec![PyLogicalPlan::from((*self.prepare.input).clone())]
328 }
329
330 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
331 self.clone().into_bound_py_any(py)
332 }
333}
334
335#[pymethods]
336impl PyPrepare {
337 #[new]
338 pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
339 let input = input.plan().clone();
340 let data_types = data_types
341 .into_iter()
342 .map(|data_type| data_type.into())
343 .collect();
344 PyPrepare {
345 prepare: Prepare {
346 name,
347 data_types,
348 input,
349 },
350 }
351 }
352
353 pub fn name(&self) -> String {
354 self.prepare.name.clone()
355 }
356
357 pub fn data_types(&self) -> Vec<PyDataType> {
358 self.prepare
359 .data_types
360 .clone()
361 .into_iter()
362 .map(|t| t.into())
363 .collect()
364 }
365
366 pub fn input(&self) -> PyLogicalPlan {
367 PyLogicalPlan {
368 plan: self.prepare.input.clone(),
369 }
370 }
371}
372
373#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
374#[derive(Clone)]
375pub struct PyExecute {
376 execute: Execute,
377}
378
379impl From<Execute> for PyExecute {
380 fn from(execute: Execute) -> PyExecute {
381 PyExecute { execute }
382 }
383}
384
385impl TryFrom<PyExecute> for Execute {
386 type Error = PyErr;
387
388 fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
389 Ok(py.execute)
390 }
391}
392
393impl LogicalNode for PyExecute {
394 fn inputs(&self) -> Vec<PyLogicalPlan> {
395 vec![]
396 }
397
398 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
399 self.clone().into_bound_py_any(py)
400 }
401}
402
403#[pymethods]
404impl PyExecute {
405 #[new]
406 pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
407 let parameters = parameters
408 .into_iter()
409 .map(|parameter| parameter.into())
410 .collect();
411 PyExecute {
412 execute: Execute { name, parameters },
413 }
414 }
415
416 pub fn name(&self) -> String {
417 self.execute.name.clone()
418 }
419
420 pub fn parameters(&self) -> Vec<PyExpr> {
421 self.execute
422 .parameters
423 .clone()
424 .into_iter()
425 .map(|t| t.into())
426 .collect()
427 }
428}
429
430#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
431#[derive(Clone)]
432pub struct PyDeallocate {
433 deallocate: Deallocate,
434}
435
436impl From<Deallocate> for PyDeallocate {
437 fn from(deallocate: Deallocate) -> PyDeallocate {
438 PyDeallocate { deallocate }
439 }
440}
441
442impl TryFrom<PyDeallocate> for Deallocate {
443 type Error = PyErr;
444
445 fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
446 Ok(py.deallocate)
447 }
448}
449
450impl LogicalNode for PyDeallocate {
451 fn inputs(&self) -> Vec<PyLogicalPlan> {
452 vec![]
453 }
454
455 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
456 self.clone().into_bound_py_any(py)
457 }
458}
459
460#[pymethods]
461impl PyDeallocate {
462 #[new]
463 pub fn new(name: String) -> Self {
464 PyDeallocate {
465 deallocate: Deallocate { name },
466 }
467 }
468
469 pub fn name(&self) -> String {
470 self.deallocate.name.clone()
471 }
472}