datafusion_python/expr/
dml.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::dml::InsertOp;
19use datafusion::logical_expr::{DmlStatement, WriteOp};
20use pyo3::prelude::*;
21use pyo3::IntoPyObjectExt;
22
23use super::logical_node::LogicalNode;
24use crate::common::df_schema::PyDFSchema;
25use crate::common::schema::PyTableSource;
26use crate::sql::logical::PyLogicalPlan;
27
28#[pyclass(frozen, name = "DmlStatement", module = "datafusion.expr", subclass)]
29#[derive(Clone)]
30pub struct PyDmlStatement {
31    dml: DmlStatement,
32}
33
34impl From<PyDmlStatement> for DmlStatement {
35    fn from(dml: PyDmlStatement) -> Self {
36        dml.dml
37    }
38}
39
40impl From<DmlStatement> for PyDmlStatement {
41    fn from(dml: DmlStatement) -> PyDmlStatement {
42        PyDmlStatement { dml }
43    }
44}
45
46impl LogicalNode for PyDmlStatement {
47    fn inputs(&self) -> Vec<PyLogicalPlan> {
48        vec![PyLogicalPlan::from((*self.dml.input).clone())]
49    }
50
51    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
52        self.clone().into_bound_py_any(py)
53    }
54}
55
56#[pymethods]
57impl PyDmlStatement {
58    pub fn table_name(&self) -> PyResult<String> {
59        Ok(self.dml.table_name.to_string())
60    }
61
62    pub fn target(&self) -> PyResult<PyTableSource> {
63        Ok(PyTableSource {
64            table_source: self.dml.target.clone(),
65        })
66    }
67
68    pub fn op(&self) -> PyWriteOp {
69        self.dml.op.clone().into()
70    }
71
72    pub fn input(&self) -> PyLogicalPlan {
73        PyLogicalPlan {
74            plan: self.dml.input.clone(),
75        }
76    }
77
78    pub fn output_schema(&self) -> PyDFSchema {
79        (*self.dml.output_schema).clone().into()
80    }
81
82    fn __repr__(&self) -> PyResult<String> {
83        Ok("DmlStatement".to_string())
84    }
85
86    fn __name__(&self) -> PyResult<String> {
87        Ok("DmlStatement".to_string())
88    }
89}
90
91#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
92#[pyclass(eq, eq_int, name = "WriteOp", module = "datafusion.expr")]
93pub enum PyWriteOp {
94    Append,
95    Overwrite,
96    Replace,
97
98    Update,
99    Delete,
100    Ctas,
101}
102
103impl From<WriteOp> for PyWriteOp {
104    fn from(write_op: WriteOp) -> Self {
105        match write_op {
106            WriteOp::Insert(InsertOp::Append) => PyWriteOp::Append,
107            WriteOp::Insert(InsertOp::Overwrite) => PyWriteOp::Overwrite,
108            WriteOp::Insert(InsertOp::Replace) => PyWriteOp::Replace,
109
110            WriteOp::Update => PyWriteOp::Update,
111            WriteOp::Delete => PyWriteOp::Delete,
112            WriteOp::Ctas => PyWriteOp::Ctas,
113        }
114    }
115}
116
117impl From<PyWriteOp> for WriteOp {
118    fn from(py: PyWriteOp) -> Self {
119        match py {
120            PyWriteOp::Append => WriteOp::Insert(InsertOp::Append),
121            PyWriteOp::Overwrite => WriteOp::Insert(InsertOp::Overwrite),
122            PyWriteOp::Replace => WriteOp::Insert(InsertOp::Replace),
123
124            PyWriteOp::Update => WriteOp::Update,
125            PyWriteOp::Delete => WriteOp::Delete,
126            PyWriteOp::Ctas => WriteOp::Ctas,
127        }
128    }
129}
130
131#[pymethods]
132impl PyWriteOp {
133    fn name(&self) -> String {
134        let write_op: WriteOp = self.clone().into();
135        write_op.name().to_string()
136    }
137}