1use kotoba_core::ir::*;
4use kotoba_core::types::*;
5use kotoba_graph::prelude::*;
6use kotoba_errors::KotobaError;
7use std::collections::HashSet;
8
9type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
11use crate::planner::{PhysicalPlanner, PhysicalPlan, PhysicalOp};
12use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use crate::planner::logical::LogicalPlanner;
18use crate::planner::optimizer::QueryOptimizer;
19
20#[derive(Debug)]
22pub struct QueryExecutor {
23 logical_planner: LogicalPlanner,
24 physical_planner: PhysicalPlanner,
25 optimizer: QueryOptimizer,
26 }
28
29impl QueryExecutor {
30 pub fn new() -> Self {
31 Self {
32 logical_planner: LogicalPlanner::new(),
33 physical_planner: PhysicalPlanner::new(),
34 optimizer: QueryOptimizer::new(),
35 }
37 }
38
39 pub fn execute_gql(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
47 self.execute_gql_local(gql, graph, catalog)
49 }
50
51 fn execute_gql_local(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
53 let mut logical_plan = self.logical_planner.parse_gql(gql)?;
55
56 logical_plan = self.logical_planner.optimize(&logical_plan, catalog);
58
59 logical_plan = self.optimizer.optimize(&logical_plan, catalog);
61
62 let physical_plan = self.physical_planner.plan_to_physical(&logical_plan, catalog)?;
64
65 self.execute_physical_plan(&physical_plan, graph, catalog)
67 }
68
69 fn convert_distributed_result_to_row_stream(&self, _dist_result: ()) -> Result<RowStream> {
71 Ok(vec![])
73 }
74
75 pub fn execute_plan(&self, plan: &PlanIR, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
77 let physical_plan = self.physical_planner.plan_to_physical(plan, catalog)?;
79
80 self.execute_physical_plan(&physical_plan, graph, catalog)
82 }
83
84 pub fn execute_physical_plan(&self, plan: &PhysicalPlan, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
86 match &plan.op {
87 PhysicalOp::NodeScan { label, as_, props } => {
88 self.execute_node_scan(graph, label, as_, props.as_ref())
89 }
90 PhysicalOp::IndexScan { label, as_, index, value } => {
91 self.execute_index_scan(graph, label, as_, index, value)
92 }
93 PhysicalOp::Filter { pred, input } => {
94 let input_rows = self.execute_physical_plan(
95 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
96 graph, catalog
97 )?;
98 self.execute_filter(input_rows, pred)
99 }
100 PhysicalOp::Expand { edge, to_as, input } => {
101 let input_rows = self.execute_physical_plan(
102 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
103 graph, catalog
104 )?;
105 self.execute_expand(graph, input_rows, edge, to_as)
106 }
107 PhysicalOp::NestedLoopJoin { left, right, on } => {
108 let left_rows = self.execute_physical_plan(
109 &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
110 graph, catalog
111 )?;
112 let right_rows = self.execute_physical_plan(
113 &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
114 graph, catalog
115 )?;
116 self.execute_nested_loop_join(left_rows, right_rows, on)
117 }
118 PhysicalOp::HashJoin { left, right, on } => {
119 let left_rows = self.execute_physical_plan(
120 &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
121 graph, catalog
122 )?;
123 let right_rows = self.execute_physical_plan(
124 &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
125 graph, catalog
126 )?;
127 self.execute_hash_join(left_rows, right_rows, on)
128 }
129 PhysicalOp::Project { cols, input } => {
130 let input_rows = self.execute_physical_plan(
131 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
132 graph, catalog
133 )?;
134 self.execute_project(input_rows, cols)
135 }
136 PhysicalOp::Limit { count, input } => {
137 let input_rows = self.execute_physical_plan(
138 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
139 graph, catalog
140 )?;
141 Ok(input_rows.into_iter().take(*count).collect())
142 }
143 PhysicalOp::Distinct { input } => {
144 let input_rows = self.execute_physical_plan(
145 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
146 graph, catalog
147 )?;
148 self.execute_distinct(input_rows)
149 }
150 PhysicalOp::Sort { keys, input } => {
151 let mut input_rows = self.execute_physical_plan(
152 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
153 graph, catalog
154 )?;
155 self.execute_sort(&mut input_rows, keys);
156 Ok(input_rows)
157 }
158 PhysicalOp::Group { keys, aggregations, input } => {
159 let input_rows = self.execute_physical_plan(
160 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
161 graph, catalog
162 )?;
163 self.execute_group(input_rows, keys, aggregations)
164 }
165 }
166 }
167
168 fn execute_node_scan(&self, graph: &GraphRef, label: &Label, as_: &str, props: Option<&Properties>) -> Result<RowStream> {
170 let graph = graph.read();
171 let mut rows = Vec::new();
172
173 let vertex_ids = if let Some(props) = props {
174 graph.vertices.values()
176 .filter(|v| v.labels.contains(label))
177 .filter(|v| self.matches_properties(&v.props, props))
178 .map(|v| v.id)
179 .collect::<Vec<_>>()
180 } else {
181 graph.vertices_by_label(label).into_iter().collect::<Vec<_>>()
182 };
183
184 for vertex_id in vertex_ids {
185 if let Some(_vertex) = graph.get_vertex(&vertex_id) {
186 let mut row = HashMap::new();
187 row.insert(as_.to_string(), Value::String(vertex_id.to_string()));
188 rows.push(Row { values: row });
189 }
190 }
191
192 Ok(rows)
193 }
194
195 fn execute_index_scan(&self, graph: &GraphRef, label: &Label, as_: &str, _index: &str, _value: &Value) -> Result<RowStream> {
197 self.execute_node_scan(graph, label, as_, None)
199 }
200
201 fn execute_filter(&self, input_rows: RowStream, pred: &Predicate) -> Result<RowStream> {
203 let mut result = Vec::new();
204
205 for row in input_rows {
206 if self.evaluate_predicate(&row, pred)? {
207 result.push(row);
208 }
209 }
210
211 Ok(result)
212 }
213
214 fn execute_expand(&self, graph: &GraphRef, input_rows: RowStream, edge: &EdgePattern, to_as: &str) -> Result<RowStream> {
216 let graph = graph.read();
217 let mut result = Vec::new();
218
219 for row in input_rows {
220 for value in row.values.values() {
222 if let Value::String(vertex_id_str) = value {
223 if let Ok(vertex_id) = vertex_id_str.parse::<uuid::Uuid>() {
224 if let Some(vertex_id) = graph.vertices.get_key_value(&vertex_id.into()).map(|(id, _)| *id) {
225 let neighbors = match edge.dir {
226 Direction::Out => graph.adj_out.get(&vertex_id).cloned(),
227 Direction::In => graph.adj_in.get(&vertex_id).cloned(),
228 Direction::Both => {
229 let mut all_neighbors = HashSet::new();
231 if let Some(out) = graph.adj_out.get(&vertex_id) {
232 all_neighbors.extend(out);
233 }
234 if let Some(in_) = graph.adj_in.get(&vertex_id) {
235 all_neighbors.extend(in_);
236 }
237 Some(all_neighbors)
238 }
239 };
240
241 if let Some(neighbors) = neighbors {
242 for &neighbor_id in &neighbors {
243 let mut new_row = row.clone();
244 new_row.values.insert(to_as.to_string(), Value::String(neighbor_id.to_string()));
245 result.push(Row { values: new_row.values });
246 }
247 }
248 }
249 }
250 }
251 }
252 }
253
254 Ok(result)
255 }
256
257 fn execute_nested_loop_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
259 let mut result = Vec::new();
260
261 for left_row in &left_rows {
262 for right_row in &right_rows {
263 if self.join_condition_matches(left_row, right_row, on) {
264 let mut combined = left_row.values.clone();
265 combined.extend(right_row.values.clone());
266 result.push(Row { values: combined });
267 }
268 }
269 }
270
271 Ok(result)
272 }
273
274 fn execute_hash_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
276 let mut hash_table = HashMap::new();
277 let mut result = Vec::new();
278
279 for row in right_rows {
281 let key = self.extract_join_key(&row, on);
282 hash_table.entry(key).or_insert(Vec::new()).push(row);
283 }
284
285 for left_row in left_rows {
287 let key = self.extract_join_key(&left_row, on);
288 if let Some(right_rows) = hash_table.get(&key) {
289 for right_row in right_rows {
290 let mut combined = left_row.values.clone();
291 combined.extend(right_row.values.clone());
292 result.push(Row { values: combined });
293 }
294 }
295 }
296
297 Ok(result)
298 }
299
300 fn execute_project(&self, input_rows: RowStream, cols: &[String]) -> Result<RowStream> {
302 let mut result = Vec::new();
303
304 for row in input_rows {
305 let mut projected = HashMap::new();
306 for col in cols {
307 if let Some(value) = row.values.get(col) {
308 projected.insert(col.clone(), value.clone());
309 }
310 }
311 result.push(Row { values: projected });
312 }
313
314 Ok(result)
315 }
316
317 fn execute_distinct(&self, input_rows: RowStream) -> Result<RowStream> {
319 let mut seen = HashSet::new();
320 let mut result = Vec::new();
321
322 for row in input_rows {
323 let key = format!("{:?}", row.values);
324 if seen.insert(key) {
325 result.push(row);
326 }
327 }
328
329 Ok(result)
330 }
331
332 fn execute_sort(&self, rows: &mut RowStream, keys: &[SortKey]) {
334 rows.sort_by(|a, b| {
335 for key in keys {
336 let a_val = a.values.get(&key.expr.to_string());
337 let b_val = b.values.get(&key.expr.to_string());
338
339 match (a_val, b_val) {
340 (Some(Value::Int(x)), Some(Value::Int(y))) => {
341 let cmp = x.cmp(y);
342 if cmp != std::cmp::Ordering::Equal {
343 return if key.asc { cmp } else { cmp.reverse() };
344 }
345 }
346 (Some(Value::String(x)), Some(Value::String(y))) => {
347 let cmp = x.cmp(y);
348 if cmp != std::cmp::Ordering::Equal {
349 return if key.asc { cmp } else { cmp.reverse() };
350 }
351 }
352 _ => {}
353 }
354 }
355 std::cmp::Ordering::Equal
356 });
357 }
358
359 fn execute_group(&self, input_rows: RowStream, keys: &[String], aggregations: &[Aggregation]) -> Result<RowStream> {
361 let mut groups: HashMap<String, Vec<Row>> = HashMap::new();
362
363 for row in input_rows {
365 let group_key = self.extract_group_key(&row, keys);
366 groups.entry(group_key).or_insert(Vec::new()).push(row);
367 }
368
369 let mut result = Vec::new();
371 for (group_key, group_rows) in groups {
372 let mut aggregated = HashMap::new();
373
374 let key_parts: Vec<&str> = group_key.split('|').collect();
376 for (i, key) in keys.iter().enumerate() {
377 if let Some(&key_part) = key_parts.get(i) {
378 aggregated.insert(key.clone(), Value::String(key_part.to_string()));
380 }
381 }
382
383 for agg in aggregations {
385 let value = self.compute_aggregation(&group_rows, agg);
386 aggregated.insert(agg.as_.clone(), value);
387 }
388
389 result.push(Row { values: aggregated });
390 }
391
392 Ok(result)
393 }
394
395 fn matches_properties(&self, vertex_props: &Properties, filter_props: &Properties) -> bool {
397 for (key, expected_value) in filter_props {
398 if let Some(actual_value) = vertex_props.get(key) {
399 if !self.values_match(actual_value, expected_value) {
400 return false;
401 }
402 } else {
403 return false;
404 }
405 }
406 true
407 }
408
409 fn values_match(&self, a: &Value, b: &Value) -> bool {
411 match (a, b) {
412 (Value::Null, Value::Null) => true,
413 (Value::Bool(x), Value::Bool(y)) => x == y,
414 (Value::Int(x), Value::Int(y)) => x == y,
415 (Value::String(x), Value::String(y)) => x == y,
416 _ => false,
417 }
418 }
419
420 fn evaluate_predicate(&self, row: &Row, pred: &Predicate) -> Result<bool> {
422 match pred {
423 Predicate::Eq { eq } if eq.len() == 2 => {
424 let left = self.evaluate_expr(row, &eq[0])?;
425 let right = self.evaluate_expr(row, &eq[1])?;
426 Ok(self.values_match(&left, &right))
427 }
428 Predicate::And { and } => {
429 for p in and {
430 if !self.evaluate_predicate(row, p)? {
431 return Ok(false);
432 }
433 }
434 Ok(true)
435 }
436 Predicate::Or { or } => {
437 for p in or {
438 if self.evaluate_predicate(row, p)? {
439 return Ok(true);
440 }
441 }
442 Ok(false)
443 }
444 _ => Ok(true), }
446 }
447
448 fn evaluate_expr(&self, row: &Row, expr: &Expr) -> Result<Value> {
450 match expr {
451 Expr::Var(var) => {
452 row.values.get(var)
453 .cloned()
454 .ok_or_else(|| Box::new(KotobaError::Execution(format!("Variable {} not found", var))) as Box<dyn std::error::Error + Send + Sync>)
455 }
456 Expr::Const(val) => Ok(val.clone()),
457 Expr::Fn { fn_: name, args } => {
458 if name.starts_with("algorithm_") {
460 return self.evaluate_algorithm_function(&name[10..], args, row);
461 }
462
463 match name.as_str() {
465 "degree" => {
466 Ok(Value::Int(1))
468 }
469 "property" => {
470 if args.len() >= 2 {
472 if let (Expr::Var(var), Expr::Const(Value::String(prop))) = (&args[0], &args[1]) {
473 if let Some(Value::String(vertex_id_str)) = row.values.get(var) {
474 Ok(Value::String(format!("{}.{}", vertex_id_str, prop)))
477 } else {
478 Ok(Value::Null)
479 }
480 } else {
481 Ok(Value::Null)
482 }
483 } else {
484 Ok(Value::Null)
485 }
486 }
487 _ => Ok(Value::Null),
488 }
489 }
490 }
491 }
492
493 fn evaluate_algorithm_function(&self, algorithm_name: &str, args: &[Expr], row: &Row) -> Result<Value> {
495 match algorithm_name {
496 "dijkstra" | "shortest_path" => {
497 if args.len() >= 2 {
499 if let (Expr::Var(source_var), Expr::Var(target_var)) = (&args[0], &args[1]) {
500 if let (Some(Value::String(source_id)), Some(Value::String(target_id))) =
501 (row.values.get(source_var), row.values.get(target_var)) {
502
503 Ok(Value::Int(5)) } else {
507 Ok(Value::Null)
508 }
509 } else {
510 Ok(Value::Null)
511 }
512 } else {
513 Ok(Value::Null)
514 }
515 }
516 "degree_centrality" => {
517 if args.len() >= 1 {
519 if let Expr::Var(var) = &args[0] {
520 if let Some(Value::String(_vertex_id)) = row.values.get(var) {
521 Ok(Value::Int(3)) } else {
524 Ok(Value::Null)
525 }
526 } else {
527 Ok(Value::Null)
528 }
529 } else {
530 Ok(Value::Null)
531 }
532 }
533 "betweenness_centrality" => {
534 if args.len() >= 1 {
536 if let Expr::Var(var) = &args[0] {
537 if let Some(Value::String(_vertex_id)) = row.values.get(var) {
538 Ok(Value::Int(10)) } else {
540 Ok(Value::Null)
541 }
542 } else {
543 Ok(Value::Null)
544 }
545 } else {
546 Ok(Value::Null)
547 }
548 }
549 "closeness_centrality" => {
550 if args.len() >= 1 {
552 if let Expr::Var(var) = &args[0] {
553 if let Some(Value::String(_vertex_id)) = row.values.get(var) {
554 Ok(Value::Int(8)) } else {
556 Ok(Value::Null)
557 }
558 } else {
559 Ok(Value::Null)
560 }
561 } else {
562 Ok(Value::Null)
563 }
564 }
565 "pagerank" => {
566 if args.len() >= 1 {
568 if let Expr::Var(var) = &args[0] {
569 if let Some(Value::String(_vertex_id)) = row.values.get(var) {
570 Ok(Value::Int(15)) } else {
572 Ok(Value::Null)
573 }
574 } else {
575 Ok(Value::Null)
576 }
577 } else {
578 Ok(Value::Null)
579 }
580 }
581 "pattern_matching" => {
582 Ok(Value::Int(2)) }
585 _ => Ok(Value::Null),
586 }
587 }
588
589 fn join_condition_matches(&self, left: &Row, right: &Row, on: &[String]) -> bool {
591 for key in on {
592 let left_val = left.values.get(key);
593 let right_val = right.values.get(key);
594
595 match (left_val, right_val) {
596 (Some(a), Some(b)) => {
597 if !self.values_match(a, b) {
598 return false;
599 }
600 }
601 _ => return false,
602 }
603 }
604 true
605 }
606
607 fn extract_join_key(&self, row: &Row, on: &[String]) -> String {
609 let mut key_parts = Vec::new();
610 for col in on {
611 if let Some(value) = row.values.get(col) {
612 key_parts.push(format!("{:?}", value));
613 }
614 }
615 key_parts.join("|")
616 }
617
618 fn extract_group_key(&self, row: &Row, keys: &[String]) -> String {
620 let mut key_parts = Vec::new();
621 for key in keys {
622 if let Some(value) = row.values.get(key) {
623 key_parts.push(format!("{:?}", value));
624 }
625 }
626 key_parts.join("|")
627 }
628
629 fn compute_aggregation(&self, rows: &[Row], agg: &Aggregation) -> Value {
631 match agg.fn_.as_str() {
632 "count" => Value::Int(rows.len() as i64),
633 "sum" => {
634 let mut sum = 0i64;
635 for row in rows {
636 if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
637 sum += val;
638 }
639 }
640 Value::Int(sum)
641 }
642 "avg" => {
643 if rows.is_empty() {
644 Value::Int(0)
645 } else {
646 let mut sum = 0i64;
647 let mut count = 0;
648 for row in rows {
649 if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
650 sum += val;
651 count += 1;
652 }
653 }
654 if count > 0 {
655 Value::Int(sum / count)
656 } else {
657 Value::Int(0)
658 }
659 }
660 }
661 _ => Value::Null,
662 }
663 }
664}