datafusion_expr/logical_plan/dml.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
/// Operator that copies the contents of a database to file(s)
///
/// Values are normally built with [`CopyTo::new`], which fills in
/// `output_schema` itself.
#[derive(Clone)]
pub struct CopyTo {
    /// The relation that determines the tuples to write to the output file(s)
    pub input: Arc<LogicalPlan>,
    /// The location to write the file(s)
    pub output_url: String,
    /// Determines which, if any, columns should be used for hive-style partitioned writes
    pub partition_by: Vec<String>,
    /// File type trait object (shared behind an `Arc`)
    pub file_type: Arc<dyn FileType>,
    /// SQL options that can affect the formats, as key/value strings
    pub options: HashMap<String, String>,
    /// The schema of the output (a single column "count")
    pub output_schema: DFSchemaRef,
}
46
47impl Debug for CopyTo {
48 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
49 f.debug_struct("CopyTo")
50 .field("input", &self.input)
51 .field("output_url", &self.output_url)
52 .field("partition_by", &self.partition_by)
53 .field("file_type", &"...")
54 .field("options", &self.options)
55 .field("output_schema", &self.output_schema)
56 .finish_non_exhaustive()
57 }
58}
59
60// Implement PartialEq manually
61impl PartialEq for CopyTo {
62 fn eq(&self, other: &Self) -> bool {
63 self.input == other.input && self.output_url == other.output_url
64 }
65}
66
// Implement Eq. It is a marker trait with no methods of its own, so the
// manual `PartialEq` implementation supplies all of the comparison logic.
impl Eq for CopyTo {}
69
70// Manual implementation needed because of `file_type` and `options` fields.
71// Comparison excludes these field.
72impl PartialOrd for CopyTo {
73 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74 match self.input.partial_cmp(&other.input) {
75 Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
76 {
77 Some(Ordering::Equal) => {
78 self.partition_by.partial_cmp(&other.partition_by)
79 }
80 cmp => cmp,
81 },
82 cmp => cmp,
83 }
84 }
85}
86
87// Implement Hash manually
88impl Hash for CopyTo {
89 fn hash<H: Hasher>(&self, state: &mut H) {
90 self.input.hash(state);
91 self.output_url.hash(state);
92 }
93}
94
95impl CopyTo {
96 pub fn new(
97 input: Arc<LogicalPlan>,
98 output_url: String,
99 partition_by: Vec<String>,
100 file_type: Arc<dyn FileType>,
101 options: HashMap<String, String>,
102 ) -> Self {
103 Self {
104 input,
105 output_url,
106 partition_by,
107 file_type,
108 options,
109 // The output schema is always a single column "count" with the number of rows copied
110 output_schema: make_count_schema(),
111 }
112 }
113}
114
/// Modifies the content of a database
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
///
/// * `INSERT` - Appends new rows to the existing table. Calls
///   [`TableProvider::insert_into`]
///
/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
///   [`TableProvider`] trait or builtin sources.
///
/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
///   the [`TableProvider`] trait or builtin sources.
///
/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
///   from a query. This is similar to the `INSERT` operation, but it creates a new
///   table instead of modifying an existing one.
///
/// Note that the structure is adapted from substrait `WriteRel`.
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
#[derive(Clone)]
pub struct DmlStatement {
    /// The table name
    pub table_name: TableReference,
    /// The target table the operation is applied to
    pub target: Arc<dyn TableSource>,
    /// The type of operation to perform
    pub op: WriteOp,
    /// The relation that determines the tuples to add/remove/modify.
    /// Its schema must match the schema of the target table.
    pub input: Arc<LogicalPlan>,
    /// The schema of the output relation
    pub output_schema: DFSchemaRef,
}
// `Eq` is a marker trait with no methods; the comparison itself is provided
// by the manual `PartialEq` implementation for `DmlStatement`.
impl Eq for DmlStatement {}
151impl Hash for DmlStatement {
152 fn hash<H: Hasher>(&self, state: &mut H) {
153 self.table_name.hash(state);
154 self.target.schema().hash(state);
155 self.op.hash(state);
156 self.input.hash(state);
157 self.output_schema.hash(state);
158 }
159}
160
161impl PartialEq for DmlStatement {
162 fn eq(&self, other: &Self) -> bool {
163 self.table_name == other.table_name
164 && self.target.schema() == other.target.schema()
165 && self.op == other.op
166 && self.input == other.input
167 && self.output_schema == other.output_schema
168 }
169}
170
171impl Debug for DmlStatement {
172 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
173 f.debug_struct("DmlStatement")
174 .field("table_name", &self.table_name)
175 .field("target", &"...")
176 .field("target_schema", &self.target.schema())
177 .field("op", &self.op)
178 .field("input", &self.input)
179 .field("output_schema", &self.output_schema)
180 .finish()
181 }
182}
183
184impl DmlStatement {
185 /// Creates a new DML statement with the output schema set to a single `count` column.
186 pub fn new(
187 table_name: TableReference,
188 target: Arc<dyn TableSource>,
189 op: WriteOp,
190 input: Arc<LogicalPlan>,
191 ) -> Self {
192 Self {
193 table_name,
194 target,
195 op,
196 input,
197
198 // The output schema is always a single column with the number of rows affected
199 output_schema: make_count_schema(),
200 }
201 }
202
203 /// Return a descriptive name of this [`DmlStatement`]
204 pub fn name(&self) -> &str {
205 self.op.name()
206 }
207}
208
209// Manual implementation needed because of `table_schema` and `output_schema` fields.
210// Comparison excludes these fields.
211impl PartialOrd for DmlStatement {
212 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
213 match self.table_name.partial_cmp(&other.table_name) {
214 Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
215 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
216 cmp => cmp,
217 },
218 cmp => cmp,
219 }
220 }
221}
222
/// The type of DML operation to perform.
///
/// See [`DmlStatement`] for more details.
///
/// `PartialOrd` is derived, so the relative order of values follows the
/// declaration order of the variants below.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// `INSERT INTO` operation; the payload selects the insert flavor
    /// (see [`InsertOp`])
    Insert(InsertOp),
    /// `DELETE` operation
    Delete,
    /// `UPDATE` operation
    Update,
    /// `CREATE TABLE AS SELECT` operation
    Ctas,
}
237
238impl WriteOp {
239 /// Return a descriptive name of this [`WriteOp`]
240 pub fn name(&self) -> &str {
241 match self {
242 WriteOp::Insert(insert) => insert.name(),
243 WriteOp::Delete => "Delete",
244 WriteOp::Update => "Update",
245 WriteOp::Ctas => "Ctas",
246 }
247 }
248}
249
250impl Display for WriteOp {
251 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
252 write!(f, "{}", self.name())
253 }
254}
255
/// The flavor of `INSERT` performed by a [`WriteOp::Insert`].
///
/// `PartialOrd` is derived, so values order by variant declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Appends new rows to the existing table without modifying any
    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Overwrites all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// If any existing rows collide with the inserted rows (typically based
    /// on a unique key or primary key), those existing rows are replaced.
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}

impl InsertOp {
    /// Return a descriptive name of this [`InsertOp`]
    pub fn name(&self) -> &str {
        match *self {
            Self::Append => "Insert Into",
            Self::Overwrite => "Insert Overwrite",
            Self::Replace => "Replace Into",
        }
    }
}

// Displays an `InsertOp` as the string returned by `InsertOp::name`.
impl Display for InsertOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Written verbatim; width/fill flags on the outer formatter are not
        // applied, exactly as with `write!(f, "{}", ...)`.
        f.write_str(self.name())
    }
}
286
287fn make_count_schema() -> DFSchemaRef {
288 Arc::new(
289 Schema::new(vec![Field::new("count", DataType::UInt64, false)])
290 .try_into()
291 .unwrap(),
292 )
293}