1mod aggregate;
2mod create_index;
3mod create_table;
4mod delete;
5mod drop_index;
6mod drop_table;
7mod empty_relation;
8mod filter;
9mod insert;
10mod join;
11mod limit;
12mod project;
13mod sort;
14mod table_scan;
15mod update;
16mod util;
17mod values;
18
19pub use aggregate::Aggregate;
20pub use create_index::CreateIndex;
21pub use create_table::CreateTable;
22pub use delete::Delete;
23pub use drop_index::DropIndex;
24pub use drop_table::DropTable;
25pub use empty_relation::EmptyRelation;
26pub use filter::Filter;
27pub use insert::Insert;
28pub use join::{Join, JoinType};
29pub use limit::Limit;
30pub use project::Project;
31pub use sort::{OrderByExpr, Sort};
32pub use table_scan::TableScan;
33pub use update::Update;
34pub use util::*;
35pub use values::Values;
36
37use crate::catalog::{
38 SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
39 UPDATE_OUTPUT_SCHEMA_REF,
40};
41use crate::error::{QuillSQLError, QuillSQLResult};
42use crate::transaction::IsolationLevel;
43use sqlparser::ast::{TransactionAccessMode, TransactionMode};
44use std::sync::Arc;
45
46#[derive(Debug, Clone)]
47pub enum LogicalPlan {
48 CreateTable(CreateTable),
49 CreateIndex(CreateIndex),
50 DropTable(DropTable),
51 DropIndex(DropIndex),
52 Filter(Filter),
53 Insert(Insert),
54 Join(Join),
55 Limit(Limit),
56 Project(Project),
57 TableScan(TableScan),
58 Sort(Sort),
59 Values(Values),
60 EmptyRelation(EmptyRelation),
61 Aggregate(Aggregate),
62 Update(Update),
63 Delete(Delete),
64 BeginTransaction(TransactionModes),
65 CommitTransaction,
66 RollbackTransaction,
67 SetTransaction {
68 scope: TransactionScope,
69 modes: TransactionModes,
70 },
71}
72
73impl LogicalPlan {
74 pub fn schema(&self) -> &SchemaRef {
75 match self {
76 LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
77 LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
78 LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
79 LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
80 LogicalPlan::Join(Join { schema, .. }) => schema,
81 LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
82 LogicalPlan::Project(Project { schema, .. }) => schema,
83 LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
84 LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
85 LogicalPlan::Values(Values { schema, .. }) => schema,
86 LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
87 LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
88 LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
89 LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
90 LogicalPlan::DropTable(_) => &EMPTY_SCHEMA_REF,
91 LogicalPlan::DropIndex(_) => &EMPTY_SCHEMA_REF,
92 LogicalPlan::BeginTransaction(_)
93 | LogicalPlan::CommitTransaction
94 | LogicalPlan::RollbackTransaction
95 | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
96 }
97 }
98
99 pub fn inputs(&self) -> Vec<&LogicalPlan> {
100 match self {
101 LogicalPlan::Filter(Filter { input, .. }) => vec![input],
102 LogicalPlan::Insert(Insert { input, .. }) => vec![input],
103 LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
104 LogicalPlan::Limit(Limit { input, .. }) => vec![input],
105 LogicalPlan::Project(Project { input, .. }) => vec![input],
106 LogicalPlan::Sort(Sort { input, .. }) => vec![input],
107 LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
108 LogicalPlan::CreateTable(_)
109 | LogicalPlan::CreateIndex(_)
110 | LogicalPlan::DropTable(_)
111 | LogicalPlan::DropIndex(_)
112 | LogicalPlan::TableScan(_)
113 | LogicalPlan::Values(_)
114 | LogicalPlan::Update(_)
115 | LogicalPlan::Delete(_)
116 | LogicalPlan::EmptyRelation(_)
117 | LogicalPlan::BeginTransaction(_)
118 | LogicalPlan::CommitTransaction
119 | LogicalPlan::RollbackTransaction
120 | LogicalPlan::SetTransaction { .. } => vec![],
121 }
122 }
123
124 pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
125 match self {
126 LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
127 predicate: predicate.clone(),
128 input: Arc::new(
129 inputs
130 .first()
131 .ok_or_else(|| {
132 QuillSQLError::Internal(format!(
133 "inputs {:?} should have at least one",
134 inputs
135 ))
136 })?
137 .clone(),
138 ),
139 })),
140 LogicalPlan::Insert(Insert {
141 table,
142 table_schema,
143 projected_schema,
144 ..
145 }) => Ok(LogicalPlan::Insert(Insert {
146 table: table.clone(),
147 table_schema: table_schema.clone(),
148 projected_schema: projected_schema.clone(),
149 input: Arc::new(
150 inputs
151 .first()
152 .ok_or_else(|| {
153 QuillSQLError::Internal(format!(
154 "inputs {:?} should have at least one",
155 inputs
156 ))
157 })?
158 .clone(),
159 ),
160 })),
161 LogicalPlan::Join(Join {
162 join_type,
163 condition,
164 schema,
165 ..
166 }) => Ok(LogicalPlan::Join(Join {
167 left: Arc::new(
168 inputs
169 .first()
170 .ok_or_else(|| {
171 QuillSQLError::Internal(format!(
172 "inputs {:?} should have at least two",
173 inputs
174 ))
175 })?
176 .clone(),
177 ),
178 right: Arc::new(
179 inputs
180 .first()
181 .ok_or_else(|| {
182 QuillSQLError::Internal(format!(
183 "inputs {:?} should have at least two",
184 inputs
185 ))
186 })?
187 .clone(),
188 ),
189 join_type: *join_type,
190 condition: condition.clone(),
191 schema: schema.clone(),
192 })),
193 LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
194 limit: *limit,
195 offset: *offset,
196 input: Arc::new(
197 inputs
198 .first()
199 .ok_or_else(|| {
200 QuillSQLError::Internal(format!(
201 "inputs {:?} should have at least one",
202 inputs
203 ))
204 })?
205 .clone(),
206 ),
207 })),
208 LogicalPlan::Project(Project { exprs, schema, .. }) => {
209 Ok(LogicalPlan::Project(Project {
210 exprs: exprs.clone(),
211 schema: schema.clone(),
212 input: Arc::new(
213 inputs
214 .first()
215 .ok_or_else(|| {
216 QuillSQLError::Internal(format!(
217 "inputs {:?} should have at least one",
218 inputs
219 ))
220 })?
221 .clone(),
222 ),
223 }))
224 }
225 LogicalPlan::Sort(Sort {
226 order_by, limit, ..
227 }) => Ok(LogicalPlan::Sort(Sort {
228 order_by: order_by.clone(),
229 limit: *limit,
230 input: Arc::new(
231 inputs
232 .first()
233 .ok_or_else(|| {
234 QuillSQLError::Internal(format!(
235 "inputs {:?} should have at least one",
236 inputs
237 ))
238 })?
239 .clone(),
240 ),
241 })),
242 LogicalPlan::Aggregate(Aggregate {
243 group_exprs,
244 aggr_exprs,
245 schema,
246 ..
247 }) => Ok(LogicalPlan::Aggregate(Aggregate {
248 group_exprs: group_exprs.clone(),
249 aggr_exprs: aggr_exprs.clone(),
250 schema: schema.clone(),
251 input: Arc::new(
252 inputs
253 .first()
254 .ok_or_else(|| {
255 QuillSQLError::Internal(format!(
256 "inputs {:?} should have at least one",
257 inputs
258 ))
259 })?
260 .clone(),
261 ),
262 })),
263 LogicalPlan::CreateTable(_)
264 | LogicalPlan::CreateIndex(_)
265 | LogicalPlan::DropTable(_)
266 | LogicalPlan::DropIndex(_)
267 | LogicalPlan::TableScan(_)
268 | LogicalPlan::Values(_)
269 | LogicalPlan::Update(_)
270 | LogicalPlan::Delete(_)
271 | LogicalPlan::EmptyRelation(_)
272 | LogicalPlan::BeginTransaction(_)
273 | LogicalPlan::CommitTransaction
274 | LogicalPlan::RollbackTransaction
275 | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
276 }
277 }
278}
279
280impl std::fmt::Display for LogicalPlan {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 match self {
283 LogicalPlan::CreateTable(v) => write!(f, "{v}"),
284 LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
285 LogicalPlan::DropTable(v) => write!(f, "{v}"),
286 LogicalPlan::DropIndex(v) => write!(f, "{v}"),
287 LogicalPlan::Filter(v) => write!(f, "{v}"),
288 LogicalPlan::Insert(v) => write!(f, "{v}"),
289 LogicalPlan::Join(v) => write!(f, "{v}"),
290 LogicalPlan::Limit(v) => write!(f, "{v}"),
291 LogicalPlan::Project(v) => write!(f, "{v}"),
292 LogicalPlan::TableScan(v) => write!(f, "{v}"),
293 LogicalPlan::Sort(v) => write!(f, "{v}"),
294 LogicalPlan::Values(v) => write!(f, "{v}"),
295 LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
296 LogicalPlan::Aggregate(v) => write!(f, "{v}"),
297 LogicalPlan::Update(v) => write!(f, "{v}"),
298 LogicalPlan::Delete(v) => write!(f, "{v}"),
299 LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
300 LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
301 LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
302 LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
303 }
304 }
305}
306
/// Scope selector carried by `LogicalPlan::SetTransaction`.
#[derive(Clone, Copy, Debug)]
pub enum TransactionScope {
    /// Session-level scope.
    Session,
    /// Transaction-level scope.
    Transaction,
}
312
313#[derive(Debug, Clone, Default)]
314pub struct TransactionModes {
315 pub isolation_level: Option<IsolationLevel>,
316 pub access_mode: Option<TransactionAccessMode>,
317}
318
319impl TransactionModes {
320 pub fn from_modes(modes: &[TransactionMode]) -> Self {
321 let mut result = TransactionModes::default();
322 for mode in modes {
323 match mode {
324 TransactionMode::IsolationLevel(level) => {
325 let isolation = match level {
326 sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
327 IsolationLevel::ReadUncommitted
328 }
329 sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
330 IsolationLevel::ReadCommitted
331 }
332 sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
333 IsolationLevel::RepeatableRead
334 }
335 sqlparser::ast::TransactionIsolationLevel::Serializable => {
336 IsolationLevel::Serializable
337 }
338 };
339 result.isolation_level = Some(isolation);
340 }
341 TransactionMode::AccessMode(mode) => {
342 result.access_mode = Some(*mode);
343 }
344 }
345 }
346 result
347 }
348
349 pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
350 self.isolation_level.unwrap_or(fallback)
351 }
352
353 pub fn unwrap_effective_access_mode(
354 &self,
355 fallback: TransactionAccessMode,
356 ) -> TransactionAccessMode {
357 self.access_mode.unwrap_or(fallback)
358 }
359}