kotoba_execution/execution/
executor.rs

1//! クエリ実行器
2
3use kotoba_core::{types::*, ir::*};
4use kotoba_graph::prelude::*;
5use crate::planner::*;
6use std::collections::{HashMap, HashSet};
7use kotoba_core::types::Result;
8use uuid;
9
10/// クエリ実行器
11#[derive(Debug)]
12pub struct QueryExecutor {
13    logical_planner: LogicalPlanner,
14    physical_planner: PhysicalPlanner,
15    optimizer: QueryOptimizer,
16}
17
18impl Default for QueryExecutor {
19    fn default() -> Self {
20        Self::new()
21    }
22}
23
24impl QueryExecutor {
25    pub fn new() -> Self {
26        Self {
27            logical_planner: LogicalPlanner::new(),
28            physical_planner: PhysicalPlanner::new(),
29            optimizer: QueryOptimizer::new(),
30        }
31    }
32
33    /// GQLクエリを実行
34    pub fn execute_gql(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
35        // GQL → 論理プラン
36        let mut logical_plan = self.logical_planner.parse_gql(gql)?;
37
38        // 論理最適化
39        logical_plan = self.logical_planner.optimize(&logical_plan, catalog);
40
41        // クエリ最適化
42        logical_plan = self.optimizer.optimize(&logical_plan, catalog);
43
44        // 論理プラン → 物理プラン
45        let physical_plan = self.physical_planner.plan_to_physical(&logical_plan, catalog)?;
46
47        // 物理プラン実行
48        self.execute_physical_plan(&physical_plan, graph, catalog)
49    }
50
51    /// 論理プランを実行
52    pub fn execute_plan(&self, plan: &PlanIR, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
53        // 論理プラン → 物理プラン
54        let physical_plan = self.physical_planner.plan_to_physical(plan, catalog)?;
55
56        // 物理プラン実行
57        self.execute_physical_plan(&physical_plan, graph, catalog)
58    }
59
60    /// 物理プランを実行
61    pub fn execute_physical_plan(&self, plan: &PhysicalPlan, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
62        match &plan.op {
63            PhysicalOp::NodeScan { label, as_, props } => {
64                self.execute_node_scan(graph, label, as_, props.as_ref())
65            }
66            PhysicalOp::IndexScan { label, as_, index, value } => {
67                self.execute_index_scan(graph, label, as_, index, value)
68            }
69            PhysicalOp::Filter { pred, input } => {
70                let input_rows = self.execute_physical_plan(
71                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
72                    graph, catalog
73                )?;
74                self.execute_filter(input_rows, pred)
75            }
76            PhysicalOp::Expand { edge, to_as, input } => {
77                let input_rows = self.execute_physical_plan(
78                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
79                    graph, catalog
80                )?;
81                self.execute_expand(graph, input_rows, edge, to_as)
82            }
83            PhysicalOp::NestedLoopJoin { left, right, on } => {
84                let left_rows = self.execute_physical_plan(
85                    &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
86                    graph, catalog
87                )?;
88                let right_rows = self.execute_physical_plan(
89                    &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
90                    graph, catalog
91                )?;
92                self.execute_nested_loop_join(left_rows, right_rows, on)
93            }
94            PhysicalOp::HashJoin { left, right, on } => {
95                let left_rows = self.execute_physical_plan(
96                    &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
97                    graph, catalog
98                )?;
99                let right_rows = self.execute_physical_plan(
100                    &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
101                    graph, catalog
102                )?;
103                self.execute_hash_join(left_rows, right_rows, on)
104            }
105            PhysicalOp::Project { cols, input } => {
106                let input_rows = self.execute_physical_plan(
107                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
108                    graph, catalog
109                )?;
110                self.execute_project(input_rows, cols)
111            }
112            PhysicalOp::Limit { count, input } => {
113                let input_rows = self.execute_physical_plan(
114                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
115                    graph, catalog
116                )?;
117                Ok(input_rows.into_iter().take(*count).collect())
118            }
119            PhysicalOp::Distinct { input } => {
120                let input_rows = self.execute_physical_plan(
121                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
122                    graph, catalog
123                )?;
124                self.execute_distinct(input_rows)
125            }
126            PhysicalOp::Sort { keys, input } => {
127                let mut input_rows = self.execute_physical_plan(
128                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
129                    graph, catalog
130                )?;
131                self.execute_sort(&mut input_rows, keys);
132                Ok(input_rows)
133            }
134            PhysicalOp::Group { keys, aggregations, input } => {
135                let input_rows = self.execute_physical_plan(
136                    &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
137                    graph, catalog
138                )?;
139                self.execute_group(input_rows, keys, aggregations)
140            }
141        }
142    }
143
144    /// ノードスキャン実行
145    fn execute_node_scan(&self, graph: &GraphRef, label: &Label, as_: &str, props: Option<&Properties>) -> Result<RowStream> {
146        let graph = graph.read();
147        let mut rows = Vec::new();
148
149        let vertex_ids = if let Some(props) = props {
150            // プロパティフィルタ付きスキャン(簡易版)
151            graph.vertices.values()
152                .filter(|v| v.labels.contains(label))
153                .filter(|v| self.matches_properties(&v.props, props))
154                .map(|v| v.id)
155                .collect::<Vec<_>>()
156        } else {
157            graph.vertices_by_label(label).into_iter().collect::<Vec<_>>()
158        };
159
160        for vertex_id in vertex_ids {
161            if let Some(_vertex) = graph.get_vertex(&vertex_id) {
162                let mut row = HashMap::new();
163                row.insert(as_.to_string(), Value::String(vertex_id.to_string()));
164                rows.push(Row { values: row });
165            }
166        }
167
168        Ok(rows)
169    }
170
171    /// インデックススキャン実行
172    fn execute_index_scan(&self, graph: &GraphRef, label: &Label, as_: &str, _index: &str, _value: &Value) -> Result<RowStream> {
173        // 簡易的なインデックススキャン(実際の実装ではインデックスを使用)
174        self.execute_node_scan(graph, label, as_, None)
175    }
176
177    /// フィルタ実行
178    fn execute_filter(&self, input_rows: RowStream, pred: &Predicate) -> Result<RowStream> {
179        let mut result = Vec::new();
180
181        for row in input_rows {
182            if self.evaluate_predicate(&row, pred)? {
183                result.push(row);
184            }
185        }
186
187        Ok(result)
188    }
189
190    /// エッジ展開実行
191    fn execute_expand(&self, graph: &GraphRef, input_rows: RowStream, edge: &EdgePattern, to_as: &str) -> Result<RowStream> {
192        let graph = graph.read();
193        let mut result = Vec::new();
194
195        for row in input_rows {
196            // ソース頂点を取得(簡易版)
197            for value in row.values.values() {
198                if let Value::String(vertex_id_str) = value {
199                    if let Ok(vertex_id) = vertex_id_str.parse::<uuid::Uuid>() {
200                        if let Some(vertex_id) = graph.vertices.get_key_value(&vertex_id).map(|(id, _)| *id) {
201                            let neighbors = match edge.dir {
202                                Direction::Out => graph.adj_out.get(&vertex_id).cloned(),
203                                Direction::In => graph.adj_in.get(&vertex_id).cloned(),
204                                Direction::Both => {
205                                    // 双方向の場合、outとinをマージ
206                                    let mut all_neighbors = HashSet::new();
207                                    if let Some(out) = graph.adj_out.get(&vertex_id) {
208                                        all_neighbors.extend(out);
209                                    }
210                                    if let Some(in_) = graph.adj_in.get(&vertex_id) {
211                                        all_neighbors.extend(in_);
212                                    }
213                                    Some(all_neighbors)
214                                }
215                            };
216
217                            if let Some(neighbors) = neighbors {
218                                for &neighbor_id in &neighbors {
219                                    let mut new_row = row.clone();
220                                    new_row.values.insert(to_as.to_string(), Value::String(neighbor_id.to_string()));
221                                    result.push(Row { values: new_row.values });
222                                }
223                            }
224                        }
225                    }
226                }
227            }
228        }
229
230        Ok(result)
231    }
232
233    /// ネステッドループ結合実行
234    fn execute_nested_loop_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
235        let mut result = Vec::new();
236
237        for left_row in &left_rows {
238            for right_row in &right_rows {
239                if self.join_condition_matches(left_row, right_row, on) {
240                    let mut combined = left_row.values.clone();
241                    combined.extend(right_row.values.clone());
242                    result.push(Row { values: combined });
243                }
244            }
245        }
246
247        Ok(result)
248    }
249
250    /// ハッシュ結合実行
251    fn execute_hash_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
252        let mut hash_table = HashMap::new();
253        let mut result = Vec::new();
254
255        // 右側をハッシュ化
256        for row in right_rows {
257            let key = self.extract_join_key(&row, on);
258            hash_table.entry(key).or_insert(Vec::new()).push(row);
259        }
260
261        // 左側をプローブ
262        for left_row in left_rows {
263            let key = self.extract_join_key(&left_row, on);
264            if let Some(right_rows) = hash_table.get(&key) {
265                for right_row in right_rows {
266                    let mut combined = left_row.values.clone();
267                    combined.extend(right_row.values.clone());
268                    result.push(Row { values: combined });
269                }
270            }
271        }
272
273        Ok(result)
274    }
275
276    /// 射影実行
277    fn execute_project(&self, input_rows: RowStream, cols: &[String]) -> Result<RowStream> {
278        let mut result = Vec::new();
279
280        for row in input_rows {
281            let mut projected = HashMap::new();
282            for col in cols {
283                if let Some(value) = row.values.get(col) {
284                    projected.insert(col.clone(), value.clone());
285                }
286            }
287            result.push(Row { values: projected });
288        }
289
290        Ok(result)
291    }
292
293    /// 重複除去実行
294    fn execute_distinct(&self, input_rows: RowStream) -> Result<RowStream> {
295        let mut seen = HashSet::new();
296        let mut result = Vec::new();
297
298        for row in input_rows {
299            let key = format!("{:?}", row.values);
300            if seen.insert(key) {
301                result.push(row);
302            }
303        }
304
305        Ok(result)
306    }
307
308    /// ソート実行
309    fn execute_sort(&self, rows: &mut RowStream, keys: &[SortKey]) {
310        rows.sort_by(|a, b| {
311            for key in keys {
312                let a_val = a.values.get(&key.expr.to_string());
313                let b_val = b.values.get(&key.expr.to_string());
314
315                match (a_val, b_val) {
316                    (Some(Value::Int(x)), Some(Value::Int(y))) => {
317                        let cmp = x.cmp(y);
318                        if cmp != std::cmp::Ordering::Equal {
319                            return if key.asc { cmp } else { cmp.reverse() };
320                        }
321                    }
322                    (Some(Value::String(x)), Some(Value::String(y))) => {
323                        let cmp = x.cmp(y);
324                        if cmp != std::cmp::Ordering::Equal {
325                            return if key.asc { cmp } else { cmp.reverse() };
326                        }
327                    }
328                    _ => {}
329                }
330            }
331            std::cmp::Ordering::Equal
332        });
333    }
334
335    /// グループ化実行
336    fn execute_group(&self, input_rows: RowStream, keys: &[String], aggregations: &[Aggregation]) -> Result<RowStream> {
337        let mut groups: HashMap<String, Vec<Row>> = HashMap::new();
338
339        // グループ化
340        for row in input_rows {
341            let group_key = self.extract_group_key(&row, keys);
342            groups.entry(group_key).or_default().push(row);
343        }
344
345        // 集計
346        let mut result = Vec::new();
347        for (group_key, group_rows) in groups {
348            let mut aggregated = HashMap::new();
349
350            // グループキーを設定
351            let key_parts: Vec<&str> = group_key.split('|').collect();
352            for (i, key) in keys.iter().enumerate() {
353                if let Some(&key_part) = key_parts.get(i) {
354                    // 簡易的に文字列として扱う
355                    aggregated.insert(key.clone(), Value::String(key_part.to_string()));
356                }
357            }
358
359            // 集計関数を適用
360            for agg in aggregations {
361                let value = self.compute_aggregation(&group_rows, agg);
362                aggregated.insert(agg.as_.clone(), value);
363            }
364
365            result.push(Row { values: aggregated });
366        }
367
368        Ok(result)
369    }
370
371    /// プロパティマッチング
372    fn matches_properties(&self, vertex_props: &Properties, filter_props: &Properties) -> bool {
373        for (key, expected_value) in filter_props {
374            if let Some(actual_value) = vertex_props.get(key) {
375                if !self.values_match(actual_value, expected_value) {
376                    return false;
377                }
378            } else {
379                return false;
380            }
381        }
382        true
383    }
384
385    /// 値マッチング
386    fn values_match(&self, a: &Value, b: &Value) -> bool {
387        match (a, b) {
388            (Value::Null, Value::Null) => true,
389            (Value::Bool(x), Value::Bool(y)) => x == y,
390            (Value::Int(x), Value::Int(y)) => x == y,
391            (Value::String(x), Value::String(y)) => x == y,
392            _ => false,
393        }
394    }
395
396    /// 述語評価
397    fn evaluate_predicate(&self, row: &Row, pred: &Predicate) -> Result<bool> {
398        match pred {
399            Predicate::Eq { eq } if eq.len() == 2 => {
400                let left = self.evaluate_expr(row, &eq[0])?;
401                let right = self.evaluate_expr(row, &eq[1])?;
402                Ok(self.values_match(&left, &right))
403            }
404            Predicate::And { and } => {
405                for p in and {
406                    if !self.evaluate_predicate(row, p)? {
407                        return Ok(false);
408                    }
409                }
410                Ok(true)
411            }
412            Predicate::Or { or } => {
413                for p in or {
414                    if self.evaluate_predicate(row, p)? {
415                        return Ok(true);
416                    }
417                }
418                Ok(false)
419            }
420            _ => Ok(true), // 簡易版
421        }
422    }
423
424    /// 式評価
425    pub fn evaluate_expr(&self, row: &Row, expr: &Expr) -> Result<Value> {
426        match expr {
427            Expr::Var(var) => {
428                row.values.get(var)
429                    .cloned()
430                    .ok_or_else(|| KotobaError::Execution(format!("Variable {} not found in row", var)))
431            }
432            Expr::Const(val) => Ok(val.clone()),
433            Expr::Fn { fn_: name, args } => {
434                self.evaluate_function(name, args, row)
435            }
436        }
437    }
438
439    /// 関数呼び出し評価
440    fn evaluate_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
441        match name {
442            // グラフ関数
443            "degree" => self.evaluate_degree_function(args, row),
444            "labels" => self.evaluate_labels_function(args, row),
445            "keys" => self.evaluate_keys_function(args, row),
446            "hasLabel" => self.evaluate_has_label_function(args, row),
447            "properties" => self.evaluate_properties_function(args, row),
448
449            // 数学関数
450            "abs" | "sqrt" | "sin" | "cos" | "tan" | "log" | "exp" | "floor" | "ceil" | "round" =>
451                self.evaluate_math_function(name, args, row),
452
453            // 文字列関数
454            "length" | "substring" | "startsWith" | "endsWith" | "contains" |
455            "toLower" | "toUpper" | "trim" | "split" =>
456                self.evaluate_string_function(name, args, row),
457
458            // コレクション関数
459            "size" | "isEmpty" | "reverse" =>
460                self.evaluate_collection_function(name, args, row),
461
462            // 型変換関数
463            "toString" | "toInteger" | "toFloat" | "toBoolean" =>
464                self.evaluate_conversion_function(name, args, row),
465
466            // 集計関数(クエリ実行時のみ使用)
467            "count" | "sum" | "avg" | "min" | "max" => {
468                Err(KotobaError::Execution(format!("Aggregate function {} should be handled in Group operator", name)))
469            }
470
471            _ => Err(KotobaError::Execution(format!("Unknown function: {}", name))),
472        }
473    }
474
475    /// degree関数評価
476    fn evaluate_degree_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
477        if args.len() != 1 {
478            return Err(KotobaError::Execution("degree() function requires exactly 1 argument".to_string()));
479        }
480
481        let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
482            Value::String(s) => s,
483            _ => return Err(KotobaError::Execution("degree() argument must be a vertex ID string".to_string())),
484        };
485
486        // 簡易実装:実際のグラフから次数を取得する必要がある
487        // ここでは仮に固定値を返す
488        Ok(Value::Int(1))
489    }
490
491    /// labels関数評価
492    fn evaluate_labels_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
493        if args.len() != 1 {
494            return Err(KotobaError::Execution("labels() function requires exactly 1 argument".to_string()));
495        }
496
497        let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
498            Value::String(s) => s,
499            _ => return Err(KotobaError::Execution("labels() argument must be a vertex ID string".to_string())),
500        };
501
502        // 簡易実装:実際のグラフからラベルを取得する必要がある
503        // ここでは仮に空のリストを返す
504        Ok(Value::String("[]".to_string()))
505    }
506
507    /// keys関数評価
508    fn evaluate_keys_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
509        if args.len() != 1 {
510            return Err(KotobaError::Execution("keys() function requires exactly 1 argument".to_string()));
511        }
512
513        let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
514            Value::String(s) => s,
515            _ => return Err(KotobaError::Execution("keys() argument must be a vertex ID string".to_string())),
516        };
517
518        // 簡易実装:実際のグラフからプロパティキーを取得する必要がある
519        // ここでは仮に空のリストを返す
520        Ok(Value::String("[]".to_string()))
521    }
522
523    /// hasLabel関数評価
524    fn evaluate_has_label_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
525        if args.len() != 2 {
526            return Err(KotobaError::Execution("hasLabel() function requires exactly 2 arguments".to_string()));
527        }
528
529        let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
530            Value::String(s) => s,
531            _ => return Err(KotobaError::Execution("hasLabel() first argument must be a vertex ID string".to_string())),
532        };
533
534        let label = match self.evaluate_expr(row, &args[1])? {
535            Value::String(s) => s,
536            _ => return Err(KotobaError::Execution("hasLabel() second argument must be a label string".to_string())),
537        };
538
539        // 簡易実装:実際のグラフからラベルチェックを行う必要がある
540        // ここでは仮にfalseを返す
541        Ok(Value::Bool(false))
542    }
543
544    /// properties関数評価
545    fn evaluate_properties_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
546        if args.len() != 1 {
547            return Err(KotobaError::Execution("properties() function requires exactly 1 argument".to_string()));
548        }
549
550        let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
551            Value::String(s) => s,
552            _ => return Err(KotobaError::Execution("properties() argument must be a vertex ID string".to_string())),
553        };
554
555        // 簡易実装:実際のグラフからプロパティを取得する必要がある
556        // ここでは仮に空のマップを返す
557        Ok(Value::String("{}".to_string()))
558    }
559
560    /// 数学関数評価
561    fn evaluate_math_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
562        if args.len() != 1 {
563            return Err(KotobaError::Execution(format!("{}() function requires exactly 1 argument", name)));
564        }
565
566        let value = self.evaluate_expr(row, &args[0])?;
567        let num = match value {
568            Value::Int(n) => n as f64,
569            Value::Integer(n) => n as f64,
570            _ => return Err(KotobaError::Execution(format!("{}() argument must be a number", name))),
571        };
572
573        let result = match name {
574            "abs" => num.abs(),
575            "sqrt" if num >= 0.0 => num.sqrt(),
576            "sqrt" => return Err(KotobaError::Execution("sqrt() of negative number".to_string())),
577            "sin" => num.sin(),
578            "cos" => num.cos(),
579            "tan" => num.tan(),
580            "log" if num > 0.0 => num.ln(),
581            "log" => return Err(KotobaError::Execution("log() of non-positive number".to_string())),
582            "exp" => num.exp(),
583            "floor" => num.floor(),
584            "ceil" => num.ceil(),
585            "round" => num.round(),
586            _ => return Err(KotobaError::Execution(format!("Unknown math function: {}", name))),
587        };
588
589        Ok(Value::Int(result as i64))
590    }
591
592    /// 文字列関数評価
593    fn evaluate_string_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
594        match name {
595            "length" => {
596                if args.len() != 1 {
597                    return Err(KotobaError::Execution("length() function requires exactly 1 argument".to_string()));
598                }
599                match self.evaluate_expr(row, &args[0])? {
600                    Value::String(s) => Ok(Value::Int(s.len() as i64)),
601                    _ => Err(KotobaError::Execution("length() argument must be a string".to_string())),
602                }
603            }
604            "substring" => {
605                if args.len() != 3 {
606                    return Err(KotobaError::Execution("substring() function requires exactly 3 arguments".to_string()));
607                }
608                let s = match self.evaluate_expr(row, &args[0])? {
609                    Value::String(s) => s,
610                    _ => return Err(KotobaError::Execution("substring() first argument must be a string".to_string())),
611                };
612                let start = match self.evaluate_expr(row, &args[1])? {
613                    Value::Int(n) => n as usize,
614                    _ => return Err(KotobaError::Execution("substring() second argument must be an integer".to_string())),
615                };
616                let len = match self.evaluate_expr(row, &args[2])? {
617                    Value::Int(n) => n as usize,
618                    _ => return Err(KotobaError::Execution("substring() third argument must be an integer".to_string())),
619                };
620
621                if start >= s.len() {
622                    Ok(Value::String(String::new()))
623                } else {
624                    let end = (start + len).min(s.len());
625                    Ok(Value::String(s[start..end].to_string()))
626                }
627            }
628            "startsWith" => {
629                if args.len() != 2 {
630                    return Err(KotobaError::Execution("startsWith() function requires exactly 2 arguments".to_string()));
631                }
632                let s = match self.evaluate_expr(row, &args[0])? {
633                    Value::String(s) => s,
634                    _ => return Err(KotobaError::Execution("startsWith() first argument must be a string".to_string())),
635                };
636                let prefix = match self.evaluate_expr(row, &args[1])? {
637                    Value::String(s) => s,
638                    _ => return Err(KotobaError::Execution("startsWith() second argument must be a string".to_string())),
639                };
640                Ok(Value::Bool(s.starts_with(&prefix)))
641            }
642            "endsWith" => {
643                if args.len() != 2 {
644                    return Err(KotobaError::Execution("endsWith() function requires exactly 2 arguments".to_string()));
645                }
646                let s = match self.evaluate_expr(row, &args[0])? {
647                    Value::String(s) => s,
648                    _ => return Err(KotobaError::Execution("endsWith() first argument must be a string".to_string())),
649                };
650                let suffix = match self.evaluate_expr(row, &args[1])? {
651                    Value::String(s) => s,
652                    _ => return Err(KotobaError::Execution("endsWith() second argument must be a string".to_string())),
653                };
654                Ok(Value::Bool(s.ends_with(&suffix)))
655            }
656            "contains" => {
657                if args.len() != 2 {
658                    return Err(KotobaError::Execution("contains() function requires exactly 2 arguments".to_string()));
659                }
660                let s = match self.evaluate_expr(row, &args[0])? {
661                    Value::String(s) => s,
662                    _ => return Err(KotobaError::Execution("contains() first argument must be a string".to_string())),
663                };
664                let substr = match self.evaluate_expr(row, &args[1])? {
665                    Value::String(s) => s,
666                    _ => return Err(KotobaError::Execution("contains() second argument must be a string".to_string())),
667                };
668                Ok(Value::Bool(s.contains(&substr)))
669            }
670            "toLower" => {
671                if args.len() != 1 {
672                    return Err(KotobaError::Execution("toLower() function requires exactly 1 argument".to_string()));
673                }
674                match self.evaluate_expr(row, &args[0])? {
675                    Value::String(s) => Ok(Value::String(s.to_lowercase())),
676                    _ => Err(KotobaError::Execution("toLower() argument must be a string".to_string())),
677                }
678            }
679            "toUpper" => {
680                if args.len() != 1 {
681                    return Err(KotobaError::Execution("toUpper() function requires exactly 1 argument".to_string()));
682                }
683                match self.evaluate_expr(row, &args[0])? {
684                    Value::String(s) => Ok(Value::String(s.to_uppercase())),
685                    _ => Err(KotobaError::Execution("toUpper() argument must be a string".to_string())),
686                }
687            }
688            "trim" => {
689                if args.len() != 1 {
690                    return Err(KotobaError::Execution("trim() function requires exactly 1 argument".to_string()));
691                }
692                match self.evaluate_expr(row, &args[0])? {
693                    Value::String(s) => Ok(Value::String(s.trim().to_string())),
694                    _ => Err(KotobaError::Execution("trim() argument must be a string".to_string())),
695                }
696            }
697            "split" => {
698                if args.len() != 2 {
699                    return Err(KotobaError::Execution("split() function requires exactly 2 arguments".to_string()));
700                }
701                let s = match self.evaluate_expr(row, &args[0])? {
702                    Value::String(s) => s,
703                    _ => return Err(KotobaError::Execution("split() first argument must be a string".to_string())),
704                };
705                let sep = match self.evaluate_expr(row, &args[1])? {
706                    Value::String(s) => s,
707                    _ => return Err(KotobaError::Execution("split() second argument must be a string".to_string())),
708                };
709                // 簡易実装:実際にはリストを返すべき
710                Ok(Value::String(format!("[{}]", s.split(&sep).collect::<Vec<_>>().join(", "))))
711            }
712            _ => Err(KotobaError::Execution(format!("Unknown string function: {}", name))),
713        }
714    }
715
716    /// コレクション関数評価
717    fn evaluate_collection_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
718        match name {
719            "size" => {
720                if args.len() != 1 {
721                    return Err(KotobaError::Execution("size() function requires exactly 1 argument".to_string()));
722                }
723                // 簡易実装:実際のコレクションサイズを計算する必要がある
724                Ok(Value::Int(0))
725            }
726            "isEmpty" => {
727                if args.len() != 1 {
728                    return Err(KotobaError::Execution("isEmpty() function requires exactly 1 argument".to_string()));
729                }
730                // 簡易実装:実際のコレクションの空チェックを行う必要がある
731                Ok(Value::Bool(true))
732            }
733            "reverse" => {
734                if args.len() != 1 {
735                    return Err(KotobaError::Execution("reverse() function requires exactly 1 argument".to_string()));
736                }
737                // 簡易実装:実際のコレクションを逆順にする必要がある
738                Ok(Value::String("[]".to_string()))
739            }
740            _ => Err(KotobaError::Execution(format!("Unknown collection function: {}", name))),
741        }
742    }
743
744    /// 型変換関数評価
745    fn evaluate_conversion_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
746        if args.len() != 1 {
747            return Err(KotobaError::Execution(format!("{}() function requires exactly 1 argument", name)));
748        }
749
750        let value = self.evaluate_expr(row, &args[0])?;
751
752        match name {
753            "toString" => Ok(Value::String(self.value_to_string(&value))),
754            "toInteger" => {
755                match value {
756                    Value::String(s) => {
757                        match s.parse::<i64>() {
758                            Ok(n) => Ok(Value::Int(n)),
759                            Err(_) => Err(KotobaError::Execution(format!("Cannot convert '{}' to integer", s))),
760                        }
761                    }
762                    Value::Int(n) => Ok(Value::Int(n)),
763                    Value::Integer(n) => Ok(Value::Integer(n)),
764                    _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to integer", value))),
765                }
766            }
767            "toFloat" => {
768                match value {
769                    Value::String(s) => {
770                        match s.parse::<f64>() {
771                            Ok(n) => Ok(Value::Int(n as i64)), // 簡易的にIntとして扱う
772                            Err(_) => Err(KotobaError::Execution(format!("Cannot convert '{}' to float", s))),
773                        }
774                    }
775                    Value::Int(n) => Ok(Value::Int(n)), // 簡易的にIntとして扱う
776                    Value::Integer(n) => Ok(Value::Integer(n)), // 簡易的にIntとして扱う
777                    _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to float", value))),
778                }
779            }
780            "toBoolean" => {
781                match value {
782                    Value::String(s) => {
783                        let b = match s.to_lowercase().as_str() {
784                            "true" | "1" | "yes" => true,
785                            "false" | "0" | "no" => false,
786                            _ => return Err(KotobaError::Execution(format!("Cannot convert '{}' to boolean", s))),
787                        };
788                        Ok(Value::Bool(b))
789                    }
790                    Value::Int(n) => Ok(Value::Bool(n != 0)),
791                    Value::Integer(n) => Ok(Value::Bool(n != 0)),
792                    Value::Bool(b) => Ok(Value::Bool(b)),
793                    _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to boolean", value))),
794                }
795            }
796            _ => Err(KotobaError::Execution(format!("Unknown conversion function: {}", name))),
797        }
798    }
799
800    /// 値を文字列に変換
801    fn value_to_string(&self, value: &Value) -> String {
802        match value {
803            Value::Null => "null".to_string(),
804            Value::Bool(b) => b.to_string(),
805            Value::Int(n) => n.to_string(),
806            Value::Integer(n) => n.to_string(),
807            Value::String(s) => s.clone(),
808        }
809    }
810
811    /// 結合条件チェック
812    fn join_condition_matches(&self, left: &Row, right: &Row, on: &[String]) -> bool {
813        for key in on {
814            let left_val = left.values.get(key);
815            let right_val = right.values.get(key);
816
817            match (left_val, right_val) {
818                (Some(a), Some(b)) => {
819                    if !self.values_match(a, b) {
820                        return false;
821                    }
822                }
823                _ => return false,
824            }
825        }
826        true
827    }
828
829    /// 結合キー抽出
830    fn extract_join_key(&self, row: &Row, on: &[String]) -> String {
831        let mut key_parts = Vec::new();
832        for col in on {
833            if let Some(value) = row.values.get(col) {
834                key_parts.push(format!("{:?}", value));
835            }
836        }
837        key_parts.join("|")
838    }
839
840    /// グループキー抽出
841    fn extract_group_key(&self, row: &Row, keys: &[String]) -> String {
842        let mut key_parts = Vec::new();
843        for key in keys {
844            if let Some(value) = row.values.get(key) {
845                key_parts.push(format!("{:?}", value));
846            }
847        }
848        key_parts.join("|")
849    }
850
851    /// 集計計算
852    fn compute_aggregation(&self, rows: &[Row], agg: &Aggregation) -> Value {
853        match agg.fn_.as_str() {
854            "count" => Value::Int(rows.len() as i64),
855            "sum" => {
856                let mut sum = 0i64;
857                for row in rows {
858                    if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
859                        sum += val;
860                    }
861                }
862                Value::Int(sum)
863            }
864            "avg" => {
865                if rows.is_empty() {
866                    Value::Int(0)
867                } else {
868                    let mut sum = 0i64;
869                    let mut count = 0;
870                    for row in rows {
871                        if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
872                            sum += val;
873                            count += 1;
874                        }
875                    }
876                    if count > 0 {
877                        Value::Int(sum / count)
878                    } else {
879                        Value::Int(0)
880                    }
881                }
882            }
883            _ => Value::Null,
884        }
885    }
886}