kotoba_query_engine/
planner.rs

//! Query Planner
//!
//! Turns parsed GQL queries into step-by-step execution plans.
4
5use std::sync::Arc;
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use anyhow::Result;
9
10use crate::ast::*;
11use crate::types::*;
12use crate::{ProjectionPort, IndexManagerPort};
13use kotoba_storage::KeyValueStore;
14
15/// Query planner with KeyValueStore backend
16pub struct QueryPlanner<T: KeyValueStore> {
17    storage: Arc<T>,
18}
19
20impl<T: KeyValueStore + 'static> QueryPlanner<T> {
21    pub fn new(storage: Arc<T>) -> Self {
22        Self { storage }
23    }
24
25    /// Create execution plan for a query
26    pub async fn plan(&self, query: GqlQuery) -> Result<ExecutionPlan> {
27        let mut plan = ExecutionPlan::default();
28
29        // Process each clause
30        for clause in query.clauses {
31            match clause {
32                QueryClause::Match(match_clause) => {
33                    plan.steps.push(ExecutionStep::Match(self.plan_match(match_clause).await?));
34                }
35                QueryClause::Where(where_clause) => {
36                    plan.steps.push(ExecutionStep::Filter(self.plan_where(where_clause)?));
37                }
38                QueryClause::GroupBy(group_by) => {
39                    plan.steps.push(ExecutionStep::GroupBy(self.plan_group_by(group_by)?));
40                }
41                QueryClause::OrderBy(order_by) => {
42                    plan.steps.push(ExecutionStep::Sort(self.plan_order_by(order_by)?));
43                }
44                QueryClause::Limit(limit) => {
45                    plan.steps.push(ExecutionStep::Limit(limit));
46                }
47                _ => {} // Other clauses not implemented yet
48            }
49        }
50
51        // Add return step
52        if let Some(return_clause) = query.returning {
53            plan.steps.push(ExecutionStep::Return(self.plan_return(return_clause)?));
54        }
55
56        Ok(plan)
57    }
58
59    async fn plan_match(&self, match_clause: MatchClause) -> Result<MatchPlan> {
60        let mut vertex_scans = Vec::new();
61        let mut edge_scans = Vec::new();
62
63        // Analyze graph pattern for optimization opportunities
64        for path_pattern in match_clause.pattern.path_patterns {
65            if let PathTerm::PathElement(path_element) = path_pattern.path_term {
66                // Plan vertex scans
67                vertex_scans.push(self.plan_vertex_scan(&path_element.vertex_pattern).await?);
68
69                // Plan edge scans
70                for edge_pattern in path_element.edge_patterns {
71                    edge_scans.push(self.plan_edge_scan(&edge_pattern).await?);
72                }
73            }
74        }
75
76        Ok(MatchPlan {
77            vertex_scans,
78            edge_scans,
79            join_strategy: JoinStrategy::HashJoin, // Default strategy
80        })
81    }
82
83    async fn plan_vertex_scan(&self, vertex_pattern: &VertexPattern) -> Result<VertexScanPlan> {
84        // Analyze vertex pattern for index usage
85        let mut index_candidates: Vec<String> = Vec::new();
86
87        for (property, _) in &vertex_pattern.properties {
88            // TODO: Check if there's an index for this property using KeyValueStore
89            // For now, assume no indices exist
90            // if let Ok(index_exists) = self.check_vertex_index_exists(property).await {
91            //     if index_exists {
92            //         index_candidates.push(property.clone());
93            //     }
94            // }
95        }
96
97        // Choose the best index or fallback to scan
98        let scan_type = if !index_candidates.is_empty() {
99            ScanType::IndexScan(index_candidates[0].clone())
100        } else {
101            ScanType::FullScan
102        };
103
104        Ok(VertexScanPlan {
105            labels: vertex_pattern.labels.clone(),
106            properties: vertex_pattern.properties.clone(),
107            scan_type,
108        })
109    }
110
111    async fn plan_edge_scan(&self, edge_pattern: &EdgePattern) -> Result<EdgeScanPlan> {
112        // Similar logic for edge scanning
113        let mut index_candidates: Vec<String> = Vec::new();
114
115        for (property, _) in &edge_pattern.properties {
116            // TODO: Check if there's an index for this property using KeyValueStore
117            // For now, assume no indices exist
118            // if let Ok(index_exists) = self.check_edge_index_exists(property).await {
119            //     if index_exists {
120            //         index_candidates.push(property.clone());
121            //     }
122            // }
123        }
124
125        let scan_type = if !index_candidates.is_empty() {
126            ScanType::IndexScan(index_candidates[0].clone())
127        } else {
128            ScanType::FullScan
129        };
130
131        Ok(EdgeScanPlan {
132            labels: edge_pattern.labels.clone(),
133            properties: edge_pattern.properties.clone(),
134            direction: edge_pattern.direction.clone(),
135            scan_type,
136        })
137    }
138
139    fn plan_where(&self, where_clause: WhereClause) -> Result<FilterPlan> {
140        // Analyze WHERE clause for optimization
141        let filter_type = match where_clause.expression {
142            BooleanExpression::Comparison(comp) => {
143                FilterType::Comparison(comp)
144            }
145            BooleanExpression::Exists(pattern) => {
146                FilterType::Exists(*pattern)
147            }
148            _ => FilterType::Generic(where_clause.expression),
149        };
150
151        Ok(FilterPlan {
152            filter_type,
153        })
154    }
155
156    fn plan_group_by(&self, group_by: GroupByClause) -> Result<GroupByPlan> {
157        Ok(GroupByPlan {
158            keys: group_by.grouping_keys,
159        })
160    }
161
162    fn plan_order_by(&self, order_by: OrderByClause) -> Result<SortPlan> {
163        Ok(SortPlan {
164            keys: order_by.sort_keys,
165        })
166    }
167
168    fn plan_return(&self, return_clause: ReturnClause) -> Result<ReturnPlan> {
169        Ok(ReturnPlan {
170            items: return_clause.items,
171            distinct: return_clause.distinct,
172        })
173    }
174}
175
176/// Execution plan types
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct ExecutionPlan {
179    pub steps: Vec<ExecutionStep>,
180}
181
182impl Default for ExecutionPlan {
183    fn default() -> Self {
184        Self { steps: Vec::new() }
185    }
186}
187
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub enum ExecutionStep {
190    Match(MatchPlan),
191    Filter(FilterPlan),
192    GroupBy(GroupByPlan),
193    Sort(SortPlan),
194    Limit(LimitClause),
195    Return(ReturnPlan),
196}
197
198/// Match execution plan
199#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct MatchPlan {
201    pub vertex_scans: Vec<VertexScanPlan>,
202    pub edge_scans: Vec<EdgeScanPlan>,
203    pub join_strategy: JoinStrategy,
204}
205
206/// Vertex scan plan
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct VertexScanPlan {
209    pub labels: Vec<String>,
210    pub properties: std::collections::HashMap<String, ValueExpression>,
211    pub scan_type: ScanType,
212}
213
214/// Edge scan plan
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct EdgeScanPlan {
217    pub labels: Vec<String>,
218    pub properties: std::collections::HashMap<String, ValueExpression>,
219    pub direction: EdgeDirection,
220    pub scan_type: ScanType,
221}
222
223/// Scan types for optimization
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub enum ScanType {
226    FullScan,
227    IndexScan(String),
228    RangeScan { start: ValueExpression, end: ValueExpression },
229}
230
231/// Join strategies
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub enum JoinStrategy {
234    HashJoin,
235    NestedLoopJoin,
236    MergeJoin,
237}
238
239/// Filter plan
240#[derive(Debug, Clone, Serialize, Deserialize)]
241pub struct FilterPlan {
242    pub filter_type: FilterType,
243}
244
245/// Filter types
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub enum FilterType {
248    Comparison(ComparisonExpression),
249    Exists(GraphPattern),
250    Generic(BooleanExpression),
251}
252
253/// Group by plan
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct GroupByPlan {
256    pub keys: Vec<ValueExpression>,
257}
258
259/// Sort plan
260#[derive(Debug, Clone, Serialize, Deserialize)]
261pub struct SortPlan {
262    pub keys: Vec<SortKey>,
263}
264
265/// Return plan
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct ReturnPlan {
268    pub items: Vec<ReturnItem>,
269    pub distinct: bool,
270}
271
272
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly defaulted plan must not contain any steps.
    #[test]
    fn test_default_execution_plan_has_no_steps() {
        let plan = ExecutionPlan::default();
        assert!(plan.steps.is_empty());
    }

    /// Cloning a plan keeps its steps and their contents.
    #[test]
    fn test_execution_plan_clone_preserves_steps() {
        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::Return(ReturnPlan {
                items: Vec::new(),
                distinct: true,
            })],
        };

        let cloned = plan.clone();

        assert_eq!(cloned.steps.len(), 1);
        assert!(matches!(
            cloned.steps[0],
            ExecutionStep::Return(ReturnPlan { distinct: true, .. })
        ));
    }

    /// Scan types carry the property name they index on.
    #[test]
    fn test_index_scan_keeps_property_name() {
        let scan = ScanType::IndexScan("name".to_string());
        assert!(matches!(scan, ScanType::IndexScan(ref prop) if prop == "name"));
    }
}