quill_sql/plan/logical_plan/
mod.rs

1mod aggregate;
2mod create_index;
3mod create_table;
4mod delete;
5mod drop_index;
6mod drop_table;
7mod empty_relation;
8mod filter;
9mod insert;
10mod join;
11mod limit;
12mod project;
13mod sort;
14mod table_scan;
15mod update;
16mod util;
17mod values;
18
19pub use aggregate::Aggregate;
20pub use create_index::CreateIndex;
21pub use create_table::CreateTable;
22pub use delete::Delete;
23pub use drop_index::DropIndex;
24pub use drop_table::DropTable;
25pub use empty_relation::EmptyRelation;
26pub use filter::Filter;
27pub use insert::Insert;
28pub use join::{Join, JoinType};
29pub use limit::Limit;
30pub use project::Project;
31pub use sort::{OrderByExpr, Sort};
32pub use table_scan::TableScan;
33pub use update::Update;
34pub use util::*;
35pub use values::Values;
36
37use crate::catalog::{
38    SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
39    UPDATE_OUTPUT_SCHEMA_REF,
40};
41use crate::error::{QuillSQLError, QuillSQLResult};
42use crate::transaction::IsolationLevel;
43use sqlparser::ast::{TransactionAccessMode, TransactionMode};
44use std::sync::Arc;
45
46#[derive(Debug, Clone)]
47pub enum LogicalPlan {
48    CreateTable(CreateTable),
49    CreateIndex(CreateIndex),
50    DropTable(DropTable),
51    DropIndex(DropIndex),
52    Filter(Filter),
53    Insert(Insert),
54    Join(Join),
55    Limit(Limit),
56    Project(Project),
57    TableScan(TableScan),
58    Sort(Sort),
59    Values(Values),
60    EmptyRelation(EmptyRelation),
61    Aggregate(Aggregate),
62    Update(Update),
63    Delete(Delete),
64    BeginTransaction(TransactionModes),
65    CommitTransaction,
66    RollbackTransaction,
67    SetTransaction {
68        scope: TransactionScope,
69        modes: TransactionModes,
70    },
71}
72
73impl LogicalPlan {
74    pub fn schema(&self) -> &SchemaRef {
75        match self {
76            LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
77            LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
78            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
79            LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
80            LogicalPlan::Join(Join { schema, .. }) => schema,
81            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
82            LogicalPlan::Project(Project { schema, .. }) => schema,
83            LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
84            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
85            LogicalPlan::Values(Values { schema, .. }) => schema,
86            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
87            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
88            LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
89            LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
90            LogicalPlan::DropTable(_) => &EMPTY_SCHEMA_REF,
91            LogicalPlan::DropIndex(_) => &EMPTY_SCHEMA_REF,
92            LogicalPlan::BeginTransaction(_)
93            | LogicalPlan::CommitTransaction
94            | LogicalPlan::RollbackTransaction
95            | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
96        }
97    }
98
99    pub fn inputs(&self) -> Vec<&LogicalPlan> {
100        match self {
101            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
102            LogicalPlan::Insert(Insert { input, .. }) => vec![input],
103            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
104            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
105            LogicalPlan::Project(Project { input, .. }) => vec![input],
106            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
107            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
108            LogicalPlan::CreateTable(_)
109            | LogicalPlan::CreateIndex(_)
110            | LogicalPlan::DropTable(_)
111            | LogicalPlan::DropIndex(_)
112            | LogicalPlan::TableScan(_)
113            | LogicalPlan::Values(_)
114            | LogicalPlan::Update(_)
115            | LogicalPlan::Delete(_)
116            | LogicalPlan::EmptyRelation(_)
117            | LogicalPlan::BeginTransaction(_)
118            | LogicalPlan::CommitTransaction
119            | LogicalPlan::RollbackTransaction
120            | LogicalPlan::SetTransaction { .. } => vec![],
121        }
122    }
123
124    pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
125        match self {
126            LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
127                predicate: predicate.clone(),
128                input: Arc::new(
129                    inputs
130                        .first()
131                        .ok_or_else(|| {
132                            QuillSQLError::Internal(format!(
133                                "inputs {:?} should have at least one",
134                                inputs
135                            ))
136                        })?
137                        .clone(),
138                ),
139            })),
140            LogicalPlan::Insert(Insert {
141                table,
142                table_schema,
143                projected_schema,
144                ..
145            }) => Ok(LogicalPlan::Insert(Insert {
146                table: table.clone(),
147                table_schema: table_schema.clone(),
148                projected_schema: projected_schema.clone(),
149                input: Arc::new(
150                    inputs
151                        .first()
152                        .ok_or_else(|| {
153                            QuillSQLError::Internal(format!(
154                                "inputs {:?} should have at least one",
155                                inputs
156                            ))
157                        })?
158                        .clone(),
159                ),
160            })),
161            LogicalPlan::Join(Join {
162                join_type,
163                condition,
164                schema,
165                ..
166            }) => Ok(LogicalPlan::Join(Join {
167                left: Arc::new(
168                    inputs
169                        .first()
170                        .ok_or_else(|| {
171                            QuillSQLError::Internal(format!(
172                                "inputs {:?} should have at least two",
173                                inputs
174                            ))
175                        })?
176                        .clone(),
177                ),
178                right: Arc::new(
179                    inputs
180                        .first()
181                        .ok_or_else(|| {
182                            QuillSQLError::Internal(format!(
183                                "inputs {:?} should have at least two",
184                                inputs
185                            ))
186                        })?
187                        .clone(),
188                ),
189                join_type: *join_type,
190                condition: condition.clone(),
191                schema: schema.clone(),
192            })),
193            LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
194                limit: *limit,
195                offset: *offset,
196                input: Arc::new(
197                    inputs
198                        .first()
199                        .ok_or_else(|| {
200                            QuillSQLError::Internal(format!(
201                                "inputs {:?} should have at least one",
202                                inputs
203                            ))
204                        })?
205                        .clone(),
206                ),
207            })),
208            LogicalPlan::Project(Project { exprs, schema, .. }) => {
209                Ok(LogicalPlan::Project(Project {
210                    exprs: exprs.clone(),
211                    schema: schema.clone(),
212                    input: Arc::new(
213                        inputs
214                            .first()
215                            .ok_or_else(|| {
216                                QuillSQLError::Internal(format!(
217                                    "inputs {:?} should have at least one",
218                                    inputs
219                                ))
220                            })?
221                            .clone(),
222                    ),
223                }))
224            }
225            LogicalPlan::Sort(Sort {
226                order_by, limit, ..
227            }) => Ok(LogicalPlan::Sort(Sort {
228                order_by: order_by.clone(),
229                limit: *limit,
230                input: Arc::new(
231                    inputs
232                        .first()
233                        .ok_or_else(|| {
234                            QuillSQLError::Internal(format!(
235                                "inputs {:?} should have at least one",
236                                inputs
237                            ))
238                        })?
239                        .clone(),
240                ),
241            })),
242            LogicalPlan::Aggregate(Aggregate {
243                group_exprs,
244                aggr_exprs,
245                schema,
246                ..
247            }) => Ok(LogicalPlan::Aggregate(Aggregate {
248                group_exprs: group_exprs.clone(),
249                aggr_exprs: aggr_exprs.clone(),
250                schema: schema.clone(),
251                input: Arc::new(
252                    inputs
253                        .first()
254                        .ok_or_else(|| {
255                            QuillSQLError::Internal(format!(
256                                "inputs {:?} should have at least one",
257                                inputs
258                            ))
259                        })?
260                        .clone(),
261                ),
262            })),
263            LogicalPlan::CreateTable(_)
264            | LogicalPlan::CreateIndex(_)
265            | LogicalPlan::DropTable(_)
266            | LogicalPlan::DropIndex(_)
267            | LogicalPlan::TableScan(_)
268            | LogicalPlan::Values(_)
269            | LogicalPlan::Update(_)
270            | LogicalPlan::Delete(_)
271            | LogicalPlan::EmptyRelation(_)
272            | LogicalPlan::BeginTransaction(_)
273            | LogicalPlan::CommitTransaction
274            | LogicalPlan::RollbackTransaction
275            | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
276        }
277    }
278}
279
280impl std::fmt::Display for LogicalPlan {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        match self {
283            LogicalPlan::CreateTable(v) => write!(f, "{v}"),
284            LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
285            LogicalPlan::DropTable(v) => write!(f, "{v}"),
286            LogicalPlan::DropIndex(v) => write!(f, "{v}"),
287            LogicalPlan::Filter(v) => write!(f, "{v}"),
288            LogicalPlan::Insert(v) => write!(f, "{v}"),
289            LogicalPlan::Join(v) => write!(f, "{v}"),
290            LogicalPlan::Limit(v) => write!(f, "{v}"),
291            LogicalPlan::Project(v) => write!(f, "{v}"),
292            LogicalPlan::TableScan(v) => write!(f, "{v}"),
293            LogicalPlan::Sort(v) => write!(f, "{v}"),
294            LogicalPlan::Values(v) => write!(f, "{v}"),
295            LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
296            LogicalPlan::Aggregate(v) => write!(f, "{v}"),
297            LogicalPlan::Update(v) => write!(f, "{v}"),
298            LogicalPlan::Delete(v) => write!(f, "{v}"),
299            LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
300            LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
301            LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
302            LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
303        }
304    }
305}
306
/// Scope selector carried by [`LogicalPlan::SetTransaction`].
///
/// NOTE(review): presumably distinguishes session-wide defaults from the
/// current transaction only; confirm against the planner/executor.
#[derive(Clone, Copy, Debug)]
pub enum TransactionScope {
    /// Session scope.
    Session,
    /// Transaction scope.
    Transaction,
}
312
313#[derive(Debug, Clone, Default)]
314pub struct TransactionModes {
315    pub isolation_level: Option<IsolationLevel>,
316    pub access_mode: Option<TransactionAccessMode>,
317}
318
319impl TransactionModes {
320    pub fn from_modes(modes: &[TransactionMode]) -> Self {
321        let mut result = TransactionModes::default();
322        for mode in modes {
323            match mode {
324                TransactionMode::IsolationLevel(level) => {
325                    let isolation = match level {
326                        sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
327                            IsolationLevel::ReadUncommitted
328                        }
329                        sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
330                            IsolationLevel::ReadCommitted
331                        }
332                        sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
333                            IsolationLevel::RepeatableRead
334                        }
335                        sqlparser::ast::TransactionIsolationLevel::Serializable => {
336                            IsolationLevel::Serializable
337                        }
338                    };
339                    result.isolation_level = Some(isolation);
340                }
341                TransactionMode::AccessMode(mode) => {
342                    result.access_mode = Some(*mode);
343                }
344            }
345        }
346        result
347    }
348
349    pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
350        self.isolation_level.unwrap_or(fallback)
351    }
352
353    pub fn unwrap_effective_access_mode(
354        &self,
355        fallback: TransactionAccessMode,
356    ) -> TransactionAccessMode {
357        self.access_mode.unwrap_or(fallback)
358    }
359}