Skip to main content

contextdb_planner/
plan.rs

1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3    AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4    SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9    CreateTable(CreateTablePlan),
10    AlterTable(AlterTablePlan),
11    DropTable(String),
12    CreateIndex(CreateIndexPlan),
13    DropIndex(DropIndexPlan),
14    Insert(InsertPlan),
15    Delete(DeletePlan),
16    Update(UpdatePlan),
17    Scan {
18        table: String,
19        alias: Option<String>,
20        filter: Option<Expr>,
21    },
22    IndexScan {
23        table: String,
24        index: String,
25        range: ScanRange,
26    },
27    GraphBfs {
28        start_alias: String,
29        start_expr: Expr,
30        start_candidates: Option<Box<PhysicalPlan>>,
31        steps: Vec<GraphStepPlan>,
32        filter: Option<Expr>,
33    },
34    VectorSearch {
35        table: String,
36        column: String,
37        query_expr: Expr,
38        k: u64,
39        candidates: Option<Box<PhysicalPlan>>,
40    },
41    HnswSearch {
42        table: String,
43        column: String,
44        query_expr: Expr,
45        k: u64,
46        candidates: Option<Box<PhysicalPlan>>,
47    },
48    Filter {
49        input: Box<PhysicalPlan>,
50        predicate: Expr,
51    },
52    Project {
53        input: Box<PhysicalPlan>,
54        columns: Vec<ProjectColumn>,
55    },
56    Distinct {
57        input: Box<PhysicalPlan>,
58    },
59    Join {
60        left: Box<PhysicalPlan>,
61        right: Box<PhysicalPlan>,
62        condition: Expr,
63        join_type: JoinType,
64        left_alias: Option<String>,
65        right_alias: Option<String>,
66    },
67    Sort {
68        input: Box<PhysicalPlan>,
69        keys: Vec<SortKey>,
70    },
71    Limit {
72        input: Box<PhysicalPlan>,
73        count: u64,
74    },
75    MaterializeCte {
76        name: String,
77        input: Box<PhysicalPlan>,
78    },
79    CteRef {
80        name: String,
81    },
82    Union {
83        inputs: Vec<PhysicalPlan>,
84        all: bool,
85    },
86    Pipeline(Vec<PhysicalPlan>),
87    SetMemoryLimit(SetMemoryLimitValue),
88    ShowMemoryLimit,
89    SetDiskLimit(SetDiskLimitValue),
90    ShowDiskLimit,
91    SetSyncConflictPolicy(String),
92    ShowSyncConflictPolicy,
93}
94
95impl PhysicalPlan {
96    pub fn explain(&self) -> String {
97        match self {
98            PhysicalPlan::GraphBfs { steps, .. } => {
99                format!(
100                    "GraphBfs(steps={})",
101                    steps
102                        .iter()
103                        .map(|step| format!(
104                            "{}..{}:{:?}",
105                            step.min_depth, step.max_depth, step.edge_types
106                        ))
107                        .collect::<Vec<_>>()
108                        .join(" -> ")
109                )
110            }
111            PhysicalPlan::VectorSearch {
112                table, column, k, ..
113            } => {
114                format!("VectorSearch(table={}, column={}, k={})", table, column, k)
115            }
116            PhysicalPlan::HnswSearch {
117                table, column, k, ..
118            } => {
119                format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
120            }
121            PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
122            PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
123            PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
124            PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
125            PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
126            PhysicalPlan::Pipeline(plans) => plans
127                .iter()
128                .map(Self::explain)
129                .collect::<Vec<_>>()
130                .join(" -> "),
131            _ => format!("{:?}", self),
132        }
133    }
134}
135
136#[derive(Debug, Clone)]
137pub struct GraphStepPlan {
138    pub edge_types: Vec<String>,
139    pub direction: Direction,
140    pub min_depth: u32,
141    pub max_depth: u32,
142    pub target_alias: String,
143}
144
145#[derive(Debug, Clone)]
146pub struct CreateTablePlan {
147    pub name: String,
148    pub columns: Vec<ColumnDef>,
149    pub unique_constraints: Vec<Vec<String>>,
150    pub immutable: bool,
151    pub state_machine: Option<StateMachineDef>,
152    pub dag_edge_types: Vec<String>,
153    pub propagation_rules: Vec<PropagationRule>,
154    pub retain: Option<RetainOption>,
155}
156
157#[derive(Debug, Clone)]
158pub struct AlterTablePlan {
159    pub table: String,
160    pub action: AlterAction,
161}
162
163#[derive(Debug, Clone)]
164pub struct CreateIndexPlan {
165    pub name: String,
166    pub table: String,
167    pub columns: Vec<(String, contextdb_core::SortDirection)>,
168}
169
170#[derive(Debug, Clone)]
171pub struct DropIndexPlan {
172    pub name: String,
173    pub table: String,
174    pub if_exists: bool,
175}
176
177#[derive(Debug, Clone)]
178pub struct InsertPlan {
179    pub table: String,
180    pub columns: Vec<String>,
181    pub values: Vec<Vec<Expr>>,
182    pub on_conflict: Option<OnConflictPlan>,
183}
184
185#[derive(Debug, Clone)]
186pub struct OnConflictPlan {
187    pub columns: Vec<String>,
188    pub update_columns: Vec<(String, Expr)>,
189}
190
191#[derive(Debug, Clone)]
192pub struct DeletePlan {
193    pub table: String,
194    pub where_clause: Option<Expr>,
195}
196
197#[derive(Debug, Clone)]
198pub struct UpdatePlan {
199    pub table: String,
200    pub assignments: Vec<(String, Expr)>,
201    pub where_clause: Option<Expr>,
202}
203
204#[derive(Debug, Clone)]
205pub struct ProjectColumn {
206    pub expr: Expr,
207    pub alias: Option<String>,
208}
209
210#[derive(Debug, Clone)]
211pub struct SortKey {
212    pub expr: Expr,
213    pub direction: SortDirection,
214}
215
216#[derive(Debug, Clone, Copy)]
217pub enum JoinType {
218    Inner,
219    Left,
220}
221
222#[derive(Debug, Clone)]
223pub struct ScanRange {
224    pub lower: std::ops::Bound<contextdb_core::Value>,
225    pub upper: std::ops::Bound<contextdb_core::Value>,
226    pub equality: Option<contextdb_core::Value>,
227}
228
229impl Default for ScanRange {
230    fn default() -> Self {
231        Self {
232            lower: std::ops::Bound::Unbounded,
233            upper: std::ops::Bound::Unbounded,
234            equality: None,
235        }
236    }
237}
238
239impl From<OnConflict> for OnConflictPlan {
240    fn from(value: OnConflict) -> Self {
241        Self {
242            columns: value.columns,
243            update_columns: value.update_columns,
244        }
245    }
246}