1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3 AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4 SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9 CreateTable(CreateTablePlan),
10 AlterTable(AlterTablePlan),
11 DropTable(String),
12 CreateIndex(CreateIndexPlan),
13 DropIndex(DropIndexPlan),
14 Insert(InsertPlan),
15 Delete(DeletePlan),
16 Update(UpdatePlan),
17 Scan {
18 table: String,
19 alias: Option<String>,
20 filter: Option<Expr>,
21 },
22 IndexScan {
23 table: String,
24 index: String,
25 range: ScanRange,
26 },
27 GraphBfs {
28 start_alias: String,
29 start_expr: Expr,
30 start_candidates: Option<Box<PhysicalPlan>>,
31 steps: Vec<GraphStepPlan>,
32 filter: Option<Expr>,
33 },
34 VectorSearch {
35 table: String,
36 column: String,
37 query_expr: Expr,
38 k: u64,
39 candidates: Option<Box<PhysicalPlan>>,
40 sort_key: Option<String>,
41 },
42 HnswSearch {
43 table: String,
44 column: String,
45 query_expr: Expr,
46 k: u64,
47 candidates: Option<Box<PhysicalPlan>>,
48 sort_key: Option<String>,
49 },
50 Filter {
51 input: Box<PhysicalPlan>,
52 predicate: Expr,
53 },
54 Project {
55 input: Box<PhysicalPlan>,
56 columns: Vec<ProjectColumn>,
57 },
58 Distinct {
59 input: Box<PhysicalPlan>,
60 },
61 Join {
62 left: Box<PhysicalPlan>,
63 right: Box<PhysicalPlan>,
64 condition: Expr,
65 join_type: JoinType,
66 left_alias: Option<String>,
67 right_alias: Option<String>,
68 },
69 Sort {
70 input: Box<PhysicalPlan>,
71 keys: Vec<SortKey>,
72 },
73 Limit {
74 input: Box<PhysicalPlan>,
75 count: u64,
76 },
77 MaterializeCte {
78 name: String,
79 input: Box<PhysicalPlan>,
80 },
81 CteRef {
82 name: String,
83 },
84 Union {
85 inputs: Vec<PhysicalPlan>,
86 all: bool,
87 },
88 Pipeline(Vec<PhysicalPlan>),
89 SetMemoryLimit(SetMemoryLimitValue),
90 ShowMemoryLimit,
91 SetDiskLimit(SetDiskLimitValue),
92 ShowDiskLimit,
93 SetSyncConflictPolicy(String),
94 ShowSyncConflictPolicy,
95 ShowVectorIndexes,
96}
97
98impl PhysicalPlan {
99 pub fn explain(&self) -> String {
100 match self {
101 PhysicalPlan::GraphBfs { steps, .. } => {
102 format!(
103 "GraphBfs(steps={})",
104 steps
105 .iter()
106 .map(|step| format!(
107 "{}..{}:{:?}",
108 step.min_depth, step.max_depth, step.edge_types
109 ))
110 .collect::<Vec<_>>()
111 .join(" -> ")
112 )
113 }
114 PhysicalPlan::VectorSearch {
115 table, column, k, ..
116 } => {
117 format!("VectorSearch(table={}, column={}, k={})", table, column, k)
118 }
119 PhysicalPlan::HnswSearch {
120 table, column, k, ..
121 } => {
122 format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
123 }
124 PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
125 PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
126 PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
127 PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
128 PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
129 PhysicalPlan::Pipeline(plans) => plans
130 .iter()
131 .map(Self::explain)
132 .collect::<Vec<_>>()
133 .join(" -> "),
134 _ => format!("{:?}", self),
135 }
136 }
137}
138
139#[derive(Debug, Clone)]
140pub struct GraphStepPlan {
141 pub edge_types: Vec<String>,
142 pub direction: Direction,
143 pub min_depth: u32,
144 pub max_depth: u32,
145 pub target_alias: String,
146}
147
148#[derive(Debug, Clone)]
149pub struct CreateTablePlan {
150 pub name: String,
151 pub columns: Vec<ColumnDef>,
152 pub unique_constraints: Vec<Vec<String>>,
153 pub immutable: bool,
154 pub state_machine: Option<StateMachineDef>,
155 pub dag_edge_types: Vec<String>,
156 pub propagation_rules: Vec<PropagationRule>,
157 pub retain: Option<RetainOption>,
158}
159
160#[derive(Debug, Clone)]
161pub struct AlterTablePlan {
162 pub table: String,
163 pub action: AlterAction,
164}
165
166#[derive(Debug, Clone)]
167pub struct CreateIndexPlan {
168 pub name: String,
169 pub table: String,
170 pub columns: Vec<(String, contextdb_core::SortDirection)>,
171}
172
173#[derive(Debug, Clone)]
174pub struct DropIndexPlan {
175 pub name: String,
176 pub table: String,
177 pub if_exists: bool,
178}
179
180#[derive(Debug, Clone)]
181pub struct InsertPlan {
182 pub table: String,
183 pub columns: Vec<String>,
184 pub values: Vec<Vec<Expr>>,
185 pub on_conflict: Option<OnConflictPlan>,
186}
187
188#[derive(Debug, Clone)]
189pub struct OnConflictPlan {
190 pub columns: Vec<String>,
191 pub update_columns: Vec<(String, Expr)>,
192}
193
194#[derive(Debug, Clone)]
195pub struct DeletePlan {
196 pub table: String,
197 pub where_clause: Option<Expr>,
198}
199
200#[derive(Debug, Clone)]
201pub struct UpdatePlan {
202 pub table: String,
203 pub assignments: Vec<(String, Expr)>,
204 pub where_clause: Option<Expr>,
205}
206
207#[derive(Debug, Clone)]
208pub struct ProjectColumn {
209 pub expr: Expr,
210 pub alias: Option<String>,
211}
212
213#[derive(Debug, Clone)]
214pub struct SortKey {
215 pub expr: Expr,
216 pub direction: SortDirection,
217}
218
219#[derive(Debug, Clone, Copy)]
220pub enum JoinType {
221 Inner,
222 Left,
223}
224
225#[derive(Debug, Clone)]
226pub struct ScanRange {
227 pub lower: std::ops::Bound<contextdb_core::Value>,
228 pub upper: std::ops::Bound<contextdb_core::Value>,
229 pub equality: Option<contextdb_core::Value>,
230}
231
232impl Default for ScanRange {
233 fn default() -> Self {
234 Self {
235 lower: std::ops::Bound::Unbounded,
236 upper: std::ops::Bound::Unbounded,
237 equality: None,
238 }
239 }
240}
241
242impl From<OnConflict> for OnConflictPlan {
243 fn from(value: OnConflict) -> Self {
244 Self {
245 columns: value.columns,
246 update_columns: value.update_columns,
247 }
248 }
249}