1use kotoba_core::{types::*, ir::*};
4use kotoba_graph::prelude::*;
5use crate::planner::*;
6use std::collections::{HashMap, HashSet};
7use kotoba_core::types::Result;
8use uuid;
9
/// Runs graph queries end to end: parsing, logical optimisation, physical planning and
/// row-by-row evaluation of the resulting physical operators.
#[derive(Debug)]
pub struct QueryExecutor {
    /// Parses GQL text into a logical plan and applies the planner's own optimisation pass.
    logical_planner: LogicalPlanner,
    /// Lowers a logical plan into a physical plan.
    physical_planner: PhysicalPlanner,
    /// Additional optimisation pass applied to the logical plan before physical planning.
    optimizer: QueryOptimizer,
}
17
18impl Default for QueryExecutor {
19 fn default() -> Self {
20 Self::new()
21 }
22}
23
24impl QueryExecutor {
25 pub fn new() -> Self {
26 Self {
27 logical_planner: LogicalPlanner::new(),
28 physical_planner: PhysicalPlanner::new(),
29 optimizer: QueryOptimizer::new(),
30 }
31 }
32
33 pub fn execute_gql(&self, gql: &str, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
35 let mut logical_plan = self.logical_planner.parse_gql(gql)?;
37
38 logical_plan = self.logical_planner.optimize(&logical_plan, catalog);
40
41 logical_plan = self.optimizer.optimize(&logical_plan, catalog);
43
44 let physical_plan = self.physical_planner.plan_to_physical(&logical_plan, catalog)?;
46
47 self.execute_physical_plan(&physical_plan, graph, catalog)
49 }
50
51 pub fn execute_plan(&self, plan: &PlanIR, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
53 let physical_plan = self.physical_planner.plan_to_physical(plan, catalog)?;
55
56 self.execute_physical_plan(&physical_plan, graph, catalog)
58 }
59
60 pub fn execute_physical_plan(&self, plan: &PhysicalPlan, graph: &GraphRef, catalog: &Catalog) -> Result<RowStream> {
62 match &plan.op {
63 PhysicalOp::NodeScan { label, as_, props } => {
64 self.execute_node_scan(graph, label, as_, props.as_ref())
65 }
66 PhysicalOp::IndexScan { label, as_, index, value } => {
67 self.execute_index_scan(graph, label, as_, index, value)
68 }
69 PhysicalOp::Filter { pred, input } => {
70 let input_rows = self.execute_physical_plan(
71 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
72 graph, catalog
73 )?;
74 self.execute_filter(input_rows, pred)
75 }
76 PhysicalOp::Expand { edge, to_as, input } => {
77 let input_rows = self.execute_physical_plan(
78 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
79 graph, catalog
80 )?;
81 self.execute_expand(graph, input_rows, edge, to_as)
82 }
83 PhysicalOp::NestedLoopJoin { left, right, on } => {
84 let left_rows = self.execute_physical_plan(
85 &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
86 graph, catalog
87 )?;
88 let right_rows = self.execute_physical_plan(
89 &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
90 graph, catalog
91 )?;
92 self.execute_nested_loop_join(left_rows, right_rows, on)
93 }
94 PhysicalOp::HashJoin { left, right, on } => {
95 let left_rows = self.execute_physical_plan(
96 &PhysicalPlan { op: *left.clone(), estimated_cost: 0.0 },
97 graph, catalog
98 )?;
99 let right_rows = self.execute_physical_plan(
100 &PhysicalPlan { op: *right.clone(), estimated_cost: 0.0 },
101 graph, catalog
102 )?;
103 self.execute_hash_join(left_rows, right_rows, on)
104 }
105 PhysicalOp::Project { cols, input } => {
106 let input_rows = self.execute_physical_plan(
107 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
108 graph, catalog
109 )?;
110 self.execute_project(input_rows, cols)
111 }
112 PhysicalOp::Limit { count, input } => {
113 let input_rows = self.execute_physical_plan(
114 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
115 graph, catalog
116 )?;
117 Ok(input_rows.into_iter().take(*count).collect())
118 }
119 PhysicalOp::Distinct { input } => {
120 let input_rows = self.execute_physical_plan(
121 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
122 graph, catalog
123 )?;
124 self.execute_distinct(input_rows)
125 }
126 PhysicalOp::Sort { keys, input } => {
127 let mut input_rows = self.execute_physical_plan(
128 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
129 graph, catalog
130 )?;
131 self.execute_sort(&mut input_rows, keys);
132 Ok(input_rows)
133 }
134 PhysicalOp::Group { keys, aggregations, input } => {
135 let input_rows = self.execute_physical_plan(
136 &PhysicalPlan { op: *input.clone(), estimated_cost: 0.0 },
137 graph, catalog
138 )?;
139 self.execute_group(input_rows, keys, aggregations)
140 }
141 }
142 }
143
144 fn execute_node_scan(&self, graph: &GraphRef, label: &Label, as_: &str, props: Option<&Properties>) -> Result<RowStream> {
146 let graph = graph.read();
147 let mut rows = Vec::new();
148
149 let vertex_ids = if let Some(props) = props {
150 graph.vertices.values()
152 .filter(|v| v.labels.contains(label))
153 .filter(|v| self.matches_properties(&v.props, props))
154 .map(|v| v.id)
155 .collect::<Vec<_>>()
156 } else {
157 graph.vertices_by_label(label).into_iter().collect::<Vec<_>>()
158 };
159
160 for vertex_id in vertex_ids {
161 if let Some(_vertex) = graph.get_vertex(&vertex_id) {
162 let mut row = HashMap::new();
163 row.insert(as_.to_string(), Value::String(vertex_id.to_string()));
164 rows.push(Row { values: row });
165 }
166 }
167
168 Ok(rows)
169 }
170
/// Evaluates an index-scan plan node.
///
/// Currently this delegates to `execute_node_scan` with no property filter, so it yields one
/// row per vertex found for `label`; `_index` and `_value` are not consulted.
///
/// NOTE(review): presumably a placeholder until real index lookups exist. If the planner
/// relies on this operator to apply the `index = value` condition, the result is a superset
/// of the intended rows — confirm against the planner before relying on it.
fn execute_index_scan(&self, graph: &GraphRef, label: &Label, as_: &str, _index: &str, _value: &Value) -> Result<RowStream> {
    self.execute_node_scan(graph, label, as_, None)
}
176
177 fn execute_filter(&self, input_rows: RowStream, pred: &Predicate) -> Result<RowStream> {
179 let mut result = Vec::new();
180
181 for row in input_rows {
182 if self.evaluate_predicate(&row, pred)? {
183 result.push(row);
184 }
185 }
186
187 Ok(result)
188 }
189
190 fn execute_expand(&self, graph: &GraphRef, input_rows: RowStream, edge: &EdgePattern, to_as: &str) -> Result<RowStream> {
192 let graph = graph.read();
193 let mut result = Vec::new();
194
195 for row in input_rows {
196 for value in row.values.values() {
198 if let Value::String(vertex_id_str) = value {
199 if let Ok(vertex_id) = vertex_id_str.parse::<uuid::Uuid>() {
200 if let Some(vertex_id) = graph.vertices.get_key_value(&vertex_id).map(|(id, _)| *id) {
201 let neighbors = match edge.dir {
202 Direction::Out => graph.adj_out.get(&vertex_id).cloned(),
203 Direction::In => graph.adj_in.get(&vertex_id).cloned(),
204 Direction::Both => {
205 let mut all_neighbors = HashSet::new();
207 if let Some(out) = graph.adj_out.get(&vertex_id) {
208 all_neighbors.extend(out);
209 }
210 if let Some(in_) = graph.adj_in.get(&vertex_id) {
211 all_neighbors.extend(in_);
212 }
213 Some(all_neighbors)
214 }
215 };
216
217 if let Some(neighbors) = neighbors {
218 for &neighbor_id in &neighbors {
219 let mut new_row = row.clone();
220 new_row.values.insert(to_as.to_string(), Value::String(neighbor_id.to_string()));
221 result.push(Row { values: new_row.values });
222 }
223 }
224 }
225 }
226 }
227 }
228 }
229
230 Ok(result)
231 }
232
233 fn execute_nested_loop_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
235 let mut result = Vec::new();
236
237 for left_row in &left_rows {
238 for right_row in &right_rows {
239 if self.join_condition_matches(left_row, right_row, on) {
240 let mut combined = left_row.values.clone();
241 combined.extend(right_row.values.clone());
242 result.push(Row { values: combined });
243 }
244 }
245 }
246
247 Ok(result)
248 }
249
250 fn execute_hash_join(&self, left_rows: RowStream, right_rows: RowStream, on: &[String]) -> Result<RowStream> {
252 let mut hash_table = HashMap::new();
253 let mut result = Vec::new();
254
255 for row in right_rows {
257 let key = self.extract_join_key(&row, on);
258 hash_table.entry(key).or_insert(Vec::new()).push(row);
259 }
260
261 for left_row in left_rows {
263 let key = self.extract_join_key(&left_row, on);
264 if let Some(right_rows) = hash_table.get(&key) {
265 for right_row in right_rows {
266 let mut combined = left_row.values.clone();
267 combined.extend(right_row.values.clone());
268 result.push(Row { values: combined });
269 }
270 }
271 }
272
273 Ok(result)
274 }
275
276 fn execute_project(&self, input_rows: RowStream, cols: &[String]) -> Result<RowStream> {
278 let mut result = Vec::new();
279
280 for row in input_rows {
281 let mut projected = HashMap::new();
282 for col in cols {
283 if let Some(value) = row.values.get(col) {
284 projected.insert(col.clone(), value.clone());
285 }
286 }
287 result.push(Row { values: projected });
288 }
289
290 Ok(result)
291 }
292
293 fn execute_distinct(&self, input_rows: RowStream) -> Result<RowStream> {
295 let mut seen = HashSet::new();
296 let mut result = Vec::new();
297
298 for row in input_rows {
299 let key = format!("{:?}", row.values);
300 if seen.insert(key) {
301 result.push(row);
302 }
303 }
304
305 Ok(result)
306 }
307
308 fn execute_sort(&self, rows: &mut RowStream, keys: &[SortKey]) {
310 rows.sort_by(|a, b| {
311 for key in keys {
312 let a_val = a.values.get(&key.expr.to_string());
313 let b_val = b.values.get(&key.expr.to_string());
314
315 match (a_val, b_val) {
316 (Some(Value::Int(x)), Some(Value::Int(y))) => {
317 let cmp = x.cmp(y);
318 if cmp != std::cmp::Ordering::Equal {
319 return if key.asc { cmp } else { cmp.reverse() };
320 }
321 }
322 (Some(Value::String(x)), Some(Value::String(y))) => {
323 let cmp = x.cmp(y);
324 if cmp != std::cmp::Ordering::Equal {
325 return if key.asc { cmp } else { cmp.reverse() };
326 }
327 }
328 _ => {}
329 }
330 }
331 std::cmp::Ordering::Equal
332 });
333 }
334
335 fn execute_group(&self, input_rows: RowStream, keys: &[String], aggregations: &[Aggregation]) -> Result<RowStream> {
337 let mut groups: HashMap<String, Vec<Row>> = HashMap::new();
338
339 for row in input_rows {
341 let group_key = self.extract_group_key(&row, keys);
342 groups.entry(group_key).or_default().push(row);
343 }
344
345 let mut result = Vec::new();
347 for (group_key, group_rows) in groups {
348 let mut aggregated = HashMap::new();
349
350 let key_parts: Vec<&str> = group_key.split('|').collect();
352 for (i, key) in keys.iter().enumerate() {
353 if let Some(&key_part) = key_parts.get(i) {
354 aggregated.insert(key.clone(), Value::String(key_part.to_string()));
356 }
357 }
358
359 for agg in aggregations {
361 let value = self.compute_aggregation(&group_rows, agg);
362 aggregated.insert(agg.as_.clone(), value);
363 }
364
365 result.push(Row { values: aggregated });
366 }
367
368 Ok(result)
369 }
370
371 fn matches_properties(&self, vertex_props: &Properties, filter_props: &Properties) -> bool {
373 for (key, expected_value) in filter_props {
374 if let Some(actual_value) = vertex_props.get(key) {
375 if !self.values_match(actual_value, expected_value) {
376 return false;
377 }
378 } else {
379 return false;
380 }
381 }
382 true
383 }
384
385 fn values_match(&self, a: &Value, b: &Value) -> bool {
387 match (a, b) {
388 (Value::Null, Value::Null) => true,
389 (Value::Bool(x), Value::Bool(y)) => x == y,
390 (Value::Int(x), Value::Int(y)) => x == y,
391 (Value::String(x), Value::String(y)) => x == y,
392 _ => false,
393 }
394 }
395
396 fn evaluate_predicate(&self, row: &Row, pred: &Predicate) -> Result<bool> {
398 match pred {
399 Predicate::Eq { eq } if eq.len() == 2 => {
400 let left = self.evaluate_expr(row, &eq[0])?;
401 let right = self.evaluate_expr(row, &eq[1])?;
402 Ok(self.values_match(&left, &right))
403 }
404 Predicate::And { and } => {
405 for p in and {
406 if !self.evaluate_predicate(row, p)? {
407 return Ok(false);
408 }
409 }
410 Ok(true)
411 }
412 Predicate::Or { or } => {
413 for p in or {
414 if self.evaluate_predicate(row, p)? {
415 return Ok(true);
416 }
417 }
418 Ok(false)
419 }
420 _ => Ok(true), }
422 }
423
424 pub fn evaluate_expr(&self, row: &Row, expr: &Expr) -> Result<Value> {
426 match expr {
427 Expr::Var(var) => {
428 row.values.get(var)
429 .cloned()
430 .ok_or_else(|| KotobaError::Execution(format!("Variable {} not found in row", var)))
431 }
432 Expr::Const(val) => Ok(val.clone()),
433 Expr::Fn { fn_: name, args } => {
434 self.evaluate_function(name, args, row)
435 }
436 }
437 }
438
439 fn evaluate_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
441 match name {
442 "degree" => self.evaluate_degree_function(args, row),
444 "labels" => self.evaluate_labels_function(args, row),
445 "keys" => self.evaluate_keys_function(args, row),
446 "hasLabel" => self.evaluate_has_label_function(args, row),
447 "properties" => self.evaluate_properties_function(args, row),
448
449 "abs" | "sqrt" | "sin" | "cos" | "tan" | "log" | "exp" | "floor" | "ceil" | "round" =>
451 self.evaluate_math_function(name, args, row),
452
453 "length" | "substring" | "startsWith" | "endsWith" | "contains" |
455 "toLower" | "toUpper" | "trim" | "split" =>
456 self.evaluate_string_function(name, args, row),
457
458 "size" | "isEmpty" | "reverse" =>
460 self.evaluate_collection_function(name, args, row),
461
462 "toString" | "toInteger" | "toFloat" | "toBoolean" =>
464 self.evaluate_conversion_function(name, args, row),
465
466 "count" | "sum" | "avg" | "min" | "max" => {
468 Err(KotobaError::Execution(format!("Aggregate function {} should be handled in Group operator", name)))
469 }
470
471 _ => Err(KotobaError::Execution(format!("Unknown function: {}", name))),
472 }
473 }
474
475 fn evaluate_degree_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
477 if args.len() != 1 {
478 return Err(KotobaError::Execution("degree() function requires exactly 1 argument".to_string()));
479 }
480
481 let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
482 Value::String(s) => s,
483 _ => return Err(KotobaError::Execution("degree() argument must be a vertex ID string".to_string())),
484 };
485
486 Ok(Value::Int(1))
489 }
490
491 fn evaluate_labels_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
493 if args.len() != 1 {
494 return Err(KotobaError::Execution("labels() function requires exactly 1 argument".to_string()));
495 }
496
497 let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
498 Value::String(s) => s,
499 _ => return Err(KotobaError::Execution("labels() argument must be a vertex ID string".to_string())),
500 };
501
502 Ok(Value::String("[]".to_string()))
505 }
506
507 fn evaluate_keys_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
509 if args.len() != 1 {
510 return Err(KotobaError::Execution("keys() function requires exactly 1 argument".to_string()));
511 }
512
513 let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
514 Value::String(s) => s,
515 _ => return Err(KotobaError::Execution("keys() argument must be a vertex ID string".to_string())),
516 };
517
518 Ok(Value::String("[]".to_string()))
521 }
522
523 fn evaluate_has_label_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
525 if args.len() != 2 {
526 return Err(KotobaError::Execution("hasLabel() function requires exactly 2 arguments".to_string()));
527 }
528
529 let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
530 Value::String(s) => s,
531 _ => return Err(KotobaError::Execution("hasLabel() first argument must be a vertex ID string".to_string())),
532 };
533
534 let label = match self.evaluate_expr(row, &args[1])? {
535 Value::String(s) => s,
536 _ => return Err(KotobaError::Execution("hasLabel() second argument must be a label string".to_string())),
537 };
538
539 Ok(Value::Bool(false))
542 }
543
544 fn evaluate_properties_function(&self, args: &[Expr], row: &Row) -> Result<Value> {
546 if args.len() != 1 {
547 return Err(KotobaError::Execution("properties() function requires exactly 1 argument".to_string()));
548 }
549
550 let vertex_id_str = match self.evaluate_expr(row, &args[0])? {
551 Value::String(s) => s,
552 _ => return Err(KotobaError::Execution("properties() argument must be a vertex ID string".to_string())),
553 };
554
555 Ok(Value::String("{}".to_string()))
558 }
559
560 fn evaluate_math_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
562 if args.len() != 1 {
563 return Err(KotobaError::Execution(format!("{}() function requires exactly 1 argument", name)));
564 }
565
566 let value = self.evaluate_expr(row, &args[0])?;
567 let num = match value {
568 Value::Int(n) => n as f64,
569 Value::Integer(n) => n as f64,
570 _ => return Err(KotobaError::Execution(format!("{}() argument must be a number", name))),
571 };
572
573 let result = match name {
574 "abs" => num.abs(),
575 "sqrt" if num >= 0.0 => num.sqrt(),
576 "sqrt" => return Err(KotobaError::Execution("sqrt() of negative number".to_string())),
577 "sin" => num.sin(),
578 "cos" => num.cos(),
579 "tan" => num.tan(),
580 "log" if num > 0.0 => num.ln(),
581 "log" => return Err(KotobaError::Execution("log() of non-positive number".to_string())),
582 "exp" => num.exp(),
583 "floor" => num.floor(),
584 "ceil" => num.ceil(),
585 "round" => num.round(),
586 _ => return Err(KotobaError::Execution(format!("Unknown math function: {}", name))),
587 };
588
589 Ok(Value::Int(result as i64))
590 }
591
592 fn evaluate_string_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
594 match name {
595 "length" => {
596 if args.len() != 1 {
597 return Err(KotobaError::Execution("length() function requires exactly 1 argument".to_string()));
598 }
599 match self.evaluate_expr(row, &args[0])? {
600 Value::String(s) => Ok(Value::Int(s.len() as i64)),
601 _ => Err(KotobaError::Execution("length() argument must be a string".to_string())),
602 }
603 }
604 "substring" => {
605 if args.len() != 3 {
606 return Err(KotobaError::Execution("substring() function requires exactly 3 arguments".to_string()));
607 }
608 let s = match self.evaluate_expr(row, &args[0])? {
609 Value::String(s) => s,
610 _ => return Err(KotobaError::Execution("substring() first argument must be a string".to_string())),
611 };
612 let start = match self.evaluate_expr(row, &args[1])? {
613 Value::Int(n) => n as usize,
614 _ => return Err(KotobaError::Execution("substring() second argument must be an integer".to_string())),
615 };
616 let len = match self.evaluate_expr(row, &args[2])? {
617 Value::Int(n) => n as usize,
618 _ => return Err(KotobaError::Execution("substring() third argument must be an integer".to_string())),
619 };
620
621 if start >= s.len() {
622 Ok(Value::String(String::new()))
623 } else {
624 let end = (start + len).min(s.len());
625 Ok(Value::String(s[start..end].to_string()))
626 }
627 }
628 "startsWith" => {
629 if args.len() != 2 {
630 return Err(KotobaError::Execution("startsWith() function requires exactly 2 arguments".to_string()));
631 }
632 let s = match self.evaluate_expr(row, &args[0])? {
633 Value::String(s) => s,
634 _ => return Err(KotobaError::Execution("startsWith() first argument must be a string".to_string())),
635 };
636 let prefix = match self.evaluate_expr(row, &args[1])? {
637 Value::String(s) => s,
638 _ => return Err(KotobaError::Execution("startsWith() second argument must be a string".to_string())),
639 };
640 Ok(Value::Bool(s.starts_with(&prefix)))
641 }
642 "endsWith" => {
643 if args.len() != 2 {
644 return Err(KotobaError::Execution("endsWith() function requires exactly 2 arguments".to_string()));
645 }
646 let s = match self.evaluate_expr(row, &args[0])? {
647 Value::String(s) => s,
648 _ => return Err(KotobaError::Execution("endsWith() first argument must be a string".to_string())),
649 };
650 let suffix = match self.evaluate_expr(row, &args[1])? {
651 Value::String(s) => s,
652 _ => return Err(KotobaError::Execution("endsWith() second argument must be a string".to_string())),
653 };
654 Ok(Value::Bool(s.ends_with(&suffix)))
655 }
656 "contains" => {
657 if args.len() != 2 {
658 return Err(KotobaError::Execution("contains() function requires exactly 2 arguments".to_string()));
659 }
660 let s = match self.evaluate_expr(row, &args[0])? {
661 Value::String(s) => s,
662 _ => return Err(KotobaError::Execution("contains() first argument must be a string".to_string())),
663 };
664 let substr = match self.evaluate_expr(row, &args[1])? {
665 Value::String(s) => s,
666 _ => return Err(KotobaError::Execution("contains() second argument must be a string".to_string())),
667 };
668 Ok(Value::Bool(s.contains(&substr)))
669 }
670 "toLower" => {
671 if args.len() != 1 {
672 return Err(KotobaError::Execution("toLower() function requires exactly 1 argument".to_string()));
673 }
674 match self.evaluate_expr(row, &args[0])? {
675 Value::String(s) => Ok(Value::String(s.to_lowercase())),
676 _ => Err(KotobaError::Execution("toLower() argument must be a string".to_string())),
677 }
678 }
679 "toUpper" => {
680 if args.len() != 1 {
681 return Err(KotobaError::Execution("toUpper() function requires exactly 1 argument".to_string()));
682 }
683 match self.evaluate_expr(row, &args[0])? {
684 Value::String(s) => Ok(Value::String(s.to_uppercase())),
685 _ => Err(KotobaError::Execution("toUpper() argument must be a string".to_string())),
686 }
687 }
688 "trim" => {
689 if args.len() != 1 {
690 return Err(KotobaError::Execution("trim() function requires exactly 1 argument".to_string()));
691 }
692 match self.evaluate_expr(row, &args[0])? {
693 Value::String(s) => Ok(Value::String(s.trim().to_string())),
694 _ => Err(KotobaError::Execution("trim() argument must be a string".to_string())),
695 }
696 }
697 "split" => {
698 if args.len() != 2 {
699 return Err(KotobaError::Execution("split() function requires exactly 2 arguments".to_string()));
700 }
701 let s = match self.evaluate_expr(row, &args[0])? {
702 Value::String(s) => s,
703 _ => return Err(KotobaError::Execution("split() first argument must be a string".to_string())),
704 };
705 let sep = match self.evaluate_expr(row, &args[1])? {
706 Value::String(s) => s,
707 _ => return Err(KotobaError::Execution("split() second argument must be a string".to_string())),
708 };
709 Ok(Value::String(format!("[{}]", s.split(&sep).collect::<Vec<_>>().join(", "))))
711 }
712 _ => Err(KotobaError::Execution(format!("Unknown string function: {}", name))),
713 }
714 }
715
716 fn evaluate_collection_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
718 match name {
719 "size" => {
720 if args.len() != 1 {
721 return Err(KotobaError::Execution("size() function requires exactly 1 argument".to_string()));
722 }
723 Ok(Value::Int(0))
725 }
726 "isEmpty" => {
727 if args.len() != 1 {
728 return Err(KotobaError::Execution("isEmpty() function requires exactly 1 argument".to_string()));
729 }
730 Ok(Value::Bool(true))
732 }
733 "reverse" => {
734 if args.len() != 1 {
735 return Err(KotobaError::Execution("reverse() function requires exactly 1 argument".to_string()));
736 }
737 Ok(Value::String("[]".to_string()))
739 }
740 _ => Err(KotobaError::Execution(format!("Unknown collection function: {}", name))),
741 }
742 }
743
744 fn evaluate_conversion_function(&self, name: &str, args: &[Expr], row: &Row) -> Result<Value> {
746 if args.len() != 1 {
747 return Err(KotobaError::Execution(format!("{}() function requires exactly 1 argument", name)));
748 }
749
750 let value = self.evaluate_expr(row, &args[0])?;
751
752 match name {
753 "toString" => Ok(Value::String(self.value_to_string(&value))),
754 "toInteger" => {
755 match value {
756 Value::String(s) => {
757 match s.parse::<i64>() {
758 Ok(n) => Ok(Value::Int(n)),
759 Err(_) => Err(KotobaError::Execution(format!("Cannot convert '{}' to integer", s))),
760 }
761 }
762 Value::Int(n) => Ok(Value::Int(n)),
763 Value::Integer(n) => Ok(Value::Integer(n)),
764 _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to integer", value))),
765 }
766 }
767 "toFloat" => {
768 match value {
769 Value::String(s) => {
770 match s.parse::<f64>() {
771 Ok(n) => Ok(Value::Int(n as i64)), Err(_) => Err(KotobaError::Execution(format!("Cannot convert '{}' to float", s))),
773 }
774 }
775 Value::Int(n) => Ok(Value::Int(n)), Value::Integer(n) => Ok(Value::Integer(n)), _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to float", value))),
778 }
779 }
780 "toBoolean" => {
781 match value {
782 Value::String(s) => {
783 let b = match s.to_lowercase().as_str() {
784 "true" | "1" | "yes" => true,
785 "false" | "0" | "no" => false,
786 _ => return Err(KotobaError::Execution(format!("Cannot convert '{}' to boolean", s))),
787 };
788 Ok(Value::Bool(b))
789 }
790 Value::Int(n) => Ok(Value::Bool(n != 0)),
791 Value::Integer(n) => Ok(Value::Bool(n != 0)),
792 Value::Bool(b) => Ok(Value::Bool(b)),
793 _ => Err(KotobaError::Execution(format!("Cannot convert {:?} to boolean", value))),
794 }
795 }
796 _ => Err(KotobaError::Execution(format!("Unknown conversion function: {}", name))),
797 }
798 }
799
800 fn value_to_string(&self, value: &Value) -> String {
802 match value {
803 Value::Null => "null".to_string(),
804 Value::Bool(b) => b.to_string(),
805 Value::Int(n) => n.to_string(),
806 Value::Integer(n) => n.to_string(),
807 Value::String(s) => s.clone(),
808 }
809 }
810
811 fn join_condition_matches(&self, left: &Row, right: &Row, on: &[String]) -> bool {
813 for key in on {
814 let left_val = left.values.get(key);
815 let right_val = right.values.get(key);
816
817 match (left_val, right_val) {
818 (Some(a), Some(b)) => {
819 if !self.values_match(a, b) {
820 return false;
821 }
822 }
823 _ => return false,
824 }
825 }
826 true
827 }
828
829 fn extract_join_key(&self, row: &Row, on: &[String]) -> String {
831 let mut key_parts = Vec::new();
832 for col in on {
833 if let Some(value) = row.values.get(col) {
834 key_parts.push(format!("{:?}", value));
835 }
836 }
837 key_parts.join("|")
838 }
839
840 fn extract_group_key(&self, row: &Row, keys: &[String]) -> String {
842 let mut key_parts = Vec::new();
843 for key in keys {
844 if let Some(value) = row.values.get(key) {
845 key_parts.push(format!("{:?}", value));
846 }
847 }
848 key_parts.join("|")
849 }
850
851 fn compute_aggregation(&self, rows: &[Row], agg: &Aggregation) -> Value {
853 match agg.fn_.as_str() {
854 "count" => Value::Int(rows.len() as i64),
855 "sum" => {
856 let mut sum = 0i64;
857 for row in rows {
858 if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
859 sum += val;
860 }
861 }
862 Value::Int(sum)
863 }
864 "avg" => {
865 if rows.is_empty() {
866 Value::Int(0)
867 } else {
868 let mut sum = 0i64;
869 let mut count = 0;
870 for row in rows {
871 if let Some(Value::Int(val)) = row.values.get(&agg.args[0]) {
872 sum += val;
873 count += 1;
874 }
875 }
876 if count > 0 {
877 Value::Int(sum / count)
878 } else {
879 Value::Int(0)
880 }
881 }
882 }
883 _ => Value::Null,
884 }
885 }
886}