1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3 AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4 SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9 CreateTable(CreateTablePlan),
10 AlterTable(AlterTablePlan),
11 DropTable(String),
12 CreateIndex(CreateIndexPlan),
13 DropIndex(DropIndexPlan),
14 Insert(InsertPlan),
15 Delete(DeletePlan),
16 Update(UpdatePlan),
17 Scan {
18 table: String,
19 alias: Option<String>,
20 filter: Option<Expr>,
21 },
22 IndexScan {
23 table: String,
24 index: String,
25 range: ScanRange,
26 },
27 GraphBfs {
28 start_alias: String,
29 start_expr: Expr,
30 start_candidates: Option<Box<PhysicalPlan>>,
31 steps: Vec<GraphStepPlan>,
32 filter: Option<Expr>,
33 },
34 VectorSearch {
35 table: String,
36 column: String,
37 query_expr: Expr,
38 k: u64,
39 candidates: Option<Box<PhysicalPlan>>,
40 },
41 HnswSearch {
42 table: String,
43 column: String,
44 query_expr: Expr,
45 k: u64,
46 candidates: Option<Box<PhysicalPlan>>,
47 },
48 Filter {
49 input: Box<PhysicalPlan>,
50 predicate: Expr,
51 },
52 Project {
53 input: Box<PhysicalPlan>,
54 columns: Vec<ProjectColumn>,
55 },
56 Distinct {
57 input: Box<PhysicalPlan>,
58 },
59 Join {
60 left: Box<PhysicalPlan>,
61 right: Box<PhysicalPlan>,
62 condition: Expr,
63 join_type: JoinType,
64 left_alias: Option<String>,
65 right_alias: Option<String>,
66 },
67 Sort {
68 input: Box<PhysicalPlan>,
69 keys: Vec<SortKey>,
70 },
71 Limit {
72 input: Box<PhysicalPlan>,
73 count: u64,
74 },
75 MaterializeCte {
76 name: String,
77 input: Box<PhysicalPlan>,
78 },
79 CteRef {
80 name: String,
81 },
82 Union {
83 inputs: Vec<PhysicalPlan>,
84 all: bool,
85 },
86 Pipeline(Vec<PhysicalPlan>),
87 SetMemoryLimit(SetMemoryLimitValue),
88 ShowMemoryLimit,
89 SetDiskLimit(SetDiskLimitValue),
90 ShowDiskLimit,
91 SetSyncConflictPolicy(String),
92 ShowSyncConflictPolicy,
93}
94
95impl PhysicalPlan {
96 pub fn explain(&self) -> String {
97 match self {
98 PhysicalPlan::GraphBfs { steps, .. } => {
99 format!(
100 "GraphBfs(steps={})",
101 steps
102 .iter()
103 .map(|step| format!(
104 "{}..{}:{:?}",
105 step.min_depth, step.max_depth, step.edge_types
106 ))
107 .collect::<Vec<_>>()
108 .join(" -> ")
109 )
110 }
111 PhysicalPlan::VectorSearch {
112 table, column, k, ..
113 } => {
114 format!("VectorSearch(table={}, column={}, k={})", table, column, k)
115 }
116 PhysicalPlan::HnswSearch {
117 table, column, k, ..
118 } => {
119 format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
120 }
121 PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
122 PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
123 PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
124 PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
125 PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
126 PhysicalPlan::Pipeline(plans) => plans
127 .iter()
128 .map(Self::explain)
129 .collect::<Vec<_>>()
130 .join(" -> "),
131 _ => format!("{:?}", self),
132 }
133 }
134}
135
136#[derive(Debug, Clone)]
137pub struct GraphStepPlan {
138 pub edge_types: Vec<String>,
139 pub direction: Direction,
140 pub min_depth: u32,
141 pub max_depth: u32,
142 pub target_alias: String,
143}
144
145#[derive(Debug, Clone)]
146pub struct CreateTablePlan {
147 pub name: String,
148 pub columns: Vec<ColumnDef>,
149 pub unique_constraints: Vec<Vec<String>>,
150 pub immutable: bool,
151 pub state_machine: Option<StateMachineDef>,
152 pub dag_edge_types: Vec<String>,
153 pub propagation_rules: Vec<PropagationRule>,
154 pub retain: Option<RetainOption>,
155}
156
157#[derive(Debug, Clone)]
158pub struct AlterTablePlan {
159 pub table: String,
160 pub action: AlterAction,
161}
162
163#[derive(Debug, Clone)]
164pub struct CreateIndexPlan {
165 pub name: String,
166 pub table: String,
167 pub columns: Vec<(String, contextdb_core::SortDirection)>,
168}
169
/// Payload of [`PhysicalPlan::DropIndex`].
#[derive(Debug, Clone)]
pub struct DropIndexPlan {
    /// Name of the index.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// `IF EXISTS` flag; presumably suppresses the error for a missing
    /// index — TODO confirm against the executor.
    pub if_exists: bool,
}
176
177#[derive(Debug, Clone)]
178pub struct InsertPlan {
179 pub table: String,
180 pub columns: Vec<String>,
181 pub values: Vec<Vec<Expr>>,
182 pub on_conflict: Option<OnConflictPlan>,
183}
184
185#[derive(Debug, Clone)]
186pub struct OnConflictPlan {
187 pub columns: Vec<String>,
188 pub update_columns: Vec<(String, Expr)>,
189}
190
191#[derive(Debug, Clone)]
192pub struct DeletePlan {
193 pub table: String,
194 pub where_clause: Option<Expr>,
195}
196
197#[derive(Debug, Clone)]
198pub struct UpdatePlan {
199 pub table: String,
200 pub assignments: Vec<(String, Expr)>,
201 pub where_clause: Option<Expr>,
202}
203
204#[derive(Debug, Clone)]
205pub struct ProjectColumn {
206 pub expr: Expr,
207 pub alias: Option<String>,
208}
209
210#[derive(Debug, Clone)]
211pub struct SortKey {
212 pub expr: Expr,
213 pub direction: SortDirection,
214}
215
/// Join flavour carried by [`PhysicalPlan::Join`].
#[derive(Debug, Clone, Copy)]
pub enum JoinType {
    /// Inner join.
    Inner,
    /// Left join.
    Left,
}
221
222#[derive(Debug, Clone)]
223pub struct ScanRange {
224 pub lower: std::ops::Bound<contextdb_core::Value>,
225 pub upper: std::ops::Bound<contextdb_core::Value>,
226 pub equality: Option<contextdb_core::Value>,
227}
228
229impl Default for ScanRange {
230 fn default() -> Self {
231 Self {
232 lower: std::ops::Bound::Unbounded,
233 upper: std::ops::Bound::Unbounded,
234 equality: None,
235 }
236 }
237}
238
239impl From<OnConflict> for OnConflictPlan {
240 fn from(value: OnConflict) -> Self {
241 Self {
242 columns: value.columns,
243 update_columns: value.update_columns,
244 }
245 }
246}