datafusion_expr/logical_plan/
dml.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
30/// Operator that copies the contents of a database to file(s)
31#[derive(Clone)]
32pub struct CopyTo {
33    /// The relation that determines the tuples to write to the output file(s)
34    pub input: Arc<LogicalPlan>,
35    /// The location to write the file(s)
36    pub output_url: String,
37    /// Determines which, if any, columns should be used for hive-style partitioned writes
38    pub partition_by: Vec<String>,
39    /// File type trait
40    pub file_type: Arc<dyn FileType>,
41    /// SQL Options that can affect the formats
42    pub options: HashMap<String, String>,
43}
44
45impl Debug for CopyTo {
46    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47        f.debug_struct("CopyTo")
48            .field("input", &self.input)
49            .field("output_url", &self.output_url)
50            .field("partition_by", &self.partition_by)
51            .field("file_type", &"...")
52            .field("options", &self.options)
53            .finish_non_exhaustive()
54    }
55}
56
57// Implement PartialEq manually
58impl PartialEq for CopyTo {
59    fn eq(&self, other: &Self) -> bool {
60        self.input == other.input && self.output_url == other.output_url
61    }
62}
63
64// Implement Eq (no need for additional logic over PartialEq)
65impl Eq for CopyTo {}
66
67// Manual implementation needed because of `file_type` and `options` fields.
68// Comparison excludes these field.
69impl PartialOrd for CopyTo {
70    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
71        match self.input.partial_cmp(&other.input) {
72            Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
73            {
74                Some(Ordering::Equal) => {
75                    self.partition_by.partial_cmp(&other.partition_by)
76                }
77                cmp => cmp,
78            },
79            cmp => cmp,
80        }
81    }
82}
83
84// Implement Hash manually
85impl Hash for CopyTo {
86    fn hash<H: Hasher>(&self, state: &mut H) {
87        self.input.hash(state);
88        self.output_url.hash(state);
89    }
90}
91
92/// Modifies the content of a database
93///
94/// This operator is used to perform DML operations such as INSERT, DELETE,
95/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
96///
97/// * `INSERT` - Appends new rows to the existing table. Calls
98///   [`TableProvider::insert_into`]
99///
100/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
101///   [`TableProvider`] trait or builtin sources.
102///
103/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
104///   the [`TableProvider`] trait or builtin sources.
105///
106/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
107///   from a query. This is similar to the `INSERT` operation, but it creates a new
108///   table instead of modifying an existing one.
109///
110/// Note that the structure is adapted from substrait WriteRel)
111///
112/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
113/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
114#[derive(Clone)]
115pub struct DmlStatement {
116    /// The table name
117    pub table_name: TableReference,
118    /// this is target table to insert into
119    pub target: Arc<dyn TableSource>,
120    /// The type of operation to perform
121    pub op: WriteOp,
122    /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
123    pub input: Arc<LogicalPlan>,
124    /// The schema of the output relation
125    pub output_schema: DFSchemaRef,
126}
127impl Eq for DmlStatement {}
128impl Hash for DmlStatement {
129    fn hash<H: Hasher>(&self, state: &mut H) {
130        self.table_name.hash(state);
131        self.target.schema().hash(state);
132        self.op.hash(state);
133        self.input.hash(state);
134        self.output_schema.hash(state);
135    }
136}
137
138impl PartialEq for DmlStatement {
139    fn eq(&self, other: &Self) -> bool {
140        self.table_name == other.table_name
141            && self.target.schema() == other.target.schema()
142            && self.op == other.op
143            && self.input == other.input
144            && self.output_schema == other.output_schema
145    }
146}
147
148impl Debug for DmlStatement {
149    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
150        f.debug_struct("DmlStatement")
151            .field("table_name", &self.table_name)
152            .field("target", &"...")
153            .field("target_schema", &self.target.schema())
154            .field("op", &self.op)
155            .field("input", &self.input)
156            .field("output_schema", &self.output_schema)
157            .finish()
158    }
159}
160
161impl DmlStatement {
162    /// Creates a new DML statement with the output schema set to a single `count` column.
163    pub fn new(
164        table_name: TableReference,
165        target: Arc<dyn TableSource>,
166        op: WriteOp,
167        input: Arc<LogicalPlan>,
168    ) -> Self {
169        Self {
170            table_name,
171            target,
172            op,
173            input,
174
175            // The output schema is always a single column with the number of rows affected
176            output_schema: make_count_schema(),
177        }
178    }
179
180    /// Return a descriptive name of this [`DmlStatement`]
181    pub fn name(&self) -> &str {
182        self.op.name()
183    }
184}
185
186// Manual implementation needed because of `table_schema` and `output_schema` fields.
187// Comparison excludes these fields.
188impl PartialOrd for DmlStatement {
189    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
190        match self.table_name.partial_cmp(&other.table_name) {
191            Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
192                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
193                cmp => cmp,
194            },
195            cmp => cmp,
196        }
197    }
198}
199
200/// The type of DML operation to perform.
201///
202/// See [`DmlStatement`] for more details.
203#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
204pub enum WriteOp {
205    /// `INSERT INTO` operation
206    Insert(InsertOp),
207    /// `DELETE` operation
208    Delete,
209    /// `UPDATE` operation
210    Update,
211    /// `CREATE TABLE AS SELECT` operation
212    Ctas,
213}
214
215impl WriteOp {
216    /// Return a descriptive name of this [`WriteOp`]
217    pub fn name(&self) -> &str {
218        match self {
219            WriteOp::Insert(insert) => insert.name(),
220            WriteOp::Delete => "Delete",
221            WriteOp::Update => "Update",
222            WriteOp::Ctas => "Ctas",
223        }
224    }
225}
226
227impl Display for WriteOp {
228    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
229        write!(f, "{}", self.name())
230    }
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
234pub enum InsertOp {
235    /// Appends new rows to the existing table without modifying any
236    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
237    Append,
238    /// Overwrites all existing rows in the table with the new rows.
239    /// This corresponds to the SQL `INSERT OVERWRITE` query.
240    Overwrite,
241    /// If any existing rows collides with the inserted rows (typically based
242    /// on a unique key or primary key), those existing rows are replaced.
243    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
244    Replace,
245}
246
247impl InsertOp {
248    /// Return a descriptive name of this [`InsertOp`]
249    pub fn name(&self) -> &str {
250        match self {
251            InsertOp::Append => "Insert Into",
252            InsertOp::Overwrite => "Insert Overwrite",
253            InsertOp::Replace => "Replace Into",
254        }
255    }
256}
257
258impl Display for InsertOp {
259    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
260        write!(f, "{}", self.name())
261    }
262}
263
264fn make_count_schema() -> DFSchemaRef {
265    Arc::new(
266        Schema::new(vec![Field::new("count", DataType::UInt64, false)])
267            .try_into()
268            .unwrap(),
269    )
270}