datafusion_expr/logical_plan/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::datatypes::FieldRef;
19use datafusion_common::metadata::format_type_and_metadata;
20use datafusion_common::{DFSchema, DFSchemaRef};
21use itertools::Itertools as _;
22use std::fmt::{self, Display};
23use std::sync::{Arc, LazyLock};
24
25use crate::{expr_vec_fmt, Expr, LogicalPlan};
26
27/// Various types of Statements.
28///
29/// # Transactions:
30///
31/// While DataFusion does not offer support transactions, it provides
32/// [`LogicalPlan`] support to assist building database systems
33/// using DataFusion
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
35pub enum Statement {
36    // Begin a transaction
37    TransactionStart(TransactionStart),
38    // Commit or rollback a transaction
39    TransactionEnd(TransactionEnd),
40    /// Set a Variable
41    SetVariable(SetVariable),
42    /// Prepare a statement and find any bind parameters
43    /// (e.g. `?`). This is used to implement SQL-prepared statements.
44    Prepare(Prepare),
45    /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
46    Execute(Execute),
47    /// Deallocate a prepared statement.
48    /// This is used to implement SQL 'DEALLOCATE'.
49    Deallocate(Deallocate),
50}
51
52impl Statement {
53    /// Get a reference to the logical plan's schema
54    pub fn schema(&self) -> &DFSchemaRef {
55        // Statements have an unchanging empty schema.
56        static STATEMENT_EMPTY_SCHEMA: LazyLock<DFSchemaRef> =
57            LazyLock::new(|| Arc::new(DFSchema::empty()));
58
59        &STATEMENT_EMPTY_SCHEMA
60    }
61
62    /// Return a descriptive string describing the type of this
63    /// [`Statement`]
64    pub fn name(&self) -> &str {
65        match self {
66            Statement::TransactionStart(_) => "TransactionStart",
67            Statement::TransactionEnd(_) => "TransactionEnd",
68            Statement::SetVariable(_) => "SetVariable",
69            Statement::Prepare(_) => "Prepare",
70            Statement::Execute(_) => "Execute",
71            Statement::Deallocate(_) => "Deallocate",
72        }
73    }
74
75    /// Returns input LogicalPlans in the current `Statement`.
76    pub(super) fn inputs(&self) -> Vec<&LogicalPlan> {
77        match self {
78            Statement::Prepare(Prepare { input, .. }) => vec![input.as_ref()],
79            _ => vec![],
80        }
81    }
82
83    /// Return a `format`able structure with the a human readable
84    /// description of this LogicalPlan node per node, not including
85    /// children.
86    ///
87    /// See [crate::LogicalPlan::display] for an example
88    pub fn display(&self) -> impl Display + '_ {
89        struct Wrapper<'a>(&'a Statement);
90        impl Display for Wrapper<'_> {
91            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
92                match self.0 {
93                    Statement::TransactionStart(TransactionStart {
94                        access_mode,
95                        isolation_level,
96                        ..
97                    }) => {
98                        write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}")
99                    }
100                    Statement::TransactionEnd(TransactionEnd {
101                        conclusion,
102                        chain,
103                        ..
104                    }) => {
105                        write!(f, "TransactionEnd: {conclusion:?} chain:={chain}")
106                    }
107                    Statement::SetVariable(SetVariable {
108                        variable, value, ..
109                    }) => {
110                        write!(f, "SetVariable: set {variable:?} to {value:?}")
111                    }
112                    Statement::Prepare(Prepare { name, fields, .. }) => {
113                        write!(
114                            f,
115                            "Prepare: {name:?} [{}]",
116                            fields
117                                .iter()
118                                .map(|f| format_type_and_metadata(
119                                    f.data_type(),
120                                    Some(f.metadata())
121                                ))
122                                .join(", ")
123                        )
124                    }
125                    Statement::Execute(Execute {
126                        name, parameters, ..
127                    }) => {
128                        write!(
129                            f,
130                            "Execute: {} params=[{}]",
131                            name,
132                            expr_vec_fmt!(parameters)
133                        )
134                    }
135                    Statement::Deallocate(Deallocate { name }) => {
136                        write!(f, "Deallocate: {name}")
137                    }
138                }
139            }
140        }
141        Wrapper(self)
142    }
143}
144
145/// Indicates if a transaction was committed or aborted
146#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
147pub enum TransactionConclusion {
148    Commit,
149    Rollback,
150}
151
152/// Indicates if this transaction is allowed to write
153#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
154pub enum TransactionAccessMode {
155    ReadOnly,
156    ReadWrite,
157}
158
159/// Indicates ANSI transaction isolation level
160#[derive(Clone, PartialEq, Eq, PartialOrd, Hash, Debug)]
161pub enum TransactionIsolationLevel {
162    ReadUncommitted,
163    ReadCommitted,
164    RepeatableRead,
165    Serializable,
166    Snapshot,
167}
168
169/// Indicator that the following statements should be committed or rolled back atomically
170#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
171pub struct TransactionStart {
172    /// indicates if transaction is allowed to write
173    pub access_mode: TransactionAccessMode,
174    // indicates ANSI isolation level
175    pub isolation_level: TransactionIsolationLevel,
176}
177
178/// Indicator that any current transaction should be terminated
179#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
180pub struct TransactionEnd {
181    /// whether the transaction committed or aborted
182    pub conclusion: TransactionConclusion,
183    /// if specified a new transaction is immediately started with same characteristics
184    pub chain: bool,
185}
186
187/// Set a Variable's value -- value in
188/// [`ConfigOptions`](datafusion_common::config::ConfigOptions)
189#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
190pub struct SetVariable {
191    /// The variable name
192    pub variable: String,
193    /// The value to set
194    pub value: String,
195}
196
197/// Prepare a statement but do not execute it. Prepare statements can have 0 or more
198/// `Expr::Placeholder` expressions that are filled in during execution
199#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
200pub struct Prepare {
201    /// The name of the statement
202    pub name: String,
203    /// Data types of the parameters ([`Expr::Placeholder`])
204    pub fields: Vec<FieldRef>,
205    /// The logical plan of the statements
206    pub input: Arc<LogicalPlan>,
207}
208
209/// Execute a prepared statement.
210#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
211pub struct Execute {
212    /// The name of the prepared statement to execute
213    pub name: String,
214    /// The execute parameters
215    pub parameters: Vec<Expr>,
216}
217
218/// Deallocate a prepared statement.
219#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)]
220pub struct Deallocate {
221    /// The name of the prepared statement to deallocate
222    pub name: String,
223}