Skip to main content

contextdb_planner/
plan.rs

1use contextdb_core::{Direction, PropagationRule};
2use contextdb_parser::ast::{
3    AlterAction, ColumnDef, Expr, OnConflict, RetainOption, SetDiskLimitValue, SetMemoryLimitValue,
4    SortDirection, StateMachineDef,
5};
6
7#[derive(Debug, Clone)]
8pub enum PhysicalPlan {
9    CreateTable(CreateTablePlan),
10    AlterTable(AlterTablePlan),
11    DropTable(String),
12    CreateIndex(CreateIndexPlan),
13    Insert(InsertPlan),
14    Delete(DeletePlan),
15    Update(UpdatePlan),
16    Scan {
17        table: String,
18        alias: Option<String>,
19        filter: Option<Expr>,
20    },
21    IndexScan {
22        table: String,
23        index: String,
24        range: ScanRange,
25    },
26    GraphBfs {
27        start_alias: String,
28        start_expr: Expr,
29        start_candidates: Option<Box<PhysicalPlan>>,
30        steps: Vec<GraphStepPlan>,
31        filter: Option<Expr>,
32    },
33    VectorSearch {
34        table: String,
35        column: String,
36        query_expr: Expr,
37        k: u64,
38        candidates: Option<Box<PhysicalPlan>>,
39    },
40    HnswSearch {
41        table: String,
42        column: String,
43        query_expr: Expr,
44        k: u64,
45        candidates: Option<Box<PhysicalPlan>>,
46    },
47    Filter {
48        input: Box<PhysicalPlan>,
49        predicate: Expr,
50    },
51    Project {
52        input: Box<PhysicalPlan>,
53        columns: Vec<ProjectColumn>,
54    },
55    Distinct {
56        input: Box<PhysicalPlan>,
57    },
58    Join {
59        left: Box<PhysicalPlan>,
60        right: Box<PhysicalPlan>,
61        condition: Expr,
62        join_type: JoinType,
63        left_alias: Option<String>,
64        right_alias: Option<String>,
65    },
66    Sort {
67        input: Box<PhysicalPlan>,
68        keys: Vec<SortKey>,
69    },
70    Limit {
71        input: Box<PhysicalPlan>,
72        count: u64,
73    },
74    MaterializeCte {
75        name: String,
76        input: Box<PhysicalPlan>,
77    },
78    CteRef {
79        name: String,
80    },
81    Union {
82        inputs: Vec<PhysicalPlan>,
83        all: bool,
84    },
85    Pipeline(Vec<PhysicalPlan>),
86    SetMemoryLimit(SetMemoryLimitValue),
87    ShowMemoryLimit,
88    SetDiskLimit(SetDiskLimitValue),
89    ShowDiskLimit,
90    SetSyncConflictPolicy(String),
91    ShowSyncConflictPolicy,
92}
93
94impl PhysicalPlan {
95    pub fn explain(&self) -> String {
96        match self {
97            PhysicalPlan::GraphBfs { steps, .. } => {
98                format!(
99                    "GraphBfs(steps={})",
100                    steps
101                        .iter()
102                        .map(|step| format!(
103                            "{}..{}:{:?}",
104                            step.min_depth, step.max_depth, step.edge_types
105                        ))
106                        .collect::<Vec<_>>()
107                        .join(" -> ")
108                )
109            }
110            PhysicalPlan::VectorSearch {
111                table, column, k, ..
112            } => {
113                format!("VectorSearch(table={}, column={}, k={})", table, column, k)
114            }
115            PhysicalPlan::HnswSearch {
116                table, column, k, ..
117            } => {
118                format!("HNSWSearch(table={}, column={}, k={})", table, column, k)
119            }
120            PhysicalPlan::Scan { table, .. } => format!("Scan(table={})", table),
121            PhysicalPlan::AlterTable(p) => format!("AlterTable(table={})", p.table),
122            PhysicalPlan::Insert(p) => format!("Insert(table={})", p.table),
123            PhysicalPlan::Delete(p) => format!("Delete(table={})", p.table),
124            PhysicalPlan::Update(p) => format!("Update(table={})", p.table),
125            PhysicalPlan::Pipeline(plans) => plans
126                .iter()
127                .map(Self::explain)
128                .collect::<Vec<_>>()
129                .join(" -> "),
130            _ => format!("{:?}", self),
131        }
132    }
133}
134
135#[derive(Debug, Clone)]
136pub struct GraphStepPlan {
137    pub edge_types: Vec<String>,
138    pub direction: Direction,
139    pub min_depth: u32,
140    pub max_depth: u32,
141    pub target_alias: String,
142}
143
144#[derive(Debug, Clone)]
145pub struct CreateTablePlan {
146    pub name: String,
147    pub columns: Vec<ColumnDef>,
148    pub immutable: bool,
149    pub state_machine: Option<StateMachineDef>,
150    pub dag_edge_types: Vec<String>,
151    pub propagation_rules: Vec<PropagationRule>,
152    pub retain: Option<RetainOption>,
153}
154
155#[derive(Debug, Clone)]
156pub struct AlterTablePlan {
157    pub table: String,
158    pub action: AlterAction,
159}
160
161#[derive(Debug, Clone)]
162pub struct CreateIndexPlan {
163    pub name: String,
164    pub table: String,
165    pub columns: Vec<String>,
166}
167
168#[derive(Debug, Clone)]
169pub struct InsertPlan {
170    pub table: String,
171    pub columns: Vec<String>,
172    pub values: Vec<Vec<Expr>>,
173    pub on_conflict: Option<OnConflictPlan>,
174}
175
176#[derive(Debug, Clone)]
177pub struct OnConflictPlan {
178    pub columns: Vec<String>,
179    pub update_columns: Vec<(String, Expr)>,
180}
181
182#[derive(Debug, Clone)]
183pub struct DeletePlan {
184    pub table: String,
185    pub where_clause: Option<Expr>,
186}
187
188#[derive(Debug, Clone)]
189pub struct UpdatePlan {
190    pub table: String,
191    pub assignments: Vec<(String, Expr)>,
192    pub where_clause: Option<Expr>,
193}
194
195#[derive(Debug, Clone)]
196pub struct ProjectColumn {
197    pub expr: Expr,
198    pub alias: Option<String>,
199}
200
201#[derive(Debug, Clone)]
202pub struct SortKey {
203    pub expr: Expr,
204    pub direction: SortDirection,
205}
206
207#[derive(Debug, Clone, Copy)]
208pub enum JoinType {
209    Inner,
210    Left,
211}
212
213#[derive(Debug, Clone)]
214pub struct ScanRange;
215
216impl From<OnConflict> for OnConflictPlan {
217    fn from(value: OnConflict) -> Self {
218        Self {
219            columns: value.columns,
220            update_columns: value.update_columns,
221        }
222    }
223}