quill_sql/plan/logical_plan/
mod.rs

1mod aggregate;
2mod create_index;
3mod create_table;
4mod delete;
5mod empty_relation;
6mod filter;
7mod insert;
8mod join;
9mod limit;
10mod project;
11mod sort;
12mod table_scan;
13mod update;
14mod util;
15mod values;
16
17pub use aggregate::Aggregate;
18pub use create_index::CreateIndex;
19pub use create_table::CreateTable;
20pub use delete::Delete;
21pub use empty_relation::EmptyRelation;
22pub use filter::Filter;
23pub use insert::Insert;
24pub use join::{Join, JoinType};
25pub use limit::Limit;
26pub use project::Project;
27pub use sort::{OrderByExpr, Sort};
28pub use table_scan::TableScan;
29pub use update::Update;
30pub use util::*;
31pub use values::Values;
32
33use crate::catalog::{
34    SchemaRef, DELETE_OUTPUT_SCHEMA_REF, EMPTY_SCHEMA_REF, INSERT_OUTPUT_SCHEMA_REF,
35    UPDATE_OUTPUT_SCHEMA_REF,
36};
37use crate::error::{QuillSQLError, QuillSQLResult};
38use crate::transaction::IsolationLevel;
39use sqlparser::ast::{TransactionAccessMode, TransactionMode};
40use std::sync::Arc;
41
42#[derive(Debug, Clone)]
43pub enum LogicalPlan {
44    CreateTable(CreateTable),
45    CreateIndex(CreateIndex),
46    Filter(Filter),
47    Insert(Insert),
48    Join(Join),
49    Limit(Limit),
50    Project(Project),
51    TableScan(TableScan),
52    Sort(Sort),
53    Values(Values),
54    EmptyRelation(EmptyRelation),
55    Aggregate(Aggregate),
56    Update(Update),
57    Delete(Delete),
58    BeginTransaction(TransactionModes),
59    CommitTransaction,
60    RollbackTransaction,
61    SetTransaction {
62        scope: TransactionScope,
63        modes: TransactionModes,
64    },
65}
66
67impl LogicalPlan {
68    pub fn schema(&self) -> &SchemaRef {
69        match self {
70            LogicalPlan::CreateTable(_) => &EMPTY_SCHEMA_REF,
71            LogicalPlan::CreateIndex(_) => &EMPTY_SCHEMA_REF,
72            LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
73            LogicalPlan::Insert(_) => &INSERT_OUTPUT_SCHEMA_REF,
74            LogicalPlan::Join(Join { schema, .. }) => schema,
75            LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
76            LogicalPlan::Project(Project { schema, .. }) => schema,
77            LogicalPlan::TableScan(TableScan { table_schema, .. }) => table_schema,
78            LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
79            LogicalPlan::Values(Values { schema, .. }) => schema,
80            LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
81            LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
82            LogicalPlan::Update(_) => &UPDATE_OUTPUT_SCHEMA_REF,
83            LogicalPlan::Delete(_) => &DELETE_OUTPUT_SCHEMA_REF,
84            LogicalPlan::BeginTransaction(_)
85            | LogicalPlan::CommitTransaction
86            | LogicalPlan::RollbackTransaction
87            | LogicalPlan::SetTransaction { .. } => &EMPTY_SCHEMA_REF,
88        }
89    }
90
91    pub fn inputs(&self) -> Vec<&LogicalPlan> {
92        match self {
93            LogicalPlan::Filter(Filter { input, .. }) => vec![input],
94            LogicalPlan::Insert(Insert { input, .. }) => vec![input],
95            LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
96            LogicalPlan::Limit(Limit { input, .. }) => vec![input],
97            LogicalPlan::Project(Project { input, .. }) => vec![input],
98            LogicalPlan::Sort(Sort { input, .. }) => vec![input],
99            LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
100            LogicalPlan::CreateTable(_)
101            | LogicalPlan::CreateIndex(_)
102            | LogicalPlan::TableScan(_)
103            | LogicalPlan::Values(_)
104            | LogicalPlan::Update(_)
105            | LogicalPlan::Delete(_)
106            | LogicalPlan::EmptyRelation(_)
107            | LogicalPlan::BeginTransaction(_)
108            | LogicalPlan::CommitTransaction
109            | LogicalPlan::RollbackTransaction
110            | LogicalPlan::SetTransaction { .. } => vec![],
111        }
112    }
113
114    pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> QuillSQLResult<LogicalPlan> {
115        match self {
116            LogicalPlan::Filter(Filter { predicate, .. }) => Ok(LogicalPlan::Filter(Filter {
117                predicate: predicate.clone(),
118                input: Arc::new(
119                    inputs
120                        .first()
121                        .ok_or_else(|| {
122                            QuillSQLError::Internal(format!(
123                                "inputs {:?} should have at least one",
124                                inputs
125                            ))
126                        })?
127                        .clone(),
128                ),
129            })),
130            LogicalPlan::Insert(Insert {
131                table,
132                table_schema,
133                projected_schema,
134                ..
135            }) => Ok(LogicalPlan::Insert(Insert {
136                table: table.clone(),
137                table_schema: table_schema.clone(),
138                projected_schema: projected_schema.clone(),
139                input: Arc::new(
140                    inputs
141                        .first()
142                        .ok_or_else(|| {
143                            QuillSQLError::Internal(format!(
144                                "inputs {:?} should have at least one",
145                                inputs
146                            ))
147                        })?
148                        .clone(),
149                ),
150            })),
151            LogicalPlan::Join(Join {
152                join_type,
153                condition,
154                schema,
155                ..
156            }) => Ok(LogicalPlan::Join(Join {
157                left: Arc::new(
158                    inputs
159                        .first()
160                        .ok_or_else(|| {
161                            QuillSQLError::Internal(format!(
162                                "inputs {:?} should have at least two",
163                                inputs
164                            ))
165                        })?
166                        .clone(),
167                ),
168                right: Arc::new(
169                    inputs
170                        .first()
171                        .ok_or_else(|| {
172                            QuillSQLError::Internal(format!(
173                                "inputs {:?} should have at least two",
174                                inputs
175                            ))
176                        })?
177                        .clone(),
178                ),
179                join_type: *join_type,
180                condition: condition.clone(),
181                schema: schema.clone(),
182            })),
183            LogicalPlan::Limit(Limit { limit, offset, .. }) => Ok(LogicalPlan::Limit(Limit {
184                limit: *limit,
185                offset: *offset,
186                input: Arc::new(
187                    inputs
188                        .first()
189                        .ok_or_else(|| {
190                            QuillSQLError::Internal(format!(
191                                "inputs {:?} should have at least one",
192                                inputs
193                            ))
194                        })?
195                        .clone(),
196                ),
197            })),
198            LogicalPlan::Project(Project { exprs, schema, .. }) => {
199                Ok(LogicalPlan::Project(Project {
200                    exprs: exprs.clone(),
201                    schema: schema.clone(),
202                    input: Arc::new(
203                        inputs
204                            .first()
205                            .ok_or_else(|| {
206                                QuillSQLError::Internal(format!(
207                                    "inputs {:?} should have at least one",
208                                    inputs
209                                ))
210                            })?
211                            .clone(),
212                    ),
213                }))
214            }
215            LogicalPlan::Sort(Sort {
216                order_by, limit, ..
217            }) => Ok(LogicalPlan::Sort(Sort {
218                order_by: order_by.clone(),
219                limit: *limit,
220                input: Arc::new(
221                    inputs
222                        .first()
223                        .ok_or_else(|| {
224                            QuillSQLError::Internal(format!(
225                                "inputs {:?} should have at least one",
226                                inputs
227                            ))
228                        })?
229                        .clone(),
230                ),
231            })),
232            LogicalPlan::Aggregate(Aggregate {
233                group_exprs,
234                aggr_exprs,
235                schema,
236                ..
237            }) => Ok(LogicalPlan::Aggregate(Aggregate {
238                group_exprs: group_exprs.clone(),
239                aggr_exprs: aggr_exprs.clone(),
240                schema: schema.clone(),
241                input: Arc::new(
242                    inputs
243                        .first()
244                        .ok_or_else(|| {
245                            QuillSQLError::Internal(format!(
246                                "inputs {:?} should have at least one",
247                                inputs
248                            ))
249                        })?
250                        .clone(),
251                ),
252            })),
253            LogicalPlan::CreateTable(_)
254            | LogicalPlan::CreateIndex(_)
255            | LogicalPlan::TableScan(_)
256            | LogicalPlan::Values(_)
257            | LogicalPlan::Update(_)
258            | LogicalPlan::Delete(_)
259            | LogicalPlan::EmptyRelation(_)
260            | LogicalPlan::BeginTransaction(_)
261            | LogicalPlan::CommitTransaction
262            | LogicalPlan::RollbackTransaction
263            | LogicalPlan::SetTransaction { .. } => Ok(self.clone()),
264        }
265    }
266}
267
268impl std::fmt::Display for LogicalPlan {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        match self {
271            LogicalPlan::CreateTable(v) => write!(f, "{v}"),
272            LogicalPlan::CreateIndex(v) => write!(f, "{v}"),
273            LogicalPlan::Filter(v) => write!(f, "{v}"),
274            LogicalPlan::Insert(v) => write!(f, "{v}"),
275            LogicalPlan::Join(v) => write!(f, "{v}"),
276            LogicalPlan::Limit(v) => write!(f, "{v}"),
277            LogicalPlan::Project(v) => write!(f, "{v}"),
278            LogicalPlan::TableScan(v) => write!(f, "{v}"),
279            LogicalPlan::Sort(v) => write!(f, "{v}"),
280            LogicalPlan::Values(v) => write!(f, "{v}"),
281            LogicalPlan::EmptyRelation(v) => write!(f, "{v}"),
282            LogicalPlan::Aggregate(v) => write!(f, "{v}"),
283            LogicalPlan::Update(v) => write!(f, "{v}"),
284            LogicalPlan::Delete(v) => write!(f, "{v}"),
285            LogicalPlan::BeginTransaction(_) => write!(f, "BeginTransaction"),
286            LogicalPlan::CommitTransaction => write!(f, "CommitTransaction"),
287            LogicalPlan::RollbackTransaction => write!(f, "RollbackTransaction"),
288            LogicalPlan::SetTransaction { .. } => write!(f, "SetTransaction"),
289        }
290    }
291}
292
/// Scope selector carried by `LogicalPlan::SetTransaction`.
#[derive(Clone, Copy, Debug)]
pub enum TransactionScope {
    /// Session-level scope.
    Session,
    /// Transaction-level scope.
    Transaction,
}
298
299#[derive(Debug, Clone, Default)]
300pub struct TransactionModes {
301    pub isolation_level: Option<IsolationLevel>,
302    pub access_mode: Option<TransactionAccessMode>,
303}
304
305impl TransactionModes {
306    pub fn from_modes(modes: &[TransactionMode]) -> Self {
307        let mut result = TransactionModes::default();
308        for mode in modes {
309            match mode {
310                TransactionMode::IsolationLevel(level) => {
311                    let isolation = match level {
312                        sqlparser::ast::TransactionIsolationLevel::ReadUncommitted => {
313                            IsolationLevel::ReadUncommitted
314                        }
315                        sqlparser::ast::TransactionIsolationLevel::ReadCommitted => {
316                            IsolationLevel::ReadCommitted
317                        }
318                        sqlparser::ast::TransactionIsolationLevel::RepeatableRead => {
319                            IsolationLevel::RepeatableRead
320                        }
321                        sqlparser::ast::TransactionIsolationLevel::Serializable => {
322                            IsolationLevel::Serializable
323                        }
324                    };
325                    result.isolation_level = Some(isolation);
326                }
327                TransactionMode::AccessMode(mode) => {
328                    result.access_mode = Some(*mode);
329                }
330            }
331        }
332        result
333    }
334
335    pub fn unwrap_effective_isolation(&self, fallback: IsolationLevel) -> IsolationLevel {
336        self.isolation_level.unwrap_or(fallback)
337    }
338
339    pub fn unwrap_effective_access_mode(
340        &self,
341        fallback: TransactionAccessMode,
342    ) -> TransactionAccessMode {
343        self.access_mode.unwrap_or(fallback)
344    }
345}