Skip to main content

quill_sql/plan/logical_plan/
mod.rs

1mod aggregate;
2mod analyze;
3mod create_index;
4mod create_table;
5mod delete;
6mod drop_index;
7mod drop_table;
8mod empty_relation;
9mod filter;
10mod insert;
11mod join;
12mod limit;
13mod project;
14mod sort;
15mod table_scan;
16mod update;
17mod util;
18mod values;
19
20pub use aggregate::Aggregate;
21pub use analyze::Analyze;
22pub use create_index::CreateIndex;
23pub use create_table::CreateTable;
24pub use delete::Delete;
25pub use drop_index::DropIndex;
26pub use drop_table::DropTable;
27pub use empty_relation::EmptyRelation;
28pub use filter::Filter;
29pub use insert::Insert;
30pub use join::{Join, JoinType};
31pub use limit::Limit;
32pub use project::Project;
33pub use sort::{OrderByExpr, Sort};
34pub use table_scan::TableScan;
35pub use update::Update;
36pub use util::*;
37pub use values::Values;
38
39use crate::catalog::{
40    SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
41    UPDATE_OUTPUT_SCHEMA_REF,
42};
43use crate::error::{QuillSQLError, QuillSQLResult};
44use crate::transaction::IsolationLevel;
45use sqlparser::ast::{TransactionAccessMode, TransactionMode};
46use std::sync::Arc;
47
/// A node in the logical query plan tree.
///
/// Each variant is one kind of statement or relational operator. Operators
/// that consume other plans hold them inside their payload struct; see
/// [`LogicalPlan::inputs`] for which variants expose children and
/// [`LogicalPlan::schema`] for the output schema each variant reports.
#[derive(Debug, Clone)]
pub enum LogicalPlan {
    /// `CREATE TABLE` statement; reports the empty schema.
    CreateTable(CreateTable),
    /// `CREATE INDEX` statement; reports the empty schema.
    CreateIndex(CreateIndex),
    /// `DROP TABLE` statement; reports the empty schema.
    DropTable(DropTable),
    /// `DROP INDEX` statement; reports the empty schema.
    DropIndex(DropIndex),
    /// Predicate applied to a single input; output schema is the input's schema.
    Filter(Filter),
    /// Insert fed by a single input plan; reports the fixed insert-output schema.
    Insert(Insert),
    /// Join of a left and a right input; carries its own output schema.
    Join(Join),
    /// Limit/offset over a single input; output schema is the input's schema.
    Limit(Limit),
    /// Expression list evaluated over a single input; carries its own output schema.
    Project(Project),
    /// Leaf node reading a table; its output schema is the table schema it stores.
    TableScan(TableScan),
    /// Ordering of a single input; output schema is the input's schema.
    Sort(Sort),
    /// Leaf node carrying its own schema (presumably literal rows — confirm in `values.rs`).
    Values(Values),
    /// Leaf node carrying only a schema and no input plan.
    EmptyRelation(EmptyRelation),
    /// Grouping/aggregation over a single input; carries its own output schema.
    Aggregate(Aggregate),
    /// Update statement; exposes no child plans and reports the fixed update-output schema.
    Update(Update),
    /// Delete statement; exposes no child plans and reports the fixed delete-output schema.
    Delete(Delete),
    /// Analyze statement for a table; reports the empty schema.
    Analyze(Analyze),
    /// Begin a transaction with the given (optional) modes.
    BeginTransaction(TransactionModes),
    /// Commit the current transaction.
    CommitTransaction,
    /// Roll back the current transaction.
    RollbackTransaction,
    /// Change transaction settings for the given scope.
    SetTransaction {
        /// Which scope the new settings apply to.
        scope: TransactionScope,
        /// The settings to apply; unset fields are `None`.
        modes: TransactionModes,
    },
}
75
76impl LogicalPlan {
77    pub fn schema(&self) -> &SchemaRef {
78        match self {
79            LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
80            LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
81            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
82            LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
83            LogicalPlan::Join(Join { schema, .. }) => schema,
84            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
85            LogicalPlan::Project(Project { schema, .. }) => schema,
86            LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
87            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
88            LogicalPlan::Values(Values { schema, .. }) => schema,
89            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
90            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
91            LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
92            LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
93            LogicalPlan::DropTable(_) => &EMPTY_SCHEMA_REF,
94            LogicalPlan::DropIndex(_) => &EMPTY_SCHEMA_REF,
95            LogicalPlan::Analyze(_) => &EMPTY_SCHEMA_REF,
96            LogicalPlan::BeginTransaction(_)
97            | LogicalPlan::CommitTransaction
98            | LogicalPlan::RollbackTransaction
99            | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
100        }
101    }
102
103    pub fn inputs(&self) -> Vec<&LogicalPlan> {
104        match self {
105            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
106            LogicalPlan::Insert(Insert { input, .. }) => vec![input],
107            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
108            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
109            LogicalPlan::Project(Project { input, .. }) => vec![input],
110            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
111            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
112            LogicalPlan::CreateTable(_)
113            | LogicalPlan::CreateIndex(_)
114            | LogicalPlan::DropTable(_)
115            | LogicalPlan::DropIndex(_)
116            | LogicalPlan::TableScan(_)
117            | LogicalPlan::Values(_)
118            | LogicalPlan::Update(_)
119            | LogicalPlan::Delete(_)
120            | LogicalPlan::EmptyRelation(_)
121            | LogicalPlan::Analyze(_)
122            | LogicalPlan::BeginTransaction(_)
123            | LogicalPlan::CommitTransaction
124            | LogicalPlan::RollbackTransaction
125            | LogicalPlan::SetTransaction { .. } => vec![],
126        }
127    }
128
129    pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
130        match self {
131            LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
132                predicate: predicate.clone(),
133                input: expect_single_input(inputs)?,
134            })),
135            LogicalPlan::Insert(Insert {
136                table,
137                table_schema,
138                projected_schema,
139                ..
140            }) => Ok(LogicalPlan::Insert(Insert {
141                table: table.clone(),
142                table_schema: table_schema.clone(),
143                projected_schema: projected_schema.clone(),
144                input: expect_single_input(inputs)?,
145            })),
146            LogicalPlan::Join(Join {
147                join_type,
148                condition,
149                schema,
150                ..
151            }) => {
152                let (left, right) = expect_two_inputs(inputs)?;
153                Ok(LogicalPlan::Join(Join {
154                    left,
155                    right,
156                    join_type: *join_type,
157                    condition: condition.clone(),
158                    schema: schema.clone(),
159                }))
160            }
161            LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
162                limit: *limit,
163                offset: *offset,
164                input: expect_single_input(inputs)?,
165            })),
166            LogicalPlan::Project(Project { exprs, schema, .. }) => {
167                Ok(LogicalPlan::Project(Project {
168                    exprs: exprs.clone(),
169                    schema: schema.clone(),
170                    input: expect_single_input(inputs)?,
171                }))
172            }
173            LogicalPlan::Sort(Sort {
174                order_by, limit, ..
175            }) => Ok(LogicalPlan::Sort(Sort {
176                order_by: order_by.clone(),
177                limit: *limit,
178                input: expect_single_input(inputs)?,
179            })),
180            LogicalPlan::Aggregate(Aggregate {
181                group_exprs,
182                aggr_exprs,
183                schema,
184                ..
185            }) => Ok(LogicalPlan::Aggregate(Aggregate {
186                group_exprs: group_exprs.clone(),
187                aggr_exprs: aggr_exprs.clone(),
188                schema: schema.clone(),
189                input: expect_single_input(inputs)?,
190            })),
191            LogicalPlan::CreateTable(_)
192            | LogicalPlan::CreateIndex(_)
193            | LogicalPlan::DropTable(_)
194            | LogicalPlan::DropIndex(_)
195            | LogicalPlan::TableScan(_)
196            | LogicalPlan::Values(_)
197            | LogicalPlan::Update(_)
198            | LogicalPlan::Delete(_)
199            | LogicalPlan::EmptyRelation(_)
200            | LogicalPlan::Analyze(_)
201            | LogicalPlan::BeginTransaction(_)
202            | LogicalPlan::CommitTransaction
203            | LogicalPlan::RollbackTransaction
204            | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
205        }
206    }
207}
208
209fn expect_single_input(inputs: &[LogicalPlan]) -> QuillSQLResult<Arc<LogicalPlan>> {
210    expect_input_at(inputs, 0, "should have at least one input")
211}
212
213fn expect_two_inputs(
214    inputs: &[LogicalPlan],
215) -> QuillSQLResult<(Arc<LogicalPlan>, Arc<LogicalPlan>)> {
216    let left = expect_input_at(inputs, 0, "should have at least two inputs")?;
217    let right = expect_input_at(inputs, 1, "should have at least two inputs")?;
218    Ok((left, right))
219}
220
221fn expect_input_at(
222    inputs: &[LogicalPlan],
223    index: usize,
224    expectation: &str,
225) -> QuillSQLResult<Arc<LogicalPlan>> {
226    inputs
227        .get(index)
228        .cloned()
229        .map(Arc::new)
230        .ok_or_else(|| QuillSQLError::Internal(format!("inputs {:?} {}", inputs, expectation)))
231}
232
233impl std::fmt::Display for LogicalPlan {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        match self {
236            LogicalPlan::CreateTable(v) => write!(f, "{v}"),
237            LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
238            LogicalPlan::DropTable(v) => write!(f, "{v}"),
239            LogicalPlan::DropIndex(v) => write!(f, "{v}"),
240            LogicalPlan::Filter(v) => write!(f, "{v}"),
241            LogicalPlan::Insert(v) => write!(f, "{v}"),
242            LogicalPlan::Join(v) => write!(f, "{v}"),
243            LogicalPlan::Limit(v) => write!(f, "{v}"),
244            LogicalPlan::Project(v) => write!(f, "{v}"),
245            LogicalPlan::TableScan(v) => write!(f, "{v}"),
246            LogicalPlan::Sort(v) => write!(f, "{v}"),
247            LogicalPlan::Values(v) => write!(f, "{v}"),
248            LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
249            LogicalPlan::Aggregate(v) => write!(f, "{v}"),
250            LogicalPlan::Update(v) => write!(f, "{v}"),
251            LogicalPlan::Delete(v) => write!(f, "{v}"),
252            LogicalPlan::Analyze(v) => write!(f, "Analyze: {}", v.table),
253            LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
254            LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
255            LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
256            LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
257        }
258    }
259}
260
/// Scope selector carried by [`LogicalPlan::SetTransaction`].
///
/// NOTE(review): the variant names suggest "session-wide defaults" versus
/// "current transaction only" — confirm against the planner/executor that
/// consumes this value.
#[derive(Debug, Clone, Copy)]
pub enum TransactionScope {
    /// Settings target the session.
    Session,
    /// Settings target a transaction.
    Transaction,
}
266
/// Transaction settings collected from a statement's mode list.
///
/// A field is `None` when the corresponding mode was not given; the
/// `Default` value has both fields unset.
#[derive(Debug, Clone, Default)]
pub struct TransactionModes {
    /// Requested isolation level, if one was specified.
    pub isolation_level: Option<IsolationLevel>,
    /// Requested access mode, if one was specified.
    pub access_mode: Option<TransactionAccessMode>,
}
272
273impl TransactionModes {
274    pub fn from_modes(modes: &[TransactionMode]) -> Self {
275        let mut result = TransactionModes::default();
276        for mode in modes {
277            match mode {
278                TransactionMode::IsolationLevel(level) => {
279                    let isolation = match level {
280                        sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
281                            IsolationLevel::ReadUncommitted
282                        }
283                        sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
284                            IsolationLevel::ReadCommitted
285                        }
286                        sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
287                            IsolationLevel::RepeatableRead
288                        }
289                        sqlparser::ast::TransactionIsolationLevel::Serializable => {
290                            IsolationLevel::Serializable
291                        }
292                    };
293                    result.isolation_level = Some(isolation);
294                }
295                TransactionMode::AccessMode(mode) => {
296                    result.access_mode = Some(*mode);
297                }
298            }
299        }
300        result
301    }
302
303    pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
304        self.isolation_level.unwrap_or(fallback)
305    }
306
307    pub fn unwrap_effective_access_mode(
308        &self,
309        fallback: TransactionAccessMode,
310    ) -> TransactionAccessMode {
311        self.access_mode.unwrap_or(fallback)
312    }
313}