1use crate::ast::JoinType;
4use crate::ast::{AggregateFunc, Expr, SortOrder};
5use alloc::boxed::Box;
6use alloc::string::String;
7use alloc::vec::Vec;
8use cynos_core::Value;
9
10#[derive(Clone, Copy, Debug, PartialEq, Eq)]
12pub enum JoinAlgorithm {
13 Hash,
15 SortMerge,
17 NestedLoop,
19 IndexNestedLoop,
21}
22
23#[derive(Clone, Debug)]
25pub enum PhysicalPlan {
26 TableScan { table: String },
28
29 IndexScan {
31 table: String,
32 index: String,
33 range_start: Option<Value>,
34 range_end: Option<Value>,
35 include_start: bool,
36 include_end: bool,
37 limit: Option<usize>,
39 offset: Option<usize>,
41 reverse: bool,
43 },
44
45 IndexGet {
47 table: String,
48 index: String,
49 key: Value,
50 limit: Option<usize>,
52 },
53
54 IndexInGet {
57 table: String,
58 index: String,
59 keys: Vec<Value>,
60 },
61
62 GinIndexScan {
64 table: String,
65 index: String,
66 key: String,
68 value: Option<String>,
70 query_type: String,
72 },
73
74 GinIndexScanMulti {
77 table: String,
78 index: String,
79 pairs: Vec<(String, String)>,
81 },
82
83 Filter {
85 input: Box<PhysicalPlan>,
86 predicate: Expr,
87 },
88
89 Project {
91 input: Box<PhysicalPlan>,
92 columns: Vec<Expr>,
93 },
94
95 HashJoin {
97 left: Box<PhysicalPlan>,
98 right: Box<PhysicalPlan>,
99 condition: Expr,
100 join_type: JoinType,
101 },
102
103 SortMergeJoin {
105 left: Box<PhysicalPlan>,
106 right: Box<PhysicalPlan>,
107 condition: Expr,
108 join_type: JoinType,
109 },
110
111 NestedLoopJoin {
113 left: Box<PhysicalPlan>,
114 right: Box<PhysicalPlan>,
115 condition: Expr,
116 join_type: JoinType,
117 },
118
119 IndexNestedLoopJoin {
121 outer: Box<PhysicalPlan>,
122 inner_table: String,
123 inner_index: String,
124 condition: Expr,
125 join_type: JoinType,
126 },
127
128 HashAggregate {
130 input: Box<PhysicalPlan>,
131 group_by: Vec<Expr>,
132 aggregates: Vec<(AggregateFunc, Expr)>,
133 },
134
135 Sort {
137 input: Box<PhysicalPlan>,
138 order_by: Vec<(Expr, SortOrder)>,
139 },
140
141 TopN {
144 input: Box<PhysicalPlan>,
145 order_by: Vec<(Expr, SortOrder)>,
146 limit: usize,
147 offset: usize,
148 },
149
150 Limit {
152 input: Box<PhysicalPlan>,
153 limit: usize,
154 offset: usize,
155 },
156
157 CrossProduct {
159 left: Box<PhysicalPlan>,
160 right: Box<PhysicalPlan>,
161 },
162
163 NoOp { input: Box<PhysicalPlan> },
165
166 Empty,
168}
169
170impl PhysicalPlan {
171 pub fn table_scan(table: impl Into<String>) -> Self {
173 PhysicalPlan::TableScan {
174 table: table.into(),
175 }
176 }
177
178 pub fn index_scan(
180 table: impl Into<String>,
181 index: impl Into<String>,
182 range_start: Option<Value>,
183 range_end: Option<Value>,
184 ) -> Self {
185 PhysicalPlan::IndexScan {
186 table: table.into(),
187 index: index.into(),
188 range_start,
189 range_end,
190 include_start: true,
191 include_end: true,
192 limit: None,
193 offset: None,
194 reverse: false,
195 }
196 }
197
198 pub fn index_scan_with_limit(
200 table: impl Into<String>,
201 index: impl Into<String>,
202 range_start: Option<Value>,
203 range_end: Option<Value>,
204 limit: Option<usize>,
205 offset: Option<usize>,
206 ) -> Self {
207 PhysicalPlan::IndexScan {
208 table: table.into(),
209 index: index.into(),
210 range_start,
211 range_end,
212 include_start: true,
213 include_end: true,
214 limit,
215 offset,
216 reverse: false,
217 }
218 }
219
220 pub fn index_scan_with_options(
222 table: impl Into<String>,
223 index: impl Into<String>,
224 range_start: Option<Value>,
225 range_end: Option<Value>,
226 limit: Option<usize>,
227 offset: Option<usize>,
228 reverse: bool,
229 ) -> Self {
230 PhysicalPlan::IndexScan {
231 table: table.into(),
232 index: index.into(),
233 range_start,
234 range_end,
235 include_start: true,
236 include_end: true,
237 limit,
238 offset,
239 reverse,
240 }
241 }
242
243 pub fn index_get(table: impl Into<String>, index: impl Into<String>, key: Value) -> Self {
245 PhysicalPlan::IndexGet {
246 table: table.into(),
247 index: index.into(),
248 key,
249 limit: None,
250 }
251 }
252
253 pub fn index_get_with_limit(
255 table: impl Into<String>,
256 index: impl Into<String>,
257 key: Value,
258 limit: Option<usize>,
259 ) -> Self {
260 PhysicalPlan::IndexGet {
261 table: table.into(),
262 index: index.into(),
263 key,
264 limit,
265 }
266 }
267
268 pub fn index_in_get(
270 table: impl Into<String>,
271 index: impl Into<String>,
272 keys: Vec<Value>,
273 ) -> Self {
274 PhysicalPlan::IndexInGet {
275 table: table.into(),
276 index: index.into(),
277 keys,
278 }
279 }
280
281 pub fn gin_index_scan(
283 table: impl Into<String>,
284 index: impl Into<String>,
285 key: impl Into<String>,
286 value: Option<String>,
287 query_type: impl Into<String>,
288 ) -> Self {
289 PhysicalPlan::GinIndexScan {
290 table: table.into(),
291 index: index.into(),
292 key: key.into(),
293 value,
294 query_type: query_type.into(),
295 }
296 }
297
298 pub fn gin_index_scan_multi(
300 table: impl Into<String>,
301 index: impl Into<String>,
302 pairs: Vec<(String, String)>,
303 ) -> Self {
304 PhysicalPlan::GinIndexScanMulti {
305 table: table.into(),
306 index: index.into(),
307 pairs,
308 }
309 }
310
311 pub fn filter(input: PhysicalPlan, predicate: Expr) -> Self {
313 PhysicalPlan::Filter {
314 input: Box::new(input),
315 predicate,
316 }
317 }
318
319 pub fn project(input: PhysicalPlan, columns: Vec<Expr>) -> Self {
321 PhysicalPlan::Project {
322 input: Box::new(input),
323 columns,
324 }
325 }
326
327 pub fn hash_join(
329 left: PhysicalPlan,
330 right: PhysicalPlan,
331 condition: Expr,
332 join_type: JoinType,
333 ) -> Self {
334 PhysicalPlan::HashJoin {
335 left: Box::new(left),
336 right: Box::new(right),
337 condition,
338 join_type,
339 }
340 }
341
342 pub fn sort_merge_join(
344 left: PhysicalPlan,
345 right: PhysicalPlan,
346 condition: Expr,
347 join_type: JoinType,
348 ) -> Self {
349 PhysicalPlan::SortMergeJoin {
350 left: Box::new(left),
351 right: Box::new(right),
352 condition,
353 join_type,
354 }
355 }
356
357 pub fn nested_loop_join(
359 left: PhysicalPlan,
360 right: PhysicalPlan,
361 condition: Expr,
362 join_type: JoinType,
363 ) -> Self {
364 PhysicalPlan::NestedLoopJoin {
365 left: Box::new(left),
366 right: Box::new(right),
367 condition,
368 join_type,
369 }
370 }
371
372 pub fn hash_aggregate(
374 input: PhysicalPlan,
375 group_by: Vec<Expr>,
376 aggregates: Vec<(AggregateFunc, Expr)>,
377 ) -> Self {
378 PhysicalPlan::HashAggregate {
379 input: Box::new(input),
380 group_by,
381 aggregates,
382 }
383 }
384
385 pub fn sort(input: PhysicalPlan, order_by: Vec<(Expr, SortOrder)>) -> Self {
387 PhysicalPlan::Sort {
388 input: Box::new(input),
389 order_by,
390 }
391 }
392
393 pub fn top_n(
395 input: PhysicalPlan,
396 order_by: Vec<(Expr, SortOrder)>,
397 limit: usize,
398 offset: usize,
399 ) -> Self {
400 PhysicalPlan::TopN {
401 input: Box::new(input),
402 order_by,
403 limit,
404 offset,
405 }
406 }
407
408 pub fn limit(input: PhysicalPlan, limit: usize, offset: usize) -> Self {
410 PhysicalPlan::Limit {
411 input: Box::new(input),
412 limit,
413 offset,
414 }
415 }
416
417 pub fn is_incrementalizable(&self) -> bool {
419 match self {
420 PhysicalPlan::TableScan { .. }
421 | PhysicalPlan::IndexScan { .. }
422 | PhysicalPlan::IndexGet { .. }
423 | PhysicalPlan::IndexInGet { .. }
424 | PhysicalPlan::Filter { .. }
425 | PhysicalPlan::Project { .. }
426 | PhysicalPlan::HashJoin { .. }
427 | PhysicalPlan::HashAggregate { .. } => true,
428 PhysicalPlan::Sort { .. } | PhysicalPlan::Limit { .. } | PhysicalPlan::TopN { .. } => {
429 false
430 }
431 PhysicalPlan::SortMergeJoin { .. }
432 | PhysicalPlan::NestedLoopJoin { .. }
433 | PhysicalPlan::IndexNestedLoopJoin { .. } => true,
434 PhysicalPlan::CrossProduct { .. } => true,
435 PhysicalPlan::NoOp { input } => input.is_incrementalizable(),
436 PhysicalPlan::Empty => true,
437 PhysicalPlan::GinIndexScan { .. } | PhysicalPlan::GinIndexScanMulti { .. } => true,
438 }
439 }
440
441 pub fn inputs(&self) -> Vec<&PhysicalPlan> {
443 match self {
444 PhysicalPlan::TableScan { .. }
445 | PhysicalPlan::IndexScan { .. }
446 | PhysicalPlan::IndexGet { .. }
447 | PhysicalPlan::IndexInGet { .. }
448 | PhysicalPlan::GinIndexScan { .. }
449 | PhysicalPlan::GinIndexScanMulti { .. }
450 | PhysicalPlan::Empty => alloc::vec![],
451 PhysicalPlan::Filter { input, .. }
452 | PhysicalPlan::Project { input, .. }
453 | PhysicalPlan::HashAggregate { input, .. }
454 | PhysicalPlan::Sort { input, .. }
455 | PhysicalPlan::TopN { input, .. }
456 | PhysicalPlan::Limit { input, .. }
457 | PhysicalPlan::NoOp { input } => alloc::vec![input.as_ref()],
458 PhysicalPlan::HashJoin { left, right, .. }
459 | PhysicalPlan::SortMergeJoin { left, right, .. }
460 | PhysicalPlan::NestedLoopJoin { left, right, .. }
461 | PhysicalPlan::CrossProduct { left, right } => {
462 alloc::vec![left.as_ref(), right.as_ref()]
463 }
464 PhysicalPlan::IndexNestedLoopJoin { outer, .. } => alloc::vec![outer.as_ref()],
465 }
466 }
467}
468
469#[cfg(test)]
470mod tests {
471 use super::*;
472 use crate::ast::Expr;
473
474 #[test]
475 fn test_physical_plan_builders() {
476 let scan = PhysicalPlan::table_scan("users");
477 assert!(matches!(scan, PhysicalPlan::TableScan { table } if table == "users"));
478
479 let index_scan = PhysicalPlan::index_scan(
480 "users",
481 "idx_id",
482 Some(Value::Int64(1)),
483 Some(Value::Int64(100)),
484 );
485 assert!(matches!(index_scan, PhysicalPlan::IndexScan { .. }));
486 }
487
488 #[test]
489 fn test_is_incrementalizable() {
490 let scan = PhysicalPlan::table_scan("users");
491 assert!(scan.is_incrementalizable());
492
493 let sort = PhysicalPlan::sort(
494 PhysicalPlan::table_scan("users"),
495 alloc::vec![(Expr::column("users", "id", 0), SortOrder::Asc)],
496 );
497 assert!(!sort.is_incrementalizable());
498
499 let limit = PhysicalPlan::limit(PhysicalPlan::table_scan("users"), 10, 0);
500 assert!(!limit.is_incrementalizable());
501 }
502}