Skip to main content

contextdb_planner/
plan.rs

1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3    AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4    SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9    CreateTable(CreateTablePlan),
10    AlterTable(AlterTablePlan),
11    DropTable(String),
12    CreateIndex(CreateIndexPlan),
13    Insert(InsertPlan),
14    Delete(DeletePlan),
15    Update(UpdatePlan),
16    Scan {
17        table: String,
18        alias: Option<String>,
19        filter: Option<Expr>,
20    },
21    IndexScan {
22        table: String,
23        index: String,
24        range: ScanRange,
25    },
26    GraphBfs {
27        start_alias: String,
28        start_expr: Expr,
29        start_candidates: Option<Box<PhysicalPlan>>,
30        steps: Vec<GraphStepPlan>,
31        filter: Option<Expr>,
32    },
33    VectorSearch {
34        table: String,
35        column: String,
36        query_expr: Expr,
37        k: u64,
38        candidates: Option<Box<PhysicalPlan>>,
39    },
40    HnswSearch {
41        table: String,
42        column: String,
43        query_expr: Expr,
44        k: u64,
45        candidates: Option<Box<PhysicalPlan>>,
46    },
47    Filter {
48        input: Box<PhysicalPlan>,
49        predicate: Expr,
50    },
51    Project {
52        input: Box<PhysicalPlan>,
53        columns: Vec<ProjectColumn>,
54    },
55    Distinct {
56        input: Box<PhysicalPlan>,
57    },
58    Join {
59        left: Box<PhysicalPlan>,
60        right: Box<PhysicalPlan>,
61        condition: Expr,
62        join_type: JoinType,
63        left_alias: Option<String>,
64        right_alias: Option<String>,
65    },
66    Sort {
67        input: Box<PhysicalPlan>,
68        keys: Vec<SortKey>,
69    },
70    Limit {
71        input: Box<PhysicalPlan>,
72        count: u64,
73    },
74    MaterializeCte {
75        name: String,
76        input: Box<PhysicalPlan>,
77    },
78    CteRef {
79        name: String,
80    },
81    Union {
82        inputs: Vec<PhysicalPlan>,
83        all: bool,
84    },
85    Pipeline(Vec<PhysicalPlan>),
86    SetMemoryLimit(SetMemoryLimitValue),
87    ShowMemoryLimit,
88    SetDiskLimit(SetDiskLimitValue),
89    ShowDiskLimit,
90    SetSyncConflictPolicy(String),
91    ShowSyncConflictPolicy,
92}
93
94impl PhysicalPlan {
95    pub fn explain(&self) -> String {
96        match self {
97            PhysicalPlan::GraphBfs { steps, .. } => {
98                format!(
99                    "GraphBfs(steps={})",
100                    steps
101                        .iter()
102                        .map(|step| format!(
103                            "{}..{}:{:?}",
104                            step.min_depth, step.max_depth, step.edge_types
105                        ))
106                        .collect::<Vec<_>>()
107                        .join(" -> ")
108                )
109            }
110            PhysicalPlan::VectorSearch {
111                table, column, k, ..
112            } => {
113                format!("VectorSearch(table={}, column={}, k={})", table, column, k)
114            }
115            PhysicalPlan::HnswSearch {
116                table, column, k, ..
117            } => {
118                format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
119            }
120            PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
121            PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
122            PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
123            PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
124            PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
125            PhysicalPlan::Pipeline(plans) => plans
126                .iter()
127                .map(Self::explain)
128                .collect::<Vec<_>>()
129                .join(" -> "),
130            _ => format!("{:?}", self),
131        }
132    }
133}
134
135#[derive(Debug, Clone)]
136pub struct GraphStepPlan {
137    pub edge_types: Vec<String>,
138    pub direction: Direction,
139    pub min_depth: u32,
140    pub max_depth: u32,
141    pub target_alias: String,
142}
143
144#[derive(Debug, Clone)]
145pub struct CreateTablePlan {
146    pub name: String,
147    pub columns: Vec<ColumnDef>,
148    pub unique_constraints: Vec<Vec<String>>,
149    pub immutable: bool,
150    pub state_machine: Option<StateMachineDef>,
151    pub dag_edge_types: Vec<String>,
152    pub propagation_rules: Vec<PropagationRule>,
153    pub retain: Option<RetainOption>,
154}
155
156#[derive(Debug, Clone)]
157pub struct AlterTablePlan {
158    pub table: String,
159    pub action: AlterAction,
160}
161
162#[derive(Debug, Clone)]
163pub struct CreateIndexPlan {
164    pub name: String,
165    pub table: String,
166    pub columns: Vec<String>,
167}
168
169#[derive(Debug, Clone)]
170pub struct InsertPlan {
171    pub table: String,
172    pub columns: Vec<String>,
173    pub values: Vec<Vec<Expr>>,
174    pub on_conflict: Option<OnConflictPlan>,
175}
176
177#[derive(Debug, Clone)]
178pub struct OnConflictPlan {
179    pub columns: Vec<String>,
180    pub update_columns: Vec<(String, Expr)>,
181}
182
183#[derive(Debug, Clone)]
184pub struct DeletePlan {
185    pub table: String,
186    pub where_clause: Option<Expr>,
187}
188
189#[derive(Debug, Clone)]
190pub struct UpdatePlan {
191    pub table: String,
192    pub assignments: Vec<(String, Expr)>,
193    pub where_clause: Option<Expr>,
194}
195
196#[derive(Debug, Clone)]
197pub struct ProjectColumn {
198    pub expr: Expr,
199    pub alias: Option<String>,
200}
201
202#[derive(Debug, Clone)]
203pub struct SortKey {
204    pub expr: Expr,
205    pub direction: SortDirection,
206}
207
208#[derive(Debug, Clone, Copy)]
209pub enum JoinType {
210    Inner,
211    Left,
212}
213
214#[derive(Debug, Clone)]
215pub struct ScanRange;
216
217impl From<OnConflict> for OnConflictPlan {
218    fn from(value: OnConflict) -> Self {
219        Self {
220            columns: value.columns,
221            update_columns: value.update_columns,
222        }
223    }
224}