1mod aggregate;
2mod create_index;
3mod create_table;
4mod delete;
5mod empty_relation;
6mod filter;
7mod insert;
8mod join;
9mod limit;
10mod project;
11mod sort;
12mod table_scan;
13mod update;
14mod util;
15mod values;
16
17pub use aggregate::Aggregate;
18pub use create_index::CreateIndex;
19pub use create_table::CreateTable;
20pub use delete::Delete;
21pub use empty_relation::EmptyRelation;
22pub use filter::Filter;
23pub use insert::Insert;
24pub use join::{Join, JoinType};
25pub use limit::Limit;
26pub use project::Project;
27pub use sort::{OrderByExpr, Sort};
28pub use table_scan::TableScan;
29pub use update::Update;
30pub use util::*;
31pub use values::Values;
32
33use crate::catalog::{
34 SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
35 UPDATE_OUTPUT_SCHEMA_REF,
36};
37use crate::error::{QuillSQLError, QuillSQLResult};
38use crate::transaction::IsolationLevel;
39use sqlparser::ast::{TransactionAccessMode, TransactionMode};
40use std::sync::Arc;
41
42#[derive(Debug, Clone)]
43pub enum LogicalPlan {
44 CreateTable(CreateTable),
45 CreateIndex(CreateIndex),
46 Filter(Filter),
47 Insert(Insert),
48 Join(Join),
49 Limit(Limit),
50 Project(Project),
51 TableScan(TableScan),
52 Sort(Sort),
53 Values(Values),
54 EmptyRelation(EmptyRelation),
55 Aggregate(Aggregate),
56 Update(Update),
57 Delete(Delete),
58 BeginTransaction(TransactionModes),
59 CommitTransaction,
60 RollbackTransaction,
61 SetTransaction {
62 scope: TransactionScope,
63 modes: TransactionModes,
64 },
65}
66
67impl LogicalPlan {
68 pub fn schema(&self) -> &SchemaRef {
69 match self {
70 LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
71 LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
72 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
73 LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
74 LogicalPlan::Join(Join { schema, .. }) => schema,
75 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
76 LogicalPlan::Project(Project { schema, .. }) => schema,
77 LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
78 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
79 LogicalPlan::Values(Values { schema, .. }) => schema,
80 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
81 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
82 LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
83 LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
84 LogicalPlan::BeginTransaction(_)
85 | LogicalPlan::CommitTransaction
86 | LogicalPlan::RollbackTransaction
87 | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
88 }
89 }
90
91 pub fn inputs(&self) -> Vec<&LogicalPlan> {
92 match self {
93 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
94 LogicalPlan::Insert(Insert { input, .. }) => vec![input],
95 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
96 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
97 LogicalPlan::Project(Project { input, .. }) => vec![input],
98 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
99 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
100 LogicalPlan::CreateTable(_)
101 | LogicalPlan::CreateIndex(_)
102 | LogicalPlan::TableScan(_)
103 | LogicalPlan::Values(_)
104 | LogicalPlan::Update(_)
105 | LogicalPlan::Delete(_)
106 | LogicalPlan::EmptyRelation(_)
107 | LogicalPlan::BeginTransaction(_)
108 | LogicalPlan::CommitTransaction
109 | LogicalPlan::RollbackTransaction
110 | LogicalPlan::SetTransaction { .. } => vec![],
111 }
112 }
113
114 pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
115 match self {
116 LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
117 predicate: predicate.clone(),
118 input: Arc::new(
119 inputs
120 .first()
121 .ok_or_else(|| {
122 QuillSQLError::Internal(format!(
123 "inputs {:?} should have at least one",
124 inputs
125 ))
126 })?
127 .clone(),
128 ),
129 })),
130 LogicalPlan::Insert(Insert {
131 table,
132 table_schema,
133 projected_schema,
134 ..
135 }) => Ok(LogicalPlan::Insert(Insert {
136 table: table.clone(),
137 table_schema: table_schema.clone(),
138 projected_schema: projected_schema.clone(),
139 input: Arc::new(
140 inputs
141 .first()
142 .ok_or_else(|| {
143 QuillSQLError::Internal(format!(
144 "inputs {:?} should have at least one",
145 inputs
146 ))
147 })?
148 .clone(),
149 ),
150 })),
151 LogicalPlan::Join(Join {
152 join_type,
153 condition,
154 schema,
155 ..
156 }) => Ok(LogicalPlan::Join(Join {
157 left: Arc::new(
158 inputs
159 .first()
160 .ok_or_else(|| {
161 QuillSQLError::Internal(format!(
162 "inputs {:?} should have at least two",
163 inputs
164 ))
165 })?
166 .clone(),
167 ),
168 right: Arc::new(
169 inputs
170 .first()
171 .ok_or_else(|| {
172 QuillSQLError::Internal(format!(
173 "inputs {:?} should have at least two",
174 inputs
175 ))
176 })?
177 .clone(),
178 ),
179 join_type: *join_type,
180 condition: condition.clone(),
181 schema: schema.clone(),
182 })),
183 LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
184 limit: *limit,
185 offset: *offset,
186 input: Arc::new(
187 inputs
188 .first()
189 .ok_or_else(|| {
190 QuillSQLError::Internal(format!(
191 "inputs {:?} should have at least one",
192 inputs
193 ))
194 })?
195 .clone(),
196 ),
197 })),
198 LogicalPlan::Project(Project { exprs, schema, .. }) => {
199 Ok(LogicalPlan::Project(Project {
200 exprs: exprs.clone(),
201 schema: schema.clone(),
202 input: Arc::new(
203 inputs
204 .first()
205 .ok_or_else(|| {
206 QuillSQLError::Internal(format!(
207 "inputs {:?} should have at least one",
208 inputs
209 ))
210 })?
211 .clone(),
212 ),
213 }))
214 }
215 LogicalPlan::Sort(Sort {
216 order_by, limit, ..
217 }) => Ok(LogicalPlan::Sort(Sort {
218 order_by: order_by.clone(),
219 limit: *limit,
220 input: Arc::new(
221 inputs
222 .first()
223 .ok_or_else(|| {
224 QuillSQLError::Internal(format!(
225 "inputs {:?} should have at least one",
226 inputs
227 ))
228 })?
229 .clone(),
230 ),
231 })),
232 LogicalPlan::Aggregate(Aggregate {
233 group_exprs,
234 aggr_exprs,
235 schema,
236 ..
237 }) => Ok(LogicalPlan::Aggregate(Aggregate {
238 group_exprs: group_exprs.clone(),
239 aggr_exprs: aggr_exprs.clone(),
240 schema: schema.clone(),
241 input: Arc::new(
242 inputs
243 .first()
244 .ok_or_else(|| {
245 QuillSQLError::Internal(format!(
246 "inputs {:?} should have at least one",
247 inputs
248 ))
249 })?
250 .clone(),
251 ),
252 })),
253 LogicalPlan::CreateTable(_)
254 | LogicalPlan::CreateIndex(_)
255 | LogicalPlan::TableScan(_)
256 | LogicalPlan::Values(_)
257 | LogicalPlan::Update(_)
258 | LogicalPlan::Delete(_)
259 | LogicalPlan::EmptyRelation(_)
260 | LogicalPlan::BeginTransaction(_)
261 | LogicalPlan::CommitTransaction
262 | LogicalPlan::RollbackTransaction
263 | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
264 }
265 }
266}
267
268impl std::fmt::Display for LogicalPlan {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 match self {
271 LogicalPlan::CreateTable(v) => write!(f, "{v}"),
272 LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
273 LogicalPlan::Filter(v) => write!(f, "{v}"),
274 LogicalPlan::Insert(v) => write!(f, "{v}"),
275 LogicalPlan::Join(v) => write!(f, "{v}"),
276 LogicalPlan::Limit(v) => write!(f, "{v}"),
277 LogicalPlan::Project(v) => write!(f, "{v}"),
278 LogicalPlan::TableScan(v) => write!(f, "{v}"),
279 LogicalPlan::Sort(v) => write!(f, "{v}"),
280 LogicalPlan::Values(v) => write!(f, "{v}"),
281 LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
282 LogicalPlan::Aggregate(v) => write!(f, "{v}"),
283 LogicalPlan::Update(v) => write!(f, "{v}"),
284 LogicalPlan::Delete(v) => write!(f, "{v}"),
285 LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
286 LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
287 LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
288 LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
289 }
290 }
291}
292
/// Scope selector carried by `LogicalPlan::SetTransaction`.
///
/// NOTE(review): presumably distinguishes session-wide defaults from settings
/// for the current transaction only; confirm against the planner/executor
/// that consumes it.
#[derive(Debug, Clone, Copy)]
pub enum TransactionScope {
    /// The modes apply at session scope.
    Session,
    /// The modes apply at transaction scope.
    Transaction,
}
298
299#[derive(Debug, Clone, Default)]
300pub struct TransactionModes {
301 pub isolation_level: Option<IsolationLevel>,
302 pub access_mode: Option<TransactionAccessMode>,
303}
304
305impl TransactionModes {
306 pub fn from_modes(modes: &[TransactionMode]) -> Self {
307 let mut result = TransactionModes::default();
308 for mode in modes {
309 match mode {
310 TransactionMode::IsolationLevel(level) => {
311 let isolation = match level {
312 sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
313 IsolationLevel::ReadUncommitted
314 }
315 sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
316 IsolationLevel::ReadCommitted
317 }
318 sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
319 IsolationLevel::RepeatableRead
320 }
321 sqlparser::ast::TransactionIsolationLevel::Serializable => {
322 IsolationLevel::Serializable
323 }
324 };
325 result.isolation_level = Some(isolation);
326 }
327 TransactionMode::AccessMode(mode) => {
328 result.access_mode = Some(*mode);
329 }
330 }
331 }
332 result
333 }
334
335 pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
336 self.isolation_level.unwrap_or(fallback)
337 }
338
339 pub fn unwrap_effective_access_mode(
340 &self,
341 fallback: TransactionAccessMode,
342 ) -> TransactionAccessMode {
343 self.access_mode.unwrap_or(fallback)
344 }
345}