quill_sql/execution/physical_plan/
mod.rs

1use std::sync::OnceLock;
2
3use crate::storage::engine::TableBinding;
4use crate::utils::table_ref::TableReference;
5
6mod aggregate;
7mod analyze;
8mod create_index;
9mod create_table;
10mod delete;
11mod drop_index;
12mod drop_table;
13mod empty;
14mod filter;
15mod index_scan;
16mod insert;
17mod limit;
18mod nested_loop_join;
19mod project;
20mod scan;
21mod seq_scan;
22mod sort;
23mod update;
24mod values;
25
26pub use aggregate::PhysicalAggregate;
27pub use analyze::PhysicalAnalyze;
28pub use create_index::PhysicalCreateIndex;
29pub use create_table::PhysicalCreateTable;
30pub use delete::PhysicalDelete;
31pub use drop_index::PhysicalDropIndex;
32pub use drop_table::PhysicalDropTable;
33pub use empty::PhysicalEmpty;
34pub use filter::PhysicalFilter;
35pub use index_scan::PhysicalIndexScan;
36pub use insert::PhysicalInsert;
37pub use limit::PhysicalLimit;
38pub use nested_loop_join::PhysicalNestedLoopJoin;
39pub use project::PhysicalProject;
40pub use seq_scan::PhysicalSeqScan;
41pub use sort::PhysicalSort;
42pub use update::PhysicalUpdate;
43pub use values::PhysicalValues;
44
45use crate::catalog::SchemaRef;
46use crate::{
47    error::QuillSQLResult,
48    execution::{ExecutionContext, VolcanoExecutor},
49    storage::tuple::Tuple,
50};
51
52#[derive(Debug)]
53pub enum PhysicalPlan {
54    Empty(PhysicalEmpty),
55    Values(PhysicalValues),
56    SeqScan(PhysicalSeqScan),
57    IndexScan(PhysicalIndexScan),
58    Limit(PhysicalLimit),
59    Sort(PhysicalSort),
60    Update(PhysicalUpdate),
61    Delete(PhysicalDelete),
62    Insert(PhysicalInsert),
63    Project(PhysicalProject),
64    Filter(PhysicalFilter),
65    NestedLoopJoin(PhysicalNestedLoopJoin),
66    Aggregate(PhysicalAggregate),
67    Analyze(PhysicalAnalyze),
68    CreateTable(PhysicalCreateTable),
69    CreateIndex(PhysicalCreateIndex),
70    DropTable(PhysicalDropTable),
71    DropIndex(PhysicalDropIndex),
72}
73
74impl PhysicalPlan {
75    pub fn inputs(&self) -> Vec<&PhysicalPlan> {
76        match self {
77            PhysicalPlan::Project(PhysicalProject { input, .. }) => vec![input],
78            PhysicalPlan::Filter(PhysicalFilter { input, .. }) => vec![input],
79            PhysicalPlan::Limit(PhysicalLimit { input, .. }) => vec![input],
80            PhysicalPlan::Insert(PhysicalInsert { input, .. }) => vec![input],
81            PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin {
82                left_input,
83                right_input,
84                ..
85            }) => vec![left_input, right_input],
86            PhysicalPlan::Sort(PhysicalSort { input, .. }) => vec![input],
87            PhysicalPlan::Aggregate(PhysicalAggregate { input, .. }) => vec![input],
88            PhysicalPlan::Empty(_)
89            | PhysicalPlan::CreateTable(_)
90            | PhysicalPlan::CreateIndex(_)
91            | PhysicalPlan::DropTable(_)
92            | PhysicalPlan::DropIndex(_)
93            | PhysicalPlan::SeqScan(_)
94            | PhysicalPlan::IndexScan(_)
95            | PhysicalPlan::Update(_)
96            | PhysicalPlan::Delete(_)
97            | PhysicalPlan::Values(_)
98            | PhysicalPlan::Analyze(_) => vec![],
99        }
100    }
101
102    pub fn display_name(&self) -> String {
103        match self {
104            PhysicalPlan::Empty(_) => "Empty".to_string(),
105            PhysicalPlan::Values(_) => "Values".to_string(),
106            PhysicalPlan::SeqScan(_) => "SeqScan".to_string(),
107            PhysicalPlan::IndexScan(_) => "IndexScan".to_string(),
108            PhysicalPlan::Limit(_) => "Limit".to_string(),
109            PhysicalPlan::Sort(_) => "Sort".to_string(),
110            PhysicalPlan::Update(_) => "Update".to_string(),
111            PhysicalPlan::Delete(_) => "Delete".to_string(),
112            PhysicalPlan::Insert(_) => "Insert".to_string(),
113            PhysicalPlan::Project(_) => "Project".to_string(),
114            PhysicalPlan::Filter(_) => "Filter".to_string(),
115            PhysicalPlan::NestedLoopJoin(_) => "NestedLoopJoin".to_string(),
116            PhysicalPlan::Aggregate(_) => "Aggregate".to_string(),
117            PhysicalPlan::Analyze(_) => "Analyze".to_string(),
118            PhysicalPlan::CreateTable(_) => "CreateTable".to_string(),
119            PhysicalPlan::CreateIndex(_) => "CreateIndex".to_string(),
120            PhysicalPlan::DropTable(_) => "DropTable".to_string(),
121            PhysicalPlan::DropIndex(_) => "DropIndex".to_string(),
122        }
123    }
124}
125
126impl VolcanoExecutor for PhysicalPlan {
127    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
128        match self {
129            PhysicalPlan::Empty(op) => op.init(context),
130            PhysicalPlan::CreateTable(op) => op.init(context),
131            PhysicalPlan::CreateIndex(op) => op.init(context),
132            PhysicalPlan::DropTable(op) => op.init(context),
133            PhysicalPlan::DropIndex(op) => op.init(context),
134            PhysicalPlan::Insert(op) => op.init(context),
135            PhysicalPlan::Values(op) => op.init(context),
136            PhysicalPlan::Project(op) => op.init(context),
137            PhysicalPlan::Filter(op) => op.init(context),
138            PhysicalPlan::SeqScan(op) => op.init(context),
139            PhysicalPlan::IndexScan(op) => op.init(context),
140            PhysicalPlan::Limit(op) => op.init(context),
141            PhysicalPlan::NestedLoopJoin(op) => op.init(context),
142            PhysicalPlan::Sort(op) => op.init(context),
143            PhysicalPlan::Aggregate(op) => op.init(context),
144            PhysicalPlan::Update(op) => op.init(context),
145            PhysicalPlan::Delete(op) => op.init(context),
146            PhysicalPlan::Analyze(op) => op.init(context),
147        }
148    }
149
150    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
151        match self {
152            PhysicalPlan::Empty(op) => op.next(context),
153            PhysicalPlan::CreateTable(op) => op.next(context),
154            PhysicalPlan::CreateIndex(op) => op.next(context),
155            PhysicalPlan::DropTable(op) => op.next(context),
156            PhysicalPlan::DropIndex(op) => op.next(context),
157            PhysicalPlan::Insert(op) => op.next(context),
158            PhysicalPlan::Values(op) => op.next(context),
159            PhysicalPlan::Project(op) => op.next(context),
160            PhysicalPlan::Filter(op) => op.next(context),
161            PhysicalPlan::SeqScan(op) => op.next(context),
162            PhysicalPlan::IndexScan(op) => op.next(context),
163            PhysicalPlan::Limit(op) => op.next(context),
164            PhysicalPlan::NestedLoopJoin(op) => op.next(context),
165            PhysicalPlan::Sort(op) => op.next(context),
166            PhysicalPlan::Aggregate(op) => op.next(context),
167            PhysicalPlan::Update(op) => op.next(context),
168            PhysicalPlan::Delete(op) => op.next(context),
169            PhysicalPlan::Analyze(op) => op.next(context),
170        }
171    }
172
173    fn output_schema(&self) -> SchemaRef {
174        match self {
175            Self::Empty(op) => op.output_schema(),
176            Self::CreateTable(op) => op.output_schema(),
177            Self::CreateIndex(op) => op.output_schema(),
178            Self::DropTable(op) => op.output_schema(),
179            Self::DropIndex(op) => op.output_schema(),
180            Self::Insert(op) => op.output_schema(),
181            Self::Values(op) => op.output_schema(),
182            Self::Project(op) => op.output_schema(),
183            Self::Filter(op) => op.output_schema(),
184            Self::SeqScan(op) => op.output_schema(),
185            Self::IndexScan(op) => op.output_schema(),
186            Self::Limit(op) => op.output_schema(),
187            Self::NestedLoopJoin(op) => op.output_schema(),
188            Self::Sort(op) => op.output_schema(),
189            Self::Aggregate(op) => op.output_schema(),
190            Self::Update(op) => op.output_schema(),
191            Self::Delete(op) => op.output_schema(),
192            Self::Analyze(op) => op.output_schema(),
193        }
194    }
195}
196
197impl std::fmt::Display for PhysicalPlan {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        match self {
200            Self::Empty(op) => write!(f, "{op}"),
201            Self::CreateTable(op) => write!(f, "{op}"),
202            Self::CreateIndex(op) => write!(f, "{op}"),
203            Self::DropTable(op) => write!(f, "{op}"),
204            Self::DropIndex(op) => write!(f, "{op}"),
205            Self::Insert(op) => write!(f, "{op}"),
206            Self::Values(op) => write!(f, "{op}"),
207            Self::Project(op) => write!(f, "{op}"),
208            Self::Filter(op) => write!(f, "{op}"),
209            Self::SeqScan(op) => write!(f, "{op}"),
210            Self::IndexScan(op) => write!(f, "{op}"),
211            Self::Limit(op) => write!(f, "{op}"),
212            Self::NestedLoopJoin(op) => write!(f, "{op}"),
213            Self::Sort(op) => write!(f, "{op}"),
214            Self::Aggregate(op) => write!(f, "{op}"),
215            Self::Update(op) => write!(f, "{op}"),
216            Self::Delete(op) => write!(f, "{op}"),
217            Self::Analyze(op) => write!(f, "{op}"),
218        }
219    }
220}
221
222pub(crate) fn resolve_table_binding<'a>(
223    cache: &'a OnceLock<TableBinding>,
224    context: &mut ExecutionContext,
225    table: &TableReference,
226) -> QuillSQLResult<&'a TableBinding> {
227    if cache.get().is_none() {
228        let binding = context.table(table)?;
229        let _ = cache.set(binding);
230    }
231    Ok(cache.get().expect("table binding not initialized"))
232}
233
234pub(crate) fn stream_not_ready(op: &str) -> crate::error::QuillSQLError {
235    crate::error::QuillSQLError::Execution(format!("{op} stream not initialized"))
236}