1mod aggregate;
2mod analyze;
3mod create_index;
4mod create_table;
5mod delete;
6mod drop_index;
7mod drop_table;
8mod empty_relation;
9mod filter;
10mod insert;
11mod join;
12mod limit;
13mod project;
14mod sort;
15mod table_scan;
16mod update;
17mod util;
18mod values;
19
20pub use aggregate::Aggregate;
21pub use analyze::Analyze;
22pub use create_index::CreateIndex;
23pub use create_table::CreateTable;
24pub use delete::Delete;
25pub use drop_index::DropIndex;
26pub use drop_table::DropTable;
27pub use empty_relation::EmptyRelation;
28pub use filter::Filter;
29pub use insert::Insert;
30pub use join::{Join, JoinType};
31pub use limit::Limit;
32pub use project::Project;
33pub use sort::{OrderByExpr, Sort};
34pub use table_scan::TableScan;
35pub use update::Update;
36pub use util::*;
37pub use values::Values;
38
39use crate::catalog::{
40 SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
41 UPDATE_OUTPUT_SCHEMA_REF,
42};
43use crate::error::{QuillSQLError, QuillSQLResult};
44use crate::transaction::IsolationLevel;
45use sqlparser::ast::{TransactionAccessMode, TransactionMode};
46use std::sync::Arc;
47
48#[derive(Debug, Clone)]
49pub enum LogicalPlan {
50 CreateTable(CreateTable),
51 CreateIndex(CreateIndex),
52 DropTable(DropTable),
53 DropIndex(DropIndex),
54 Filter(Filter),
55 Insert(Insert),
56 Join(Join),
57 Limit(Limit),
58 Project(Project),
59 TableScan(TableScan),
60 Sort(Sort),
61 Values(Values),
62 EmptyRelation(EmptyRelation),
63 Aggregate(Aggregate),
64 Update(Update),
65 Delete(Delete),
66 Analyze(Analyze),
67 BeginTransaction(TransactionModes),
68 CommitTransaction,
69 RollbackTransaction,
70 SetTransaction {
71 scope: TransactionScope,
72 modes: TransactionModes,
73 },
74}
75
76impl LogicalPlan {
77 pub fn schema(&self) -> &SchemaRef {
78 match self {
79 LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
80 LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
81 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
82 LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
83 LogicalPlan::Join(Join { schema, .. }) => schema,
84 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
85 LogicalPlan::Project(Project { schema, .. }) => schema,
86 LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
87 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
88 LogicalPlan::Values(Values { schema, .. }) => schema,
89 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
90 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
91 LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
92 LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
93 LogicalPlan::DropTable(_) => &EMPTY_SCHEMA_REF,
94 LogicalPlan::DropIndex(_) => &EMPTY_SCHEMA_REF,
95 LogicalPlan::Analyze(_) => &EMPTY_SCHEMA_REF,
96 LogicalPlan::BeginTransaction(_)
97 | LogicalPlan::CommitTransaction
98 | LogicalPlan::RollbackTransaction
99 | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
100 }
101 }
102
103 pub fn inputs(&self) -> Vec<&LogicalPlan> {
104 match self {
105 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
106 LogicalPlan::Insert(Insert { input, .. }) => vec![input],
107 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
108 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
109 LogicalPlan::Project(Project { input, .. }) => vec![input],
110 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
111 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
112 LogicalPlan::CreateTable(_)
113 | LogicalPlan::CreateIndex(_)
114 | LogicalPlan::DropTable(_)
115 | LogicalPlan::DropIndex(_)
116 | LogicalPlan::TableScan(_)
117 | LogicalPlan::Values(_)
118 | LogicalPlan::Update(_)
119 | LogicalPlan::Delete(_)
120 | LogicalPlan::EmptyRelation(_)
121 | LogicalPlan::Analyze(_)
122 | LogicalPlan::BeginTransaction(_)
123 | LogicalPlan::CommitTransaction
124 | LogicalPlan::RollbackTransaction
125 | LogicalPlan::SetTransaction { .. } => vec![],
126 }
127 }
128
129 pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
130 match self {
131 LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
132 predicate: predicate.clone(),
133 input: expect_single_input(inputs)?,
134 })),
135 LogicalPlan::Insert(Insert {
136 table,
137 table_schema,
138 projected_schema,
139 ..
140 }) => Ok(LogicalPlan::Insert(Insert {
141 table: table.clone(),
142 table_schema: table_schema.clone(),
143 projected_schema: projected_schema.clone(),
144 input: expect_single_input(inputs)?,
145 })),
146 LogicalPlan::Join(Join {
147 join_type,
148 condition,
149 schema,
150 ..
151 }) => {
152 let (left, right) = expect_two_inputs(inputs)?;
153 Ok(LogicalPlan::Join(Join {
154 left,
155 right,
156 join_type: *join_type,
157 condition: condition.clone(),
158 schema: schema.clone(),
159 }))
160 }
161 LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
162 limit: *limit,
163 offset: *offset,
164 input: expect_single_input(inputs)?,
165 })),
166 LogicalPlan::Project(Project { exprs, schema, .. }) => {
167 Ok(LogicalPlan::Project(Project {
168 exprs: exprs.clone(),
169 schema: schema.clone(),
170 input: expect_single_input(inputs)?,
171 }))
172 }
173 LogicalPlan::Sort(Sort {
174 order_by, limit, ..
175 }) => Ok(LogicalPlan::Sort(Sort {
176 order_by: order_by.clone(),
177 limit: *limit,
178 input: expect_single_input(inputs)?,
179 })),
180 LogicalPlan::Aggregate(Aggregate {
181 group_exprs,
182 aggr_exprs,
183 schema,
184 ..
185 }) => Ok(LogicalPlan::Aggregate(Aggregate {
186 group_exprs: group_exprs.clone(),
187 aggr_exprs: aggr_exprs.clone(),
188 schema: schema.clone(),
189 input: expect_single_input(inputs)?,
190 })),
191 LogicalPlan::CreateTable(_)
192 | LogicalPlan::CreateIndex(_)
193 | LogicalPlan::DropTable(_)
194 | LogicalPlan::DropIndex(_)
195 | LogicalPlan::TableScan(_)
196 | LogicalPlan::Values(_)
197 | LogicalPlan::Update(_)
198 | LogicalPlan::Delete(_)
199 | LogicalPlan::EmptyRelation(_)
200 | LogicalPlan::Analyze(_)
201 | LogicalPlan::BeginTransaction(_)
202 | LogicalPlan::CommitTransaction
203 | LogicalPlan::RollbackTransaction
204 | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
205 }
206 }
207}
208
209fn expect_single_input(inputs: &[LogicalPlan]) -> QuillSQLResult<Arc<LogicalPlan>> {
210 expect_input_at(inputs, 0, "should have at least one input")
211}
212
213fn expect_two_inputs(
214 inputs: &[LogicalPlan],
215) -> QuillSQLResult<(Arc<LogicalPlan>, Arc<LogicalPlan>)> {
216 let left = expect_input_at(inputs, 0, "should have at least two inputs")?;
217 let right = expect_input_at(inputs, 1, "should have at least two inputs")?;
218 Ok((left, right))
219}
220
221fn expect_input_at(
222 inputs: &[LogicalPlan],
223 index: usize,
224 expectation: &str,
225) -> QuillSQLResult<Arc<LogicalPlan>> {
226 inputs
227 .get(index)
228 .cloned()
229 .map(Arc::new)
230 .ok_or_else(|| QuillSQLError::Internal(format!("inputs {:?} {}", inputs, expectation)))
231}
232
233impl std::fmt::Display for LogicalPlan {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 match self {
236 LogicalPlan::CreateTable(v) => write!(f, "{v}"),
237 LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
238 LogicalPlan::DropTable(v) => write!(f, "{v}"),
239 LogicalPlan::DropIndex(v) => write!(f, "{v}"),
240 LogicalPlan::Filter(v) => write!(f, "{v}"),
241 LogicalPlan::Insert(v) => write!(f, "{v}"),
242 LogicalPlan::Join(v) => write!(f, "{v}"),
243 LogicalPlan::Limit(v) => write!(f, "{v}"),
244 LogicalPlan::Project(v) => write!(f, "{v}"),
245 LogicalPlan::TableScan(v) => write!(f, "{v}"),
246 LogicalPlan::Sort(v) => write!(f, "{v}"),
247 LogicalPlan::Values(v) => write!(f, "{v}"),
248 LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
249 LogicalPlan::Aggregate(v) => write!(f, "{v}"),
250 LogicalPlan::Update(v) => write!(f, "{v}"),
251 LogicalPlan::Delete(v) => write!(f, "{v}"),
252 LogicalPlan::Analyze(v) => write!(f, "Analyze: {}", v.table),
253 LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
254 LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
255 LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
256 LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
257 }
258 }
259}
260
/// Scope selector stored in the `scope` field of `LogicalPlan::SetTransaction`.
#[derive(Debug, Clone, Copy)]
pub enum TransactionScope {
    /// NOTE(review): presumably applies the modes to the whole session;
    /// confirm against the code that consumes this plan.
    Session,
    /// NOTE(review): presumably applies the modes to a single transaction;
    /// confirm against the code that consumes this plan.
    Transaction,
}
266
267#[derive(Debug, Clone, Default)]
268pub struct TransactionModes {
269 pub isolation_level: Option<IsolationLevel>,
270 pub access_mode: Option<TransactionAccessMode>,
271}
272
273impl TransactionModes {
274 pub fn from_modes(modes: &[TransactionMode]) -> Self {
275 let mut result = TransactionModes::default();
276 for mode in modes {
277 match mode {
278 TransactionMode::IsolationLevel(level) => {
279 let isolation = match level {
280 sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
281 IsolationLevel::ReadUncommitted
282 }
283 sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
284 IsolationLevel::ReadCommitted
285 }
286 sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
287 IsolationLevel::RepeatableRead
288 }
289 sqlparser::ast::TransactionIsolationLevel::Serializable => {
290 IsolationLevel::Serializable
291 }
292 };
293 result.isolation_level = Some(isolation);
294 }
295 TransactionMode::AccessMode(mode) => {
296 result.access_mode = Some(*mode);
297 }
298 }
299 }
300 result
301 }
302
303 pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
304 self.isolation_level.unwrap_or(fallback)
305 }
306
307 pub fn unwrap_effective_access_mode(
308 &self,
309 fallback: TransactionAccessMode,
310 ) -> TransactionAccessMode {
311 self.access_mode.unwrap_or(fallback)
312 }
313}