1use std::sync::Arc;
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use anyhow::Result;
10use futures::stream::{self, StreamExt};
11
12use crate::ast::*;
13use crate::types::*;
14use crate::planner::*;
15use crate::{ProjectionPort, IndexManagerPort};
16use kotoba_storage::KeyValueStore;
17
18pub struct QueryExecutor<T: KeyValueStore> {
20 storage: Arc<T>,
21}
22
23impl<T: KeyValueStore + 'static> QueryExecutor<T> {
24 fn vertex_to_json_value(&self, vertex: Vertex) -> serde_json::Value {
26 serde_json::json!({
27 "id": vertex.id,
28 "labels": vertex.labels,
29 "properties": vertex.properties
30 })
31 }
32
33 fn edge_to_json_value(&self, edge: Edge) -> serde_json::Value {
35 serde_json::json!({
36 "id": edge.id,
37 "label": edge.label,
38 "from_vertex": edge.from_vertex,
39 "to_vertex": edge.to_vertex,
40 "properties": edge.properties
41 })
42 }
43
44 pub fn new(storage: Arc<T>) -> Self {
45 Self { storage }
46 }
47
48 pub async fn execute(
50 &self,
51 plan: ExecutionPlan,
52 context: crate::QueryContext,
53 ) -> Result<QueryResult> {
54 let mut current_result = ExecutionResult::Empty;
55
56 for step in plan.steps {
58 current_result = match step {
59 ExecutionStep::Match(match_plan) => {
60 self.execute_match(match_plan, current_result).await?
61 }
62 ExecutionStep::Filter(filter_plan) => {
63 self.execute_filter(filter_plan, current_result).await?
64 }
65 ExecutionStep::GroupBy(group_by_plan) => {
66 self.execute_group_by(group_by_plan, current_result).await?
67 }
68 ExecutionStep::Sort(sort_plan) => {
69 self.execute_sort(sort_plan, current_result).await?
70 }
71 ExecutionStep::Limit(limit_clause) => {
72 self.execute_limit(limit_clause, current_result).await?
73 }
74 ExecutionStep::Return(return_plan) => {
75 return self.execute_return(return_plan, current_result).await;
76 }
77 };
78 }
79
80 Ok(QueryResult::from(current_result))
82 }
83
84 async fn execute_match(
85 &self,
86 match_plan: MatchPlan,
87 _previous_result: ExecutionResult,
88 ) -> Result<ExecutionResult> {
89 let mut results = Vec::new();
90
91 for vertex_scan in match_plan.vertex_scans {
93 let vertices = self.execute_vertex_scan(vertex_scan).await?;
94 results.extend(vertices);
95 }
96
97 for edge_scan in match_plan.edge_scans {
99 let edges = self.execute_edge_scan(edge_scan).await?;
100 results.extend(edges.into_iter().map(|e| vec![e]));
102 }
103
104 Ok(ExecutionResult::Rows(results))
105 }
106
107 async fn execute_vertex_scan(&self, scan_plan: VertexScanPlan) -> Result<Vec<Vec<Value>>> {
108 let mut results = Vec::new();
109
110 let prefix = "vertex:".to_string();
114 let vertex_keys = self.storage.scan(prefix.as_bytes()).await?;
115
116 for key_bytes in vertex_keys {
117 if let Ok(key_str) = std::str::from_utf8(&key_bytes.0) {
118 if key_str.starts_with("vertex:") {
119 if let Some(vertex_data) = self.storage.get(&key_bytes.0).await? {
120 if let Ok(vertex_json) = serde_json::from_slice::<Value>(&vertex_data) {
121 results.push(vec![vertex_json]);
122 }
123 }
124 }
125 }
126 }
127
128 Ok(results)
129 }
130
131 async fn execute_edge_scan(&self, scan_plan: EdgeScanPlan) -> Result<Vec<Value>> {
132 let mut results = Vec::new();
133
134 let prefix = "edge:".to_string();
138 let edge_keys = self.storage.scan(prefix.as_bytes()).await?;
139
140 for key_bytes in edge_keys {
141 if let Ok(key_str) = std::str::from_utf8(&key_bytes.0) {
142 if key_str.starts_with("edge:") {
143 if let Some(edge_data) = self.storage.get(&key_bytes.0).await? {
144 if let Ok(edge_json) = serde_json::from_slice::<Value>(&edge_data) {
145 results.push(edge_json);
146 }
147 }
148 }
149 }
150 }
151
152 Ok(results)
153 }
154
155 async fn execute_filter(
156 &self,
157 filter_plan: FilterPlan,
158 input: ExecutionResult,
159 ) -> Result<ExecutionResult> {
160 match input {
161 ExecutionResult::Rows(rows) => {
162 let mut filtered_rows = Vec::new();
163
164 for row in rows {
165 if self.evaluate_filter(&filter_plan, &row).await? {
166 filtered_rows.push(row);
167 }
168 }
169
170 Ok(ExecutionResult::Rows(filtered_rows))
171 }
172 _ => Ok(input),
173 }
174 }
175
176 async fn evaluate_filter(&self, _filter_plan: &FilterPlan, _row: &[Value]) -> Result<bool> {
177 Ok(true)
180 }
181
182 async fn execute_group_by(
183 &self,
184 group_by_plan: GroupByPlan,
185 input: ExecutionResult,
186 ) -> Result<ExecutionResult> {
187 match input {
188 ExecutionResult::Rows(rows) => {
189 let mut groups = std::collections::HashMap::new();
190
191 for row in rows {
192 let key = self.compute_group_key(&group_by_plan.keys, &row).await?;
193 groups.entry(key).or_insert_with(Vec::new).push(row);
194 }
195
196 Ok(ExecutionResult::Grouped(groups))
197 }
198 _ => Ok(input),
199 }
200 }
201
202 async fn compute_group_key(&self, _keys: &[ValueExpression], _row: &[Value]) -> Result<String> {
203 Ok("default_group".to_string())
205 }
206
207 async fn execute_sort(
208 &self,
209 sort_plan: SortPlan,
210 input: ExecutionResult,
211 ) -> Result<ExecutionResult> {
212 match input {
213 ExecutionResult::Rows(mut rows) => {
214 Ok(ExecutionResult::Rows(rows))
217 }
218 _ => Ok(input),
219 }
220 }
221
222 async fn execute_limit(
223 &self,
224 limit_clause: LimitClause,
225 input: ExecutionResult,
226 ) -> Result<ExecutionResult> {
227 match input {
228 ExecutionResult::Rows(rows) => {
229 let start = limit_clause.offset.unwrap_or(0) as usize;
230 let end = start + limit_clause.count as usize;
231 let limited_rows = rows.into_iter()
232 .skip(start)
233 .take(limit_clause.count as usize)
234 .collect();
235
236 Ok(ExecutionResult::Rows(limited_rows))
237 }
238 _ => Ok(input),
239 }
240 }
241
242 async fn execute_return(
243 &self,
244 return_plan: ReturnPlan,
245 input: ExecutionResult,
246 ) -> Result<QueryResult> {
247 let mut results = Vec::new();
248
249 match input {
250 ExecutionResult::Rows(rows) => {
251 for row in rows {
252 let mut result_row = Vec::new();
253
254 for item in &return_plan.items {
255 let value = self.evaluate_expression(&item.expression, &row).await?;
256 result_row.push(value);
257 }
258
259 results.push(result_row);
260 }
261 }
262 ExecutionResult::Grouped(groups) => {
263 for (_key, rows) in groups {
265 if let Some(row) = rows.first() {
267 let mut result_row = Vec::new();
268 for item in &return_plan.items {
269 let value = self.evaluate_expression(&item.expression, row).await?;
270 result_row.push(value);
271 }
272 results.push(result_row);
273 }
274 }
275 }
276 ExecutionResult::Empty => {}
277 }
278
279 if return_plan.distinct {
281 }
283
284 let rows_returned = results.len() as u64;
285 Ok(QueryResult {
286 columns: return_plan.items.iter()
287 .map(|item| item.alias.clone().unwrap_or_else(|| "column".to_string()))
288 .collect(),
289 rows: results,
290 statistics: crate::QueryStatistics {
291 total_time_ms: 0,
292 planning_time_ms: 0,
293 execution_time_ms: 0,
294 rows_scanned: 0,
295 rows_returned,
296 indices_used: vec![],
297 },
298 })
299 }
300
301 async fn evaluate_expression(&self, _expression: &ValueExpression, _row: &[serde_json::Value]) -> Result<serde_json::Value> {
302 Ok(serde_json::Value::String("placeholder".to_string()))
305 }
306}
307
308pub struct StatementExecutor<T: KeyValueStore> {
310 storage: Arc<T>,
311}
312
313impl<T: KeyValueStore + 'static> StatementExecutor<T> {
314 pub fn new(storage: Arc<T>) -> Self {
315 Self { storage }
316 }
317
318 pub async fn execute(
319 &self,
320 _statement: GqlStatement,
321 _context: crate::QueryContext,
322 ) -> Result<StatementResult> {
323 Ok(StatementResult {
325 success: true,
326 message: "Statement executed successfully".to_string(),
327 affected_rows: None,
328 execution_time_ms: 0,
329 })
330 }
331}
332
333#[derive(Debug, Clone)]
335pub enum ExecutionResult {
336 Empty,
337 Rows(Vec<Vec<serde_json::Value>>),
338 Grouped(std::collections::HashMap<String, Vec<Vec<serde_json::Value>>>),
339}
340
341
342impl From<ExecutionResult> for QueryResult {
343 fn from(result: ExecutionResult) -> Self {
344 match result {
345 ExecutionResult::Rows(rows) => {
346 let rows_returned = rows.len() as u64;
347 QueryResult {
348 columns: vec!["result".to_string()], rows,
350 statistics: crate::QueryStatistics {
351 total_time_ms: 0,
352 planning_time_ms: 0,
353 execution_time_ms: 0,
354 rows_scanned: 0,
355 rows_returned,
356 indices_used: vec![],
357 },
358 }
359 },
360 _ => QueryResult {
361 columns: Vec::new(),
362 rows: Vec::new(),
363 statistics: crate::QueryStatistics {
364 total_time_ms: 0,
365 planning_time_ms: 0,
366 execution_time_ms: 0,
367 rows_scanned: 0,
368 rows_returned: 0u64,
369 indices_used: vec![],
370 },
371 },
372 }
373 }
374}
375
376#[cfg(test)]
377mod tests {
378 use super::*;
379
380 #[tokio::test]
381 async fn test_query_executor_creation() {
382 }
385}