datafusion_expr/logical_plan/dml.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{DFSchemaRef, TableReference};

use crate::{LogicalPlan, TableSource};

/// Operator that copies the contents of a table or query result to file(s)
#[derive(Clone)]
pub struct CopyTo {
    /// The relation that determines the tuples to write to the output file(s)
    pub input: Arc<LogicalPlan>,
    /// The location to write the file(s)
    pub output_url: String,
    /// Determines which, if any, columns should be used for hive-style partitioned writes
    pub partition_by: Vec<String>,
    /// The file format to write (e.g. CSV or Parquet), as a [`FileType`] trait object
    pub file_type: Arc<dyn FileType>,
    /// SQL options that can affect the output format
    pub options: HashMap<String, String>,
    /// The schema of the output (a single column "count")
    pub output_schema: DFSchemaRef,
}

impl Debug for CopyTo {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("CopyTo")
            .field("input", &self.input)
            .field("output_url", &self.output_url)
            .field("partition_by", &self.partition_by)
            .field("file_type", &"...")
            .field("options", &self.options)
            .field("output_schema", &self.output_schema)
            .finish_non_exhaustive()
    }
}

// Implement PartialEq manually
impl PartialEq for CopyTo {
    fn eq(&self, other: &Self) -> bool {
        self.input == other.input && self.output_url == other.output_url
    }
}

// Implement Eq (no need for additional logic over PartialEq)
impl Eq for CopyTo {}

// Manual implementation needed because of `file_type` and `options` fields.
// Comparison excludes these fields.
impl PartialOrd for CopyTo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.input.partial_cmp(&other.input) {
            Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
            {
                Some(Ordering::Equal) => {
                    self.partition_by.partial_cmp(&other.partition_by)
                }
                cmp => cmp,
            },
            cmp => cmp,
        }
        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
        .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}

// Implement Hash manually
impl Hash for CopyTo {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.output_url.hash(state);
    }
}

impl CopyTo {
    pub fn new(
        input: Arc<LogicalPlan>,
        output_url: String,
        partition_by: Vec<String>,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
    ) -> Self {
        Self {
            input,
            output_url,
            partition_by,
            file_type,
            options,
            // The output schema is always a single column "count" with the number of rows copied
            output_schema: make_count_schema(),
        }
    }
}

/// Operator that modifies the contents of a table
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
///
/// * `INSERT` - Appends new rows to the existing table. Calls
///   [`TableProvider::insert_into`]
///
/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
///   [`TableProvider`] trait or builtin sources.
///
/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
///   the [`TableProvider`] trait or builtin sources.
///
/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
///   from a query. This is similar to the `INSERT` operation, but it creates a new
///   table instead of modifying an existing one.
///
/// Note that this structure is adapted from Substrait's `WriteRel`.
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
#[derive(Clone)]
pub struct DmlStatement {
    /// The table name
    pub table_name: TableReference,
    /// The target table to write to
    pub target: Arc<dyn TableSource>,
    /// The type of operation to perform
    pub op: WriteOp,
    /// The relation that determines the tuples to add/remove/modify; its schema
    /// must match the target table's schema
    pub input: Arc<LogicalPlan>,
    /// The schema of the output relation
    pub output_schema: DFSchemaRef,
}

impl Eq for DmlStatement {}

impl Hash for DmlStatement {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.table_name.hash(state);
        self.target.schema().hash(state);
        self.op.hash(state);
        self.input.hash(state);
        self.output_schema.hash(state);
    }
}

impl PartialEq for DmlStatement {
    fn eq(&self, other: &Self) -> bool {
        self.table_name == other.table_name
            && self.target.schema() == other.target.schema()
            && self.op == other.op
            && self.input == other.input
            && self.output_schema == other.output_schema
    }
}

impl Debug for DmlStatement {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("DmlStatement")
            .field("table_name", &self.table_name)
            .field("target", &"...")
            .field("target_schema", &self.target.schema())
            .field("op", &self.op)
            .field("input", &self.input)
            .field("output_schema", &self.output_schema)
            .finish()
    }
}

impl DmlStatement {
    /// Creates a new DML statement with the output schema set to a single `count` column.
    pub fn new(
        table_name: TableReference,
        target: Arc<dyn TableSource>,
        op: WriteOp,
        input: Arc<LogicalPlan>,
    ) -> Self {
        Self {
            table_name,
            target,
            op,
            input,

            // The output schema is always a single column with the number of rows affected
            output_schema: make_count_schema(),
        }
    }

    /// Return a descriptive name of this [`DmlStatement`]
    pub fn name(&self) -> &str {
        self.op.name()
    }
}
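// Illustrative sketch (not part of the upstream file): a minimal test showing
// how a `DmlStatement` can be assembled by hand and what its descriptive name
// and output schema look like. It assumes `LogicalTableSource` (the
// test-oriented `TableSource` implementation in `logical_plan::builder`) and
// `LogicalPlanBuilder::empty` are available with their current signatures;
// treat those exact paths and signatures as assumptions.
#[cfg(test)]
mod dml_statement_sketch {
    use super::*;
    use crate::logical_plan::builder::{LogicalPlanBuilder, LogicalTableSource};
    use datafusion_common::Result;

    #[test]
    fn insert_into_statement_has_count_schema() -> Result<()> {
        // Hypothetical target table `t` with a single non-nullable Int32 column "a"
        let table_schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let target = Arc::new(LogicalTableSource::new(table_schema));

        // Any input relation works for this sketch; schema checks against the
        // target happen elsewhere (e.g. in the SQL planner), not in `new`
        let input = Arc::new(LogicalPlanBuilder::empty(false).build()?);

        let dml = DmlStatement::new(
            TableReference::bare("t"),
            target,
            WriteOp::Insert(InsertOp::Append),
            input,
        );

        assert_eq!(dml.name(), "Insert Into");
        // The output schema is always the single-column "count" schema
        assert_eq!(dml.output_schema.fields().len(), 1);
        assert_eq!(dml.output_schema.field(0).name(), "count");
        Ok(())
    }
}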

// Manual implementation needed because of `target` and `output_schema` fields.
// Comparison excludes these fields.
impl PartialOrd for DmlStatement {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.table_name.partial_cmp(&other.table_name) {
            Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
                cmp => cmp,
            },
            cmp => cmp,
        }
        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
        .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}

/// The type of DML operation to perform.
///
/// See [`DmlStatement`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// `INSERT INTO` operation
    Insert(InsertOp),
    /// `DELETE` operation
    Delete,
    /// `UPDATE` operation
    Update,
    /// `CREATE TABLE AS SELECT` operation
    Ctas,
}

impl WriteOp {
    /// Return a descriptive name of this [`WriteOp`]
    pub fn name(&self) -> &str {
        match self {
            WriteOp::Insert(insert) => insert.name(),
            WriteOp::Delete => "Delete",
            WriteOp::Update => "Update",
            WriteOp::Ctas => "Ctas",
        }
    }
}

impl Display for WriteOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name())
    }
}

/// The insert strategy used by a [`WriteOp::Insert`] operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Appends new rows to the existing table without modifying any
    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Overwrites all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// If any existing rows collide with the inserted rows (typically based
    /// on a unique key or primary key), those existing rows are replaced.
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}

impl InsertOp {
    /// Return a descriptive name of this [`InsertOp`]
    pub fn name(&self) -> &str {
        match self {
            InsertOp::Append => "Insert Into",
            InsertOp::Overwrite => "Insert Overwrite",
            InsertOp::Replace => "Replace Into",
        }
    }
}

impl Display for InsertOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name())
    }
}
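// Illustrative sketch (not part of the upstream file): small checks pinning
// down that the `Display` output of `WriteOp` and `InsertOp` is exactly the
// descriptive `name()` string defined above.
#[cfg(test)]
mod write_op_display_sketch {
    use super::*;

    #[test]
    fn names_match_display() {
        assert_eq!(InsertOp::Append.to_string(), "Insert Into");
        assert_eq!(InsertOp::Overwrite.to_string(), "Insert Overwrite");
        assert_eq!(InsertOp::Replace.to_string(), "Replace Into");
        assert_eq!(WriteOp::Insert(InsertOp::Replace).to_string(), "Replace Into");
        assert_eq!(WriteOp::Delete.to_string(), "Delete");
        assert_eq!(WriteOp::Ctas.to_string(), "Ctas");
    }
}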

/// Builds the single-column `count` schema used as the output schema of both
/// [`CopyTo`] and [`DmlStatement`].
fn make_count_schema() -> DFSchemaRef {
    Arc::new(
        Schema::new(vec![Field::new("count", DataType::UInt64, false)])
            .try_into()
            .unwrap(),
    )
}
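
// Illustrative sketch (not part of the upstream file): pins down the shape of
// the schema produced by `make_count_schema`, a single non-nullable UInt64
// column named "count".
#[cfg(test)]
mod count_schema_sketch {
    use super::*;

    #[test]
    fn count_schema_shape() {
        let schema = make_count_schema();
        assert_eq!(schema.fields().len(), 1);
        let field = schema.field(0);
        assert_eq!(field.name(), "count");
        assert_eq!(field.data_type(), &DataType::UInt64);
        assert!(!field.is_nullable());
    }
}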