// contextdb_planner/plan.rs

1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3    AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4    SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9    CreateTable(CreateTablePlan),
10    AlterTable(AlterTablePlan),
11    DropTable(String),
12    CreateIndex(CreateIndexPlan),
13    DropIndex(DropIndexPlan),
14    Insert(InsertPlan),
15    Delete(DeletePlan),
16    Update(UpdatePlan),
17    Scan {
18        table: String,
19        alias: Option<String>,
20        filter: Option<Expr>,
21    },
22    IndexScan {
23        table: String,
24        index: String,
25        range: ScanRange,
26    },
27    GraphBfs {
28        start_alias: String,
29        start_expr: Expr,
30        start_candidates: Option<Box<PhysicalPlan>>,
31        steps: Vec<GraphStepPlan>,
32        filter: Option<Expr>,
33    },
34    VectorSearch {
35        table: String,
36        column: String,
37        query_expr: Expr,
38        k: u64,
39        candidates: Option<Box<PhysicalPlan>>,
40        sort_key: Option<String>,
41    },
42    HnswSearch {
43        table: String,
44        column: String,
45        query_expr: Expr,
46        k: u64,
47        candidates: Option<Box<PhysicalPlan>>,
48        sort_key: Option<String>,
49    },
50    Filter {
51        input: Box<PhysicalPlan>,
52        predicate: Expr,
53    },
54    Project {
55        input: Box<PhysicalPlan>,
56        columns: Vec<ProjectColumn>,
57    },
58    Distinct {
59        input: Box<PhysicalPlan>,
60    },
61    Join {
62        left: Box<PhysicalPlan>,
63        right: Box<PhysicalPlan>,
64        condition: Expr,
65        join_type: JoinType,
66        left_alias: Option<String>,
67        right_alias: Option<String>,
68    },
69    Sort {
70        input: Box<PhysicalPlan>,
71        keys: Vec<SortKey>,
72    },
73    Limit {
74        input: Box<PhysicalPlan>,
75        count: u64,
76    },
77    MaterializeCte {
78        name: String,
79        input: Box<PhysicalPlan>,
80    },
81    CteRef {
82        name: String,
83    },
84    Union {
85        inputs: Vec<PhysicalPlan>,
86        all: bool,
87    },
88    Pipeline(Vec<PhysicalPlan>),
89    SetMemoryLimit(SetMemoryLimitValue),
90    ShowMemoryLimit,
91    SetDiskLimit(SetDiskLimitValue),
92    ShowDiskLimit,
93    SetSyncConflictPolicy(String),
94    ShowSyncConflictPolicy,
95    ShowVectorIndexes,
96}
97
98impl PhysicalPlan {
99    pub fn explain(&self) -> String {
100        match self {
101            PhysicalPlan::GraphBfs { steps, .. } => {
102                format!(
103                    "GraphBfs(steps={})",
104                    steps
105                        .iter()
106                        .map(|step| format!(
107                            "{}..{}:{:?}",
108                            step.min_depth, step.max_depth, step.edge_types
109                        ))
110                        .collect::<Vec<_>>()
111                        .join(" -> ")
112                )
113            }
114            PhysicalPlan::VectorSearch {
115                table, column, k, ..
116            } => {
117                format!("VectorSearch(table={}, column={}, k={})", table, column, k)
118            }
119            PhysicalPlan::HnswSearch {
120                table, column, k, ..
121            } => {
122                format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
123            }
124            PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
125            PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
126            PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
127            PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
128            PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
129            PhysicalPlan::Pipeline(plans) => plans
130                .iter()
131                .map(Self::explain)
132                .collect::<Vec<_>>()
133                .join(" -> "),
134            _ => format!("{:?}", self),
135        }
136    }
137}
138
139#[derive(Debug, Clone)]
140pub struct GraphStepPlan {
141    pub edge_types: Vec<String>,
142    pub direction: Direction,
143    pub min_depth: u32,
144    pub max_depth: u32,
145    pub target_alias: String,
146}
147
148#[derive(Debug, Clone)]
149pub struct CreateTablePlan {
150    pub name: String,
151    pub columns: Vec<ColumnDef>,
152    pub unique_constraints: Vec<Vec<String>>,
153    pub immutable: bool,
154    pub state_machine: Option<StateMachineDef>,
155    pub dag_edge_types: Vec<String>,
156    pub propagation_rules: Vec<PropagationRule>,
157    pub retain: Option<RetainOption>,
158}
159
160#[derive(Debug, Clone)]
161pub struct AlterTablePlan {
162    pub table: String,
163    pub action: AlterAction,
164}
165
166#[derive(Debug, Clone)]
167pub struct CreateIndexPlan {
168    pub name: String,
169    pub table: String,
170    pub columns: Vec<(String, contextdb_core::SortDirection)>,
171}
172
/// Payload of `PhysicalPlan::DropIndex`.
#[derive(Debug, Clone)]
pub struct DropIndexPlan {
    /// Name of the index.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Presumably mirrors an `IF EXISTS` clause on the statement — confirm
    /// with the executor how a missing index is handled when this is `false`.
    pub if_exists: bool,
}
179
180#[derive(Debug, Clone)]
181pub struct InsertPlan {
182    pub table: String,
183    pub columns: Vec<String>,
184    pub values: Vec<Vec<Expr>>,
185    pub on_conflict: Option<OnConflictPlan>,
186}
187
188#[derive(Debug, Clone)]
189pub struct OnConflictPlan {
190    pub columns: Vec<String>,
191    pub update_columns: Vec<(String, Expr)>,
192}
193
194#[derive(Debug, Clone)]
195pub struct DeletePlan {
196    pub table: String,
197    pub where_clause: Option<Expr>,
198}
199
200#[derive(Debug, Clone)]
201pub struct UpdatePlan {
202    pub table: String,
203    pub assignments: Vec<(String, Expr)>,
204    pub where_clause: Option<Expr>,
205}
206
207#[derive(Debug, Clone)]
208pub struct ProjectColumn {
209    pub expr: Expr,
210    pub alias: Option<String>,
211}
212
213#[derive(Debug, Clone)]
214pub struct SortKey {
215    pub expr: Expr,
216    pub direction: SortDirection,
217}
218
/// Kind of join carried by `PhysicalPlan::Join`.
///
/// Only these two kinds are representable; the type is `Copy`.
#[derive(Debug, Clone, Copy)]
pub enum JoinType {
    Inner,
    Left,
}
224
225#[derive(Debug, Clone)]
226pub struct ScanRange {
227    pub lower: std::ops::Bound<contextdb_core::Value>,
228    pub upper: std::ops::Bound<contextdb_core::Value>,
229    pub equality: Option<contextdb_core::Value>,
230}
231
232impl Default for ScanRange {
233    fn default() -> Self {
234        Self {
235            lower: std::ops::Bound::Unbounded,
236            upper: std::ops::Bound::Unbounded,
237            equality: None,
238        }
239    }
240}
241
242impl From<OnConflict> for OnConflictPlan {
243    fn from(value: OnConflict) -> Self {
244        Self {
245            columns: value.columns,
246            update_columns: value.update_columns,
247        }
248    }
249}