kotoba_execution/execution/
executor.rs

//! Query executor: turns GQL text or logical plans into physical plans and evaluates them against a graph.
2
3use kotoba_core::ir::*;
4use kotoba_core::types::*;
5use kotoba_graph::prelude::*;
6use kotoba_errors::KotobaError;
7use std::collections::HashSet;
8
9// Use std::result::Result instead of kotoba_core::types::Result to avoid conflicts
10type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
11use crate::planner::{PhysicalPlanner, PhysicalPlan, PhysicalOp};
12// use kotoba_distributed::DistributedEngine; // DistributedEngine not available
13// use kotoba_core::CidManager; // CidManager not available in kotoba_core
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use crate::planner::logical::LogicalPlanner;
18use crate::planner::optimizer::QueryOptimizer;
19
20/// クエリ実行器
21#[derive(Debug)]
22pub struct QueryExecutor {
23    logical_planner: LogicalPlanner,
24    physical_planner: PhysicalPlanner,
25    optimizer: QueryOptimizer,
26    // distributed_engine: Option<Arc<tokio::sync::RwLock<DistributedEngine>>>, // Distributed execution not supported
27}
28
29impl QueryExecutor {
30    pub fn new() -> Self {
31        Self {
32            logical_planner: LogicalPlanner::new(),
33            physical_planner: PhysicalPlanner::new(),
34            optimizer: QueryOptimizer::new(),
35            // distributed_engine: None, // Distributed execution not supported
36        }
37    }
38
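    // Configuring a distributed execution engine is currently unsupported; the
    // builder below is kept for reference only. This is a plain `//` comment so
    // that it is not merged into the rustdoc of the next item.
    // pub fn with_distributed_engine(mut self, engine: Arc<tokio::sync::RwLock<DistributedEngine>>) -> Self {
    //     self.distributed_engine = Some(engine);
    //     self
    // }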
44
45    /// GQLクエリを実行
46    pub fn execute_gql(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
47        // ローカル実行(分散実行は現在サポートされていない)
48        self.execute_gql_local(gql, graph, catalog)
49    }
50
51    /// ローカルGQLクエリ実行
52    fn execute_gql_local(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
53        // GQL → 論理プラン
54        let mut logical_plan = self.logical_planner.parse_gql(gql)?;
55
56        // 論理最適化
57        logical_plan = self.logical_planner.optimize(&logical_plan, catalog);
58
59        // クエリ最適化
60        logical_plan = self.optimizer.optimize(&logical_plan, catalog);
61
62        // 論理プラン → 物理プラン
63        let physical_plan = self.physical_planner.plan_to_physical(&logical_plan, catalog)?;
64
65        // 物理プラン実行
66        self.execute_physical_plan(&physical_plan, graph, catalog)
67    }
68
69    /// 分散実行結果をRowStreamに変換
70    fn convert_distributed_result_to_row_stream(&self, _dist_result: ()) -> Result<RowStream> {
71        // 簡易実装: 分散実行は現在サポートされていないので空の結果を返す
72        Ok(vec![])
73    }
74
75    /// 論理プランを実行
76    pub fn execute_plan(&self, plan: &PlanIR, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
77        // 論理プラン → 物理プラン
78        let physical_plan = self.physical_planner.plan_to_physical(plan, catalog)?;
79
80        // 物理プラン実行
81        self.execute_physical_plan(&physical_plan, graph, catalog)
82    }
83
84    /// 物理プランを実行
85    pub fn execute_physical_plan(&self, plan: &PhysicalPlan, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
86        match &plan.op {
87            PhysicalOp::NodeScan { label, as_, props } => {
88                self.execute_node_scan(graph, label, as_, props.as_ref())
89            }
90            PhysicalOp::IndexScan { label, as_, index, value } => {
91                self.execute_index_scan(graph, label, as_, index, value)
92            }
93            PhysicalOp::Filter { pred, input } => {
94                let input_rows = self.execute_physical_plan(
95                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
96                    graph, catalog
97                )?;
98                self.execute_filter(input_rows, pred)
99            }
100            PhysicalOp::Expand { edge, to_as, input } => {
101                let input_rows = self.execute_physical_plan(
102                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
103                    graph, catalog
104                )?;
105                self.execute_expand(graph, input_rows, edge, to_as)
106            }
107            PhysicalOp::NestedLoopJoin { left, right, on } => {
108                let left_rows = self.execute_physical_plan(
109                    &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
110                    graph, catalog
111                )?;
112                let right_rows = self.execute_physical_plan(
113                    &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
114                    graph, catalog
115                )?;
116                self.execute_nested_loop_join(left_rows, right_rows, on)
117            }
118            PhysicalOp::HashJoin { left, right, on } => {
119                let left_rows = self.execute_physical_plan(
120                    &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
121                    graph, catalog
122                )?;
123                let right_rows = self.execute_physical_plan(
124                    &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
125                    graph, catalog
126                )?;
127                self.execute_hash_join(left_rows, right_rows, on)
128            }
129            PhysicalOp::Project { cols, input } => {
130                let input_rows = self.execute_physical_plan(
131                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
132                    graph, catalog
133                )?;
134                self.execute_project(input_rows, cols)
135            }
136            PhysicalOp::Limit { count, input } => {
137                let input_rows = self.execute_physical_plan(
138                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
139                    graph, catalog
140                )?;
141                Ok(input_rows.into_iter().take(*count).collect())
142            }
143            PhysicalOp::Distinct { input } => {
144                let input_rows = self.execute_physical_plan(
145                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
146                    graph, catalog
147                )?;
148                self.execute_distinct(input_rows)
149            }
150            PhysicalOp::Sort { keys, input } => {
151                let mut input_rows = self.execute_physical_plan(
152                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
153                    graph, catalog
154                )?;
155                self.execute_sort(&mut input_rows, keys);
156                Ok(input_rows)
157            }
158            PhysicalOp::Group { keys, aggregations, input } => {
159                let input_rows = self.execute_physical_plan(
160                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
161                    graph, catalog
162                )?;
163                self.execute_group(input_rows, keys, aggregations)
164            }
165        }
166    }
167
168    /// ノードスキャン実行
169    fn execute_node_scan(&self, graph: &GraphRef, label: &Label, as_: &str, props: Option<&Properties>) -> Result<RowStream> {
170        let graph = graph.read();
171        let mut rows = Vec::new();
172
173        let vertex_ids = if let Some(props) = props {
174            // プロパティフィルタ付きスキャン(簡易版)
175            graph.vertices.values()
176                .filter(|v| v.labels.contains(label))
177                .filter(|v| self.matches_properties(&v.props, props))
178                .map(|v| v.id)
179                .collect::<Vec<_>>()
180        } else {
181            graph.vertices_by_label(label).into_iter().collect::<Vec<_>>()
182        };
183
184        for vertex_id in vertex_ids {
185            if let Some(_vertex) = graph.get_vertex(&vertex_id) {
186                let mut row = HashMap::new();
187                row.insert(as_.to_string(), Value::String(vertex_id.to_string()));
188                rows.push(Row { values: row });
189            }
190        }
191
192        Ok(rows)
193    }
194
195    /// インデックススキャン実行
196    fn execute_index_scan(&self, graph: &GraphRef, label: &Label, as_: &str, _index: &str, _value: &Value) -> Result<RowStream> {
197        // 簡易的なインデックススキャン(実際の実装ではインデックスを使用)
198        self.execute_node_scan(graph, label, as_, None)
199    }
200
201    /// フィルタ実行
202    fn execute_filter(&self, input_rows: RowStream, pred: &Predicate) -> Result<RowStream> {
203        let mut result = Vec::new();
204
205        for row in input_rows {
206            if self.evaluate_predicate(&row, pred)? {
207                result.push(row);
208            }
209        }
210
211        Ok(result)
212    }
213
214    /// エッジ展開実行
215    fn execute_expand(&self, graph: &GraphRef, input_rows: RowStream, edge: &EdgePattern, to_as: &str) -> Result<RowStream> {
216        let graph = graph.read();
217        let mut result = Vec::new();
218
219        for row in input_rows {
220            // ソース頂点を取得(簡易版)
221            for value in row.values.values() {
222                if let Value::String(vertex_id_str) = value {
223                    if let Ok(vertex_id) = vertex_id_str.parse::<uuid::Uuid>() {
224                        if let Some(vertex_id) = graph.vertices.get_key_value(&vertex_id.into()).map(|(id, _)| *id) {
225                            let neighbors = match edge.dir {
226                                Direction::Out => graph.adj_out.get(&vertex_id).cloned(),
227                                Direction::In => graph.adj_in.get(&vertex_id).cloned(),
228                                Direction::Both => {
229                                    // 双方向の場合、outとinをマージ
230                                    let mut all_neighbors = HashSet::new();
231                                    if let Some(out) = graph.adj_out.get(&vertex_id) {
232                                        all_neighbors.extend(out);
233                                    }
234                                    if let Some(in_) = graph.adj_in.get(&vertex_id) {
235                                        all_neighbors.extend(in_);
236                                    }
237                                    Some(all_neighbors)
238                                }
239                            };
240
241                            if let Some(neighbors) = neighbors {
242                                for &neighbor_id in &neighbors {
243                                    let mut new_row = row.clone();
244                                    new_row.values.insert(to_as.to_string(), Value::String(neighbor_id.to_string()));
245                                    result.push(Row { values: new_row.values });
246                                }
247                            }
248                        }
249                    }
250                }
251            }
252        }
253
254        Ok(result)
255    }
256
257    /// ネステッドループ結合実行
258    fn execute_nested_loop_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
259        let mut result = Vec::new();
260
261        for left_row in &left_rows {
262            for right_row in &right_rows {
263                if self.join_condition_matches(left_row, right_row, on) {
264                    let mut combined = left_row.values.clone();
265                    combined.extend(right_row.values.clone());
266                    result.push(Row { values: combined });
267                }
268            }
269        }
270
271        Ok(result)
272    }
273
274    /// ハッシュ結合実行
275    fn execute_hash_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
276        let mut hash_table = HashMap::new();
277        let mut result = Vec::new();
278
279        // 右側をハッシュ化
280        for row in right_rows {
281            let key = self.extract_join_key(&row, on);
282            hash_table.entry(key).or_insert(Vec::new()).push(row);
283        }
284
285        // 左側をプローブ
286        for left_row in left_rows {
287            let key = self.extract_join_key(&left_row, on);
288            if let Some(right_rows) = hash_table.get(&key) {
289                for right_row in right_rows {
290                    let mut combined = left_row.values.clone();
291                    combined.extend(right_row.values.clone());
292                    result.push(Row { values: combined });
293                }
294            }
295        }
296
297        Ok(result)
298    }
299
300    /// 射影実行
301    fn execute_project(&self, input_rows: RowStream, cols: &[String]) -> Result<RowStream> {
302        let mut result = Vec::new();
303
304        for row in input_rows {
305            let mut projected = HashMap::new();
306            for col in cols {
307                if let Some(value) = row.values.get(col) {
308                    projected.insert(col.clone(), value.clone());
309                }
310            }
311            result.push(Row { values: projected });
312        }
313
314        Ok(result)
315    }
316
317    /// 重複除去実行
318    fn execute_distinct(&self, input_rows: RowStream) -> Result<RowStream> {
319        let mut seen = HashSet::new();
320        let mut result = Vec::new();
321
322        for row in input_rows {
323            let key = format!("{:?}", row.values);
324            if seen.insert(key) {
325                result.push(row);
326            }
327        }
328
329        Ok(result)
330    }
331
332    /// ソート実行
333    fn execute_sort(&self, rows: &mut RowStream, keys: &[SortKey]) {
334        rows.sort_by(|a, b| {
335            for key in keys {
336                let a_val = a.values.get(&key.expr.to_string());
337                let b_val = b.values.get(&key.expr.to_string());
338
339                match (a_val, b_val) {
340                    (Some(Value::Int(x)), Some(Value::Int(y))) => {
341                        let cmp = x.cmp(y);
342                        if cmp != std::cmp::Ordering::Equal {
343                            return if key.asc { cmp } else { cmp.reverse() };
344                        }
345                    }
346                    (Some(Value::String(x)), Some(Value::String(y))) => {
347                        let cmp = x.cmp(y);
348                        if cmp != std::cmp::Ordering::Equal {
349                            return if key.asc { cmp } else { cmp.reverse() };
350                        }
351                    }
352                    _ => {}
353                }
354            }
355            std::cmp::Ordering::Equal
356        });
357    }
358
359    /// グループ化実行
360    fn execute_group(&self, input_rows: RowStream, keys: &[String], aggregations: &[Aggregation]) -> Result<RowStream> {
361        let mut groups: HashMap<String, Vec<Row>> = HashMap::new();
362
363        // グループ化
364        for row in input_rows {
365            let group_key = self.extract_group_key(&row, keys);
366            groups.entry(group_key).or_insert(Vec::new()).push(row);
367        }
368
369        // 集計
370        let mut result = Vec::new();
371        for (group_key, group_rows) in groups {
372            let mut aggregated = HashMap::new();
373
374            // グループキーを設定
375            let key_parts: Vec<&str> = group_key.split('|').collect();
376            for (i, key) in keys.iter().enumerate() {
377                if let Some(&key_part) = key_parts.get(i) {
378                    // 簡易的に文字列として扱う
379                    aggregated.insert(key.clone(), Value::String(key_part.to_string()));
380                }
381            }
382
383            // 集計関数を適用
384            for agg in aggregations {
385                let value = self.compute_aggregation(&group_rows, agg);
386                aggregated.insert(agg.as_.clone(), value);
387            }
388
389            result.push(Row { values: aggregated });
390        }
391
392        Ok(result)
393    }
394
395    /// プロパティマッチング
396    fn matches_properties(&self, vertex_props: &Properties, filter_props: &Properties) -> bool {
397        for (key, expected_value) in filter_props {
398            if let Some(actual_value) = vertex_props.get(key) {
399                if !self.values_match(actual_value, expected_value) {
400                    return false;
401                }
402            } else {
403                return false;
404            }
405        }
406        true
407    }
408
409    /// 値マッチング
410    fn values_match(&self, a: &Value, b: &Value) -> bool {
411        match (a, b) {
412            (Value::Null, Value::Null) => true,
413            (Value::Bool(x), Value::Bool(y)) => x == y,
414            (Value::Int(x), Value::Int(y)) => x == y,
415            (Value::String(x), Value::String(y)) => x == y,
416            _ => false,
417        }
418    }
419
420    /// 述語評価
421    fn evaluate_predicate(&self, row: &Row, pred: &Predicate) -> Result<bool> {
422        match pred {
423            Predicate::Eq { eq } if eq.len() == 2 => {
424                let left = self.evaluate_expr(row, &eq[0])?;
425                let right = self.evaluate_expr(row, &eq[1])?;
426                Ok(self.values_match(&left, &right))
427            }
428            Predicate::And { and } => {
429                for p in and {
430                    if !self.evaluate_predicate(row, p)? {
431                        return Ok(false);
432                    }
433                }
434                Ok(true)
435            }
436            Predicate::Or { or } => {
437                for p in or {
438                    if self.evaluate_predicate(row, p)? {
439                        return Ok(true);
440                    }
441                }
442                Ok(false)
443            }
444            _ => Ok(true), // 簡易版
445        }
446    }
447
448    /// 式評価
449    fn evaluate_expr(&self, row: &Row, expr: &Expr) -> Result<Value> {
450        match expr {
451            Expr::Var(var) => {
452        row.values.get(var)
453            .cloned()
454            .ok_or_else(|| Box::new(KotobaError::Execution(format!("Variable {} not found", var))) as Box<dyn std::error::Error + Send + Sync>)
455            }
456            Expr::Const(val) => Ok(val.clone()),
457            Expr::Fn { fn_: name, args } => {
458                // アルゴリズム関数かチェック
459                if name.starts_with("algorithm_") {
460                    return self.evaluate_algorithm_function(&name[10..], args, row);
461                }
462
463                // 通常の関数
464                match name.as_str() {
465                    "degree" => {
466                        // 次数関数(簡易版)
467                        Ok(Value::Int(1))
468                    }
469                    "property" => {
470                        // プロパティアクセス関数
471                        if args.len() >= 2 {
472                            if let (Expr::Var(var), Expr::Const(Value::String(prop))) = (&args[0], &args[1]) {
473                                if let Some(Value::String(vertex_id_str)) = row.values.get(var) {
474                                    // 実際の実装ではグラフから頂点を取得してプロパティを返す
475                                    // ここでは簡易版として固定値を返す
476                                    Ok(Value::String(format!("{}.{}", vertex_id_str, prop)))
477                                } else {
478                                    Ok(Value::Null)
479                                }
480                            } else {
481                                Ok(Value::Null)
482                            }
483                        } else {
484                            Ok(Value::Null)
485                        }
486                    }
487                    _ => Ok(Value::Null),
488                }
489            }
490        }
491    }
492
493    /// アルゴリズム関数評価
494    fn evaluate_algorithm_function(&self, algorithm_name: &str, args: &[Expr], row: &Row) -> Result<Value> {
495        match algorithm_name {
496            "dijkstra" | "shortest_path" => {
497                // 例: shortest_path(source, target)
498                if args.len() >= 2 {
499                    if let (Expr::Var(source_var), Expr::Var(target_var)) = (&args[0], &args[1]) {
500                        if let (Some(Value::String(source_id)), Some(Value::String(target_id))) =
501                            (row.values.get(source_var), row.values.get(target_var)) {
502
503                            // 実際の実装ではグラフに対してDijkstraを実行
504                            // ここでは簡易版として距離を返す
505                            Ok(Value::Int(5)) // 仮の距離
506                        } else {
507                            Ok(Value::Null)
508                        }
509                    } else {
510                        Ok(Value::Null)
511                    }
512                } else {
513                    Ok(Value::Null)
514                }
515            }
516            "degree_centrality" => {
517                // 例: degree_centrality(vertex)
518                if args.len() >= 1 {
519                    if let Expr::Var(var) = &args[0] {
520                        if let Some(Value::String(_vertex_id)) = row.values.get(var) {
521                            // 実際の実装では次数中央性を計算
522                            Ok(Value::Int(3)) // 仮の次数
523                        } else {
524                            Ok(Value::Null)
525                        }
526                    } else {
527                        Ok(Value::Null)
528                    }
529                } else {
530                    Ok(Value::Null)
531                }
532            }
533            "betweenness_centrality" => {
534                // 媒介中央性
535                if args.len() >= 1 {
536                    if let Expr::Var(var) = &args[0] {
537                        if let Some(Value::String(_vertex_id)) = row.values.get(var) {
538                            Ok(Value::Int(10)) // 仮の媒介中央性
539                        } else {
540                            Ok(Value::Null)
541                        }
542                    } else {
543                        Ok(Value::Null)
544                    }
545                } else {
546                    Ok(Value::Null)
547                }
548            }
549            "closeness_centrality" => {
550                // 近接中央性
551                if args.len() >= 1 {
552                    if let Expr::Var(var) = &args[0] {
553                        if let Some(Value::String(_vertex_id)) = row.values.get(var) {
554                            Ok(Value::Int(8)) // 仮の近接中央性
555                        } else {
556                            Ok(Value::Null)
557                        }
558                    } else {
559                        Ok(Value::Null)
560                    }
561                } else {
562                    Ok(Value::Null)
563                }
564            }
565            "pagerank" => {
566                // PageRank
567                if args.len() >= 1 {
568                    if let Expr::Var(var) = &args[0] {
569                        if let Some(Value::String(_vertex_id)) = row.values.get(var) {
570                            Ok(Value::Int(15)) // 仮のPageRankスコア
571                        } else {
572                            Ok(Value::Null)
573                        }
574                    } else {
575                        Ok(Value::Null)
576                    }
577                } else {
578                    Ok(Value::Null)
579                }
580            }
581            "pattern_matching" => {
582                // パターンマッチング
583                Ok(Value::Int(2)) // 仮のマッチ数
584            }
585            _ => Ok(Value::Null),
586        }
587    }
588
589    /// 結合条件チェック
590    fn join_condition_matches(&self, left: &Row, right: &Row, on: &[String]) -> bool {
591        for key in on {
592            let left_val = left.values.get(key);
593            let right_val = right.values.get(key);
594
595            match (left_val, right_val) {
596                (Some(a), Some(b)) => {
597                    if !self.values_match(a, b) {
598                        return false;
599                    }
600                }
601                _ => return false,
602            }
603        }
604        true
605    }
606
607    /// 結合キー抽出
608    fn extract_join_key(&self, row: &Row, on: &[String]) -> String {
609        let mut key_parts = Vec::new();
610        for col in on {
611            if let Some(value) = row.values.get(col) {
612                key_parts.push(format!("{:?}", value));
613            }
614        }
615        key_parts.join("|")
616    }
617
618    /// グループキー抽出
619    fn extract_group_key(&self, row: &Row, keys: &[String]) -> String {
620        let mut key_parts = Vec::new();
621        for key in keys {
622            if let Some(value) = row.values.get(key) {
623                key_parts.push(format!("{:?}", value));
624            }
625        }
626        key_parts.join("|")
627    }
628
629    /// 集計計算
630    fn compute_aggregation(&self, rows: &[Row], agg: &Aggregation) -> Value {
631        match agg.fn_.as_str() {
632            "count" => Value::Int(rows.len() as i64),
633            "sum" => {
634                let mut sum = 0i64;
635                for row in rows {
636                    if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
637                        sum += val;
638                    }
639                }
640                Value::Int(sum)
641            }
642            "avg" => {
643                if rows.is_empty() {
644                    Value::Int(0)
645                } else {
646                    let mut sum = 0i64;
647                    let mut count = 0;
648                    for row in rows {
649                        if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
650                            sum += val;
651                            count += 1;
652                        }
653                    }
654                    if count > 0 {
655                        Value::Int(sum / count)
656                    } else {
657                        Value::Int(0)
658                    }
659                }
660            }
661            _ => Value::Null,
662        }
663    }
664}