quill_sql/execution/physical_plan/
mod.rs1use std::sync::OnceLock;
2
3use crate::storage::engine::TableBinding;
4use crate::utils::table_ref::TableReference;
5
6mod aggregate;
7mod analyze;
8mod create_index;
9mod create_table;
10mod delete;
11mod drop_index;
12mod drop_table;
13mod empty;
14mod filter;
15mod index_scan;
16mod insert;
17mod limit;
18mod nested_loop_join;
19mod project;
20mod scan;
21mod seq_scan;
22mod sort;
23mod update;
24mod values;
25
26pub use aggregate::PhysicalAggregate;
27pub use analyze::PhysicalAnalyze;
28pub use create_index::PhysicalCreateIndex;
29pub use create_table::PhysicalCreateTable;
30pub use delete::PhysicalDelete;
31pub use drop_index::PhysicalDropIndex;
32pub use drop_table::PhysicalDropTable;
33pub use empty::PhysicalEmpty;
34pub use filter::PhysicalFilter;
35pub use index_scan::PhysicalIndexScan;
36pub use insert::PhysicalInsert;
37pub use limit::PhysicalLimit;
38pub use nested_loop_join::PhysicalNestedLoopJoin;
39pub use project::PhysicalProject;
40pub use seq_scan::PhysicalSeqScan;
41pub use sort::PhysicalSort;
42pub use update::PhysicalUpdate;
43pub use values::PhysicalValues;
44
45use crate::catalog::SchemaRef;
46use crate::{
47 error::QuillSQLResult,
48 execution::{ExecutionContext, VolcanoExecutor},
49 storage::tuple::Tuple,
50};
51
52#[derive(Debug)]
53pub enum PhysicalPlan {
54 Empty(PhysicalEmpty),
55 Values(PhysicalValues),
56 SeqScan(PhysicalSeqScan),
57 IndexScan(PhysicalIndexScan),
58 Limit(PhysicalLimit),
59 Sort(PhysicalSort),
60 Update(PhysicalUpdate),
61 Delete(PhysicalDelete),
62 Insert(PhysicalInsert),
63 Project(PhysicalProject),
64 Filter(PhysicalFilter),
65 NestedLoopJoin(PhysicalNestedLoopJoin),
66 Aggregate(PhysicalAggregate),
67 Analyze(PhysicalAnalyze),
68 CreateTable(PhysicalCreateTable),
69 CreateIndex(PhysicalCreateIndex),
70 DropTable(PhysicalDropTable),
71 DropIndex(PhysicalDropIndex),
72}
73
74impl PhysicalPlan {
75 pub fn inputs(&self) -> Vec<&PhysicalPlan> {
76 match self {
77 PhysicalPlan::Project(PhysicalProject { input, .. }) => vec![input],
78 PhysicalPlan::Filter(PhysicalFilter { input, .. }) => vec![input],
79 PhysicalPlan::Limit(PhysicalLimit { input, .. }) => vec![input],
80 PhysicalPlan::Insert(PhysicalInsert { input, .. }) => vec![input],
81 PhysicalPlan::NestedLoopJoin(PhysicalNestedLoopJoin {
82 left_input,
83 right_input,
84 ..
85 }) => vec![left_input, right_input],
86 PhysicalPlan::Sort(PhysicalSort { input, .. }) => vec![input],
87 PhysicalPlan::Aggregate(PhysicalAggregate { input, .. }) => vec![input],
88 PhysicalPlan::Empty(_)
89 | PhysicalPlan::CreateTable(_)
90 | PhysicalPlan::CreateIndex(_)
91 | PhysicalPlan::DropTable(_)
92 | PhysicalPlan::DropIndex(_)
93 | PhysicalPlan::SeqScan(_)
94 | PhysicalPlan::IndexScan(_)
95 | PhysicalPlan::Update(_)
96 | PhysicalPlan::Delete(_)
97 | PhysicalPlan::Values(_)
98 | PhysicalPlan::Analyze(_) => vec![],
99 }
100 }
101
102 pub fn display_name(&self) -> String {
103 match self {
104 PhysicalPlan::Empty(_) => "Empty".to_string(),
105 PhysicalPlan::Values(_) => "Values".to_string(),
106 PhysicalPlan::SeqScan(_) => "SeqScan".to_string(),
107 PhysicalPlan::IndexScan(_) => "IndexScan".to_string(),
108 PhysicalPlan::Limit(_) => "Limit".to_string(),
109 PhysicalPlan::Sort(_) => "Sort".to_string(),
110 PhysicalPlan::Update(_) => "Update".to_string(),
111 PhysicalPlan::Delete(_) => "Delete".to_string(),
112 PhysicalPlan::Insert(_) => "Insert".to_string(),
113 PhysicalPlan::Project(_) => "Project".to_string(),
114 PhysicalPlan::Filter(_) => "Filter".to_string(),
115 PhysicalPlan::NestedLoopJoin(_) => "NestedLoopJoin".to_string(),
116 PhysicalPlan::Aggregate(_) => "Aggregate".to_string(),
117 PhysicalPlan::Analyze(_) => "Analyze".to_string(),
118 PhysicalPlan::CreateTable(_) => "CreateTable".to_string(),
119 PhysicalPlan::CreateIndex(_) => "CreateIndex".to_string(),
120 PhysicalPlan::DropTable(_) => "DropTable".to_string(),
121 PhysicalPlan::DropIndex(_) => "DropIndex".to_string(),
122 }
123 }
124}
125
126impl VolcanoExecutor for PhysicalPlan {
127 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
128 match self {
129 PhysicalPlan::Empty(op) => op.init(context),
130 PhysicalPlan::CreateTable(op) => op.init(context),
131 PhysicalPlan::CreateIndex(op) => op.init(context),
132 PhysicalPlan::DropTable(op) => op.init(context),
133 PhysicalPlan::DropIndex(op) => op.init(context),
134 PhysicalPlan::Insert(op) => op.init(context),
135 PhysicalPlan::Values(op) => op.init(context),
136 PhysicalPlan::Project(op) => op.init(context),
137 PhysicalPlan::Filter(op) => op.init(context),
138 PhysicalPlan::SeqScan(op) => op.init(context),
139 PhysicalPlan::IndexScan(op) => op.init(context),
140 PhysicalPlan::Limit(op) => op.init(context),
141 PhysicalPlan::NestedLoopJoin(op) => op.init(context),
142 PhysicalPlan::Sort(op) => op.init(context),
143 PhysicalPlan::Aggregate(op) => op.init(context),
144 PhysicalPlan::Update(op) => op.init(context),
145 PhysicalPlan::Delete(op) => op.init(context),
146 PhysicalPlan::Analyze(op) => op.init(context),
147 }
148 }
149
150 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
151 match self {
152 PhysicalPlan::Empty(op) => op.next(context),
153 PhysicalPlan::CreateTable(op) => op.next(context),
154 PhysicalPlan::CreateIndex(op) => op.next(context),
155 PhysicalPlan::DropTable(op) => op.next(context),
156 PhysicalPlan::DropIndex(op) => op.next(context),
157 PhysicalPlan::Insert(op) => op.next(context),
158 PhysicalPlan::Values(op) => op.next(context),
159 PhysicalPlan::Project(op) => op.next(context),
160 PhysicalPlan::Filter(op) => op.next(context),
161 PhysicalPlan::SeqScan(op) => op.next(context),
162 PhysicalPlan::IndexScan(op) => op.next(context),
163 PhysicalPlan::Limit(op) => op.next(context),
164 PhysicalPlan::NestedLoopJoin(op) => op.next(context),
165 PhysicalPlan::Sort(op) => op.next(context),
166 PhysicalPlan::Aggregate(op) => op.next(context),
167 PhysicalPlan::Update(op) => op.next(context),
168 PhysicalPlan::Delete(op) => op.next(context),
169 PhysicalPlan::Analyze(op) => op.next(context),
170 }
171 }
172
173 fn output_schema(&self) -> SchemaRef {
174 match self {
175 Self::Empty(op) => op.output_schema(),
176 Self::CreateTable(op) => op.output_schema(),
177 Self::CreateIndex(op) => op.output_schema(),
178 Self::DropTable(op) => op.output_schema(),
179 Self::DropIndex(op) => op.output_schema(),
180 Self::Insert(op) => op.output_schema(),
181 Self::Values(op) => op.output_schema(),
182 Self::Project(op) => op.output_schema(),
183 Self::Filter(op) => op.output_schema(),
184 Self::SeqScan(op) => op.output_schema(),
185 Self::IndexScan(op) => op.output_schema(),
186 Self::Limit(op) => op.output_schema(),
187 Self::NestedLoopJoin(op) => op.output_schema(),
188 Self::Sort(op) => op.output_schema(),
189 Self::Aggregate(op) => op.output_schema(),
190 Self::Update(op) => op.output_schema(),
191 Self::Delete(op) => op.output_schema(),
192 Self::Analyze(op) => op.output_schema(),
193 }
194 }
195}
196
197impl std::fmt::Display for PhysicalPlan {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 match self {
200 Self::Empty(op) => write!(f, "{op}"),
201 Self::CreateTable(op) => write!(f, "{op}"),
202 Self::CreateIndex(op) => write!(f, "{op}"),
203 Self::DropTable(op) => write!(f, "{op}"),
204 Self::DropIndex(op) => write!(f, "{op}"),
205 Self::Insert(op) => write!(f, "{op}"),
206 Self::Values(op) => write!(f, "{op}"),
207 Self::Project(op) => write!(f, "{op}"),
208 Self::Filter(op) => write!(f, "{op}"),
209 Self::SeqScan(op) => write!(f, "{op}"),
210 Self::IndexScan(op) => write!(f, "{op}"),
211 Self::Limit(op) => write!(f, "{op}"),
212 Self::NestedLoopJoin(op) => write!(f, "{op}"),
213 Self::Sort(op) => write!(f, "{op}"),
214 Self::Aggregate(op) => write!(f, "{op}"),
215 Self::Update(op) => write!(f, "{op}"),
216 Self::Delete(op) => write!(f, "{op}"),
217 Self::Analyze(op) => write!(f, "{op}"),
218 }
219 }
220}
221
222pub(crate) fn resolve_table_binding<'a>(
223 cache: &'a OnceLock<TableBinding>,
224 context: &mut ExecutionContext,
225 table: &TableReference,
226) -> QuillSQLResult<&'a TableBinding> {
227 if cache.get().is_none() {
228 let binding = context.table(table)?;
229 let _ = cache.set(binding);
230 }
231 Ok(cache.get().expect("table binding not initialized"))
232}
233
234pub(crate) fn stream_not_ready(op: &str) -> crate::error::QuillSQLError {
235 crate::error::QuillSQLError::Execution(format!("{op} stream not initialized"))
236}