Skip to main content

mdql_core/
query_engine.rs

1//! Execute parsed queries over in-memory rows.
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5
6use regex::Regex;
7
8use crate::errors::MdqlError;
9use crate::model::{Row, Value};
10use crate::query_parser::*;
11use crate::schema::Schema;
12
13pub fn execute_query(
14    query: &SelectQuery,
15    rows: &[Row],
16    _schema: &Schema,
17) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
18    if let Some(ref sub) = query.subquery {
19        let (sub_rows, _sub_cols) = execute_inner(sub, rows, None)?;
20        return execute_inner(query, &sub_rows, None);
21    }
22    execute_inner(query, rows, None)
23}
24
25#[allow(dead_code)]
26pub(crate) fn execute_query_indexed(
27    query: &SelectQuery,
28    rows: &[Row],
29    schema: &Schema,
30    index: Option<&crate::index::TableIndex>,
31    searcher: Option<&crate::search::TableSearcher>,
32) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
33    // Pre-compute FTS results for any LIKE clauses on section columns
34    let fts_results = if let (Some(ref wc), Some(searcher)) = (&query.where_clause, searcher) {
35        collect_fts_results(wc, schema, searcher)
36    } else {
37        HashMap::new()
38    };
39
40    execute_with_fts(query, rows, index, &fts_results)
41}
42
43#[allow(dead_code)]
44fn collect_fts_results(
45    clause: &WhereClause,
46    schema: &Schema,
47    searcher: &crate::search::TableSearcher,
48) -> HashMap<(String, String), std::collections::HashSet<String>> {
49    let mut results = HashMap::new();
50    collect_fts_results_inner(clause, schema, searcher, &mut results);
51    results
52}
53
54#[allow(dead_code)]
55fn collect_fts_results_inner(
56    clause: &WhereClause,
57    schema: &Schema,
58    searcher: &crate::search::TableSearcher,
59    results: &mut HashMap<(String, String), std::collections::HashSet<String>>,
60) {
61    match clause {
62        WhereClause::Comparison(cmp) => {
63            if (cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike) && schema.sections.contains_key(&cmp.column) {
64                if let Some(SqlValue::String(pattern)) = &cmp.value {
65                    // Strip SQL wildcards for Tantivy query
66                    let search_term = pattern.replace('%', " ").replace('_', " ").trim().to_string();
67                    if !search_term.is_empty() {
68                        if let Ok(paths) = searcher.search(&search_term, Some(&cmp.column)) {
69                            let key = (cmp.column.clone(), pattern.clone());
70                            results.insert(key, paths.into_iter().collect());
71                        }
72                    }
73                }
74            }
75        }
76        WhereClause::BoolOp(bop) => {
77            collect_fts_results_inner(&bop.left, schema, searcher, results);
78            collect_fts_results_inner(&bop.right, schema, searcher, results);
79        }
80    }
81}
82
83type FtsResults = HashMap<(String, String), std::collections::HashSet<String>>;
84
85fn execute_with_fts(
86    query: &SelectQuery,
87    rows: &[Row],
88    index: Option<&crate::index::TableIndex>,
89    fts: &FtsResults,
90) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
91    // Determine available columns
92    let mut all_columns: Vec<String> = Vec::new();
93    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
94    for r in rows {
95        for k in r.keys() {
96            if seen.insert(k.clone()) {
97                all_columns.push(k.clone());
98            }
99        }
100    }
101
102    // Check if query has aggregates
103    let has_aggregates = match &query.columns {
104        ColumnList::Named(exprs) => exprs.iter().any(|e| e.is_aggregate()),
105        _ => false,
106    };
107
108    // Output column names
109    let columns: Vec<String> = match &query.columns {
110        ColumnList::All => all_columns,
111        ColumnList::Named(exprs) => exprs.iter().map(|e| e.output_name()).collect(),
112    };
113
114    // Filter — try index first, fall back to full scan
115    let filtered: Vec<Row> = if let Some(ref wc) = query.where_clause {
116        let candidate_paths = index.and_then(|idx| try_index_filter(wc, idx));
117        if let Some(paths) = candidate_paths {
118            rows.iter()
119                .filter(|r| {
120                    r.get("path")
121                        .and_then(|v| v.as_str())
122                        .map_or(false, |p| paths.contains(p))
123                })
124                .filter(|r| evaluate_with_fts(wc, r, fts))
125                .cloned()
126                .collect()
127        } else {
128            rows.iter()
129                .filter(|r| evaluate_with_fts(wc, r, fts))
130                .cloned()
131                .collect()
132        }
133    } else {
134        rows.to_vec()
135    };
136
137    // Aggregate if needed
138    let mut result = if has_aggregates || query.group_by.is_some() {
139        let exprs = match &query.columns {
140            ColumnList::Named(exprs) => exprs.clone(),
141            _ => return Err(MdqlError::QueryExecution(
142                "SELECT * with GROUP BY is not supported".into(),
143            )),
144        };
145        let group_keys = query.group_by.as_deref().unwrap_or(&[]);
146        aggregate_rows(&filtered, &exprs, group_keys)?
147    } else {
148        filtered
149    };
150
151    // HAVING filter — apply after aggregation
152    if let Some(ref having) = query.having {
153        result.retain(|row| evaluate(having, row));
154    }
155
156    // Window functions — compute after aggregation/HAVING, before ORDER BY
157    let has_windows = match &query.columns {
158        ColumnList::Named(exprs) => exprs.iter().any(|e| match e {
159            SelectExpr::Expr { expr, .. } => expr.contains_window(),
160            _ => false,
161        }),
162        _ => false,
163    };
164    if has_windows {
165        if let ColumnList::Named(ref exprs) = query.columns {
166            compute_windows(&mut result, exprs)?;
167        }
168    }
169
170    // Sort — resolve ORDER BY aliases against SELECT list
171    if let Some(ref order_by) = query.order_by {
172        let resolved = resolve_order_aliases(order_by, &query.columns);
173        sort_rows(&mut result, &resolved);
174    }
175
176    // Limit
177    if let Some(limit) = query.limit {
178        result.truncate(limit as usize);
179    }
180
181    // Project — evaluate expressions and strip to requested columns
182    if !matches!(query.columns, ColumnList::All) {
183        let named_exprs = match &query.columns {
184            ColumnList::Named(exprs) => exprs,
185            _ => unreachable!(),
186        };
187
188        // Compute expression columns first, then retain only requested columns.
189        // Skip if aggregation already computed them (re-evaluating would lose
190        // columns that only existed in pre-aggregation rows, e.g. dict fields).
191        let has_expr_cols = named_exprs.iter().any(|e| matches!(e, SelectExpr::Expr { .. }));
192        let already_aggregated = has_aggregates || query.group_by.is_some();
193        if has_expr_cols && !already_aggregated {
194            for row in &mut result {
195                for expr in named_exprs {
196                    if let SelectExpr::Expr { expr: e, alias } = expr {
197                        if e.contains_window() { continue; }
198                        let name = alias.clone().unwrap_or_else(|| e.display_name());
199                        let val = evaluate_expr(e, row);
200                        row.insert(name, val);
201                    }
202                }
203            }
204        }
205
206        let col_set: std::collections::HashSet<&str> =
207            columns.iter().map(|s| s.as_str()).collect();
208        for row in &mut result {
209            row.retain(|k, _| col_set.contains(k.as_str()));
210        }
211    }
212
213    Ok((result, columns))
214}
215
216fn aggregate_rows(
217    rows: &[Row],
218    exprs: &[SelectExpr],
219    group_keys: &[String],
220) -> crate::errors::Result<Vec<Row>> {
221    // Group rows by group_keys
222    let mut groups: Vec<(Vec<Value>, Vec<&Row>)> = Vec::new();
223    let mut key_index: HashMap<Vec<String>, usize> = HashMap::new();
224
225    if group_keys.is_empty() {
226        // No GROUP BY — all rows are one group
227        let all_refs: Vec<&Row> = rows.iter().collect();
228        groups.push((vec![], all_refs));
229    } else {
230        for row in rows {
231            let key: Vec<String> = group_keys
232                .iter()
233                .map(|k| {
234                    row.get(k)
235                        .map(|v| v.to_display_string())
236                        .unwrap_or_default()
237                })
238                .collect();
239            let key_vals: Vec<Value> = group_keys
240                .iter()
241                .map(|k| row.get(k).cloned().unwrap_or(Value::Null))
242                .collect();
243            if let Some(&idx) = key_index.get(&key) {
244                groups[idx].1.push(row);
245            } else {
246                let idx = groups.len();
247                key_index.insert(key, idx);
248                groups.push((key_vals, vec![row]));
249            }
250        }
251    }
252
253    // Compute aggregates per group
254    let mut result = Vec::new();
255    for (key_vals, group_rows) in &groups {
256        let mut out = Row::new();
257
258        // Fill in group key values
259        for (i, k) in group_keys.iter().enumerate() {
260            out.insert(k.clone(), key_vals[i].clone());
261        }
262
263        // Compute each expression
264        for expr in exprs {
265            match expr {
266                SelectExpr::Column(name) => {
267                    // Already filled if it's a group key; otherwise take first row's value
268                    if !out.contains_key(name) {
269                        if let Some(first) = group_rows.first() {
270                            out.insert(
271                                name.clone(),
272                                first.get(name).cloned().unwrap_or(Value::Null),
273                            );
274                        }
275                    }
276                }
277                SelectExpr::Aggregate { func, arg, arg_expr, alias } => {
278                    let out_name = alias
279                        .clone()
280                        .unwrap_or_else(|| expr.output_name());
281                    let val = compute_aggregate(func, arg, arg_expr.as_ref(), group_rows);
282                    out.insert(out_name, val);
283                }
284                SelectExpr::Expr { expr: e, alias } => {
285                    let out_name = alias.clone().unwrap_or_else(|| e.display_name());
286                    if e.contains_aggregate() {
287                        let val = evaluate_agg_expr(e, group_rows);
288                        out.insert(out_name, val);
289                    } else if let Some(first) = group_rows.first() {
290                        let val = evaluate_expr(e, first);
291                        out.insert(out_name, val);
292                    }
293                }
294            }
295        }
296
297        result.push(out);
298    }
299
300    Ok(result)
301}
302
303/// Resolve a per-row value for an aggregate argument.
304/// If `arg_expr` is set, evaluate it; otherwise look up `arg` as a column name.
305fn resolve_agg_value<'a>(arg: &str, arg_expr: Option<&Expr>, row: &'a Row) -> Value {
306    if let Some(expr) = arg_expr {
307        evaluate_expr(expr, row)
308    } else {
309        row.get(arg).cloned().unwrap_or(Value::Null)
310    }
311}
312
313fn compute_aggregate(func: &AggFunc, arg: &str, arg_expr: Option<&Expr>, rows: &[&Row]) -> Value {
314    match func {
315        AggFunc::Count => {
316            if arg == "*" && arg_expr.is_none() {
317                Value::Int(rows.len() as i64)
318            } else {
319                let count = rows
320                    .iter()
321                    .filter(|r| {
322                        let v = resolve_agg_value(arg, arg_expr, r);
323                        !v.is_null()
324                    })
325                    .count();
326                Value::Int(count as i64)
327            }
328        }
329        AggFunc::Sum => {
330            let mut total = 0.0f64;
331            let mut has_any = false;
332            for r in rows {
333                let v = resolve_agg_value(arg, arg_expr, r);
334                match v {
335                    Value::Int(n) => { total += n as f64; has_any = true; }
336                    Value::Float(f) => { total += f; has_any = true; }
337                    _ => {}
338                }
339            }
340            if has_any { Value::Float(total) } else { Value::Null }
341        }
342        AggFunc::Avg => {
343            let mut total = 0.0f64;
344            let mut count = 0usize;
345            for r in rows {
346                let v = resolve_agg_value(arg, arg_expr, r);
347                match v {
348                    Value::Int(n) => { total += n as f64; count += 1; }
349                    Value::Float(f) => { total += f; count += 1; }
350                    _ => {}
351                }
352            }
353            if count > 0 { Value::Float(total / count as f64) } else { Value::Null }
354        }
355        AggFunc::Min => {
356            let mut min_val: Option<Value> = None;
357            for r in rows {
358                let v = resolve_agg_value(arg, arg_expr, r);
359                if v.is_null() { continue; }
360                min_val = Some(match min_val {
361                    None => v,
362                    Some(ref current) => {
363                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Less) {
364                            v
365                        } else {
366                            current.clone()
367                        }
368                    }
369                });
370            }
371            min_val.unwrap_or(Value::Null)
372        }
373        AggFunc::Max => {
374            let mut max_val: Option<Value> = None;
375            for r in rows {
376                let v = resolve_agg_value(arg, arg_expr, r);
377                if v.is_null() { continue; }
378                max_val = Some(match max_val {
379                    None => v,
380                    Some(ref current) => {
381                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Greater) {
382                            v
383                        } else {
384                            current.clone()
385                        }
386                    }
387                });
388            }
389            max_val.unwrap_or(Value::Null)
390        }
391    }
392}
393
394fn compute_windows(rows: &mut Vec<Row>, select_exprs: &[SelectExpr]) -> crate::errors::Result<()> {
395    for se in select_exprs {
396        if let SelectExpr::Expr { expr, alias } = se {
397            if let Expr::Window { func, args, over } = expr {
398                let col_name = alias.clone().unwrap_or_else(|| expr.display_name());
399                compute_single_window(rows, func, args, over, &col_name)?;
400            }
401        }
402    }
403    Ok(())
404}
405
406fn compute_single_window(
407    rows: &mut Vec<Row>,
408    func: &WindowFunc,
409    args: &[Expr],
410    over: &WindowSpec,
411    col_name: &str,
412) -> crate::errors::Result<()> {
413    let mut partitions: Vec<Vec<usize>> = Vec::new();
414    let mut partition_map: HashMap<Vec<String>, usize> = HashMap::new();
415
416    for (i, row) in rows.iter().enumerate() {
417        let key: Vec<String> = over.partition_by.iter()
418            .map(|col| row.get(col).map(|v| v.to_display_string()).unwrap_or_default())
419            .collect();
420        if let Some(&idx) = partition_map.get(&key) {
421            partitions[idx].push(i);
422        } else {
423            let idx = partitions.len();
424            partition_map.insert(key, idx);
425            partitions.push(vec![i]);
426        }
427    }
428
429    for partition in &mut partitions {
430        if !over.order_by.is_empty() {
431            partition.sort_by(|&a, &b| {
432                for spec in &over.order_by {
433                    let (va, vb) = if let Some(ref expr) = spec.expr {
434                        (evaluate_expr(expr, &rows[a]), evaluate_expr(expr, &rows[b]))
435                    } else {
436                        (
437                            rows[a].get(&spec.column).cloned().unwrap_or(Value::Null),
438                            rows[b].get(&spec.column).cloned().unwrap_or(Value::Null),
439                        )
440                    };
441                    let ordering = match (&va, &vb) {
442                        (Value::Null, Value::Null) => Ordering::Equal,
443                        (Value::Null, _) => Ordering::Greater,
444                        (_, Value::Null) => Ordering::Less,
445                        (a_val, b_val) => compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal),
446                    };
447                    let ordering = if spec.descending { ordering.reverse() } else { ordering };
448                    if ordering != Ordering::Equal {
449                        return ordering;
450                    }
451                }
452                Ordering::Equal
453            });
454        }
455    }
456
457    let mut values: Vec<(usize, Value)> = Vec::new();
458
459    for partition in &partitions {
460        match func {
461            WindowFunc::RowNumber => {
462                for (i, &row_idx) in partition.iter().enumerate() {
463                    values.push((row_idx, Value::Int((i + 1) as i64)));
464                }
465            }
466            WindowFunc::Rank => {
467                let mut rank = 1usize;
468                for (i, &row_idx) in partition.iter().enumerate() {
469                    if i > 0 {
470                        let prev_idx = partition[i - 1];
471                        let same = over.order_by.iter().all(|spec| {
472                            let va = if let Some(ref expr) = spec.expr {
473                                evaluate_expr(expr, &rows[prev_idx])
474                            } else {
475                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
476                            };
477                            let vb = if let Some(ref expr) = spec.expr {
478                                evaluate_expr(expr, &rows[row_idx])
479                            } else {
480                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
481                            };
482                            va == vb
483                        });
484                        if !same {
485                            rank = i + 1;
486                        }
487                    }
488                    values.push((row_idx, Value::Int(rank as i64)));
489                }
490            }
491            WindowFunc::DenseRank => {
492                let mut rank = 1usize;
493                for (i, &row_idx) in partition.iter().enumerate() {
494                    if i > 0 {
495                        let prev_idx = partition[i - 1];
496                        let same = over.order_by.iter().all(|spec| {
497                            let va = if let Some(ref expr) = spec.expr {
498                                evaluate_expr(expr, &rows[prev_idx])
499                            } else {
500                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
501                            };
502                            let vb = if let Some(ref expr) = spec.expr {
503                                evaluate_expr(expr, &rows[row_idx])
504                            } else {
505                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
506                            };
507                            va == vb
508                        });
509                        if !same {
510                            rank += 1;
511                        }
512                    }
513                    values.push((row_idx, Value::Int(rank as i64)));
514                }
515            }
516            WindowFunc::Lag => {
517                let offset = if args.len() > 1 {
518                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
519                } else {
520                    1
521                };
522                for (i, &row_idx) in partition.iter().enumerate() {
523                    let val = if i >= offset && !args.is_empty() {
524                        evaluate_expr(&args[0], &rows[partition[i - offset]])
525                    } else {
526                        Value::Null
527                    };
528                    values.push((row_idx, val));
529                }
530            }
531            WindowFunc::Lead => {
532                let offset = if args.len() > 1 {
533                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
534                } else {
535                    1
536                };
537                for (i, &row_idx) in partition.iter().enumerate() {
538                    let val = if i + offset < partition.len() && !args.is_empty() {
539                        evaluate_expr(&args[0], &rows[partition[i + offset]])
540                    } else {
541                        Value::Null
542                    };
543                    values.push((row_idx, val));
544                }
545            }
546            WindowFunc::Agg(agg_func) => {
547                let partition_rows: Vec<&Row> = partition.iter().map(|&i| &rows[i]).collect();
548                let (arg_name, arg_expr_opt) = if args.is_empty() {
549                    ("*".to_string(), None)
550                } else {
551                    (args[0].display_name(), Some(&args[0]))
552                };
553                let agg_val = compute_aggregate(agg_func, &arg_name, arg_expr_opt, &partition_rows);
554                for &row_idx in partition {
555                    values.push((row_idx, agg_val.clone()));
556                }
557            }
558        }
559    }
560
561    for (row_idx, val) in values {
562        rows[row_idx].insert(col_name.to_string(), val);
563    }
564
565    Ok(())
566}
567
568fn evaluate_with_fts(clause: &WhereClause, row: &Row, fts: &FtsResults) -> bool {
569    match clause {
570        WhereClause::BoolOp(bop) => {
571            let left = evaluate_with_fts(&bop.left, row, fts);
572            match bop.op {
573                BoolOpKind::And => left && evaluate_with_fts(&bop.right, row, fts),
574                BoolOpKind::Or => left || evaluate_with_fts(&bop.right, row, fts),
575            }
576        }
577        WhereClause::Comparison(cmp) => {
578            // Check if we have FTS results for this comparison
579            if cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike {
580                if let Some(SqlValue::String(pattern)) = &cmp.value {
581                    let key = (cmp.column.clone(), pattern.clone());
582                    if let Some(matching_paths) = fts.get(&key) {
583                        let row_path = row.get("path").and_then(|v| v.as_str()).unwrap_or("");
584                        let matched = matching_paths.contains(row_path);
585                        return if cmp.op == CmpOp::Like { matched } else { !matched };
586                    }
587                }
588            }
589            evaluate_comparison(cmp, row)
590        }
591    }
592}
593
594pub use crate::query_join::execute_join_query;
595
596pub(crate) fn execute_inner(
597    query: &SelectQuery,
598    rows: &[Row],
599    index: Option<&crate::index::TableIndex>,
600) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
601    let empty_fts = HashMap::new();
602    execute_with_fts(query, rows, index, &empty_fts)
603}
604
605pub fn evaluate(clause: &WhereClause, row: &Row) -> bool {
606    match clause {
607        WhereClause::BoolOp(bop) => {
608            let left = evaluate(&bop.left, row);
609            match bop.op {
610                BoolOpKind::And => left && evaluate(&bop.right, row),
611                BoolOpKind::Or => left || evaluate(&bop.right, row),
612            }
613        }
614        WhereClause::Comparison(cmp) => evaluate_comparison(cmp, row),
615    }
616}
617
618/// Evaluate an Expr against a row, returning a Value.
619pub(crate) fn evaluate_expr(expr: &Expr, row: &Row) -> Value {
620    match expr {
621        Expr::Literal(SqlValue::Int(n)) => Value::Int(*n),
622        Expr::Literal(SqlValue::Float(f)) => Value::Float(*f),
623        Expr::Literal(SqlValue::String(s)) => Value::String(s.clone()),
624        Expr::Literal(SqlValue::Null) => Value::Null,
625        Expr::Literal(SqlValue::List(_)) => Value::Null,
626        Expr::Column(name) => {
627            if let Some(val) = row.get(name) {
628                return val.clone();
629            }
630            // Try all possible dot splits for dict access (e.g. "s.params.key")
631            for (i, _) in name.match_indices('.') {
632                let dict_col = &name[..i];
633                let dict_key = &name[i + 1..];
634                if let Some(Value::Dict(map)) = row.get(dict_col) {
635                    return map.get(dict_key).cloned().unwrap_or(Value::Null);
636                }
637            }
638            Value::Null
639        }
640        Expr::UnaryMinus(inner) => {
641            match evaluate_expr(inner, row) {
642                Value::Int(n) => Value::Int(-n),
643                Value::Float(f) => Value::Float(-f),
644                Value::Null => Value::Null,
645                _ => Value::Null, // non-numeric → NULL
646            }
647        }
648        Expr::BinaryOp { left, op, right } => {
649            let lv = evaluate_expr(left, row);
650            let rv = evaluate_expr(right, row);
651
652            // NULL propagation: any NULL operand → NULL
653            if lv.is_null() || rv.is_null() {
654                return Value::Null;
655            }
656
657            // Extract numeric values with int→float coercion
658            match (&lv, &rv) {
659                (Value::Int(a), Value::Int(b)) => {
660                    match op {
661                        ArithOp::Add => Value::Int(a.wrapping_add(*b)),
662                        ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
663                        ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
664                        ArithOp::Div => {
665                            if *b == 0 { Value::Null } else { Value::Int(a / b) }
666                        }
667                        ArithOp::Mod => {
668                            if *b == 0 { Value::Null } else { Value::Int(a % b) }
669                        }
670                    }
671                }
672                _ => {
673                    // Coerce to float
674                    let a = match &lv {
675                        Value::Int(n) => *n as f64,
676                        Value::Float(f) => *f,
677                        _ => return Value::Null,
678                    };
679                    let b = match &rv {
680                        Value::Int(n) => *n as f64,
681                        Value::Float(f) => *f,
682                        _ => return Value::Null,
683                    };
684                    match op {
685                        ArithOp::Add => Value::Float(a + b),
686                        ArithOp::Sub => Value::Float(a - b),
687                        ArithOp::Mul => Value::Float(a * b),
688                        ArithOp::Div => {
689                            if b == 0.0 { Value::Null } else { Value::Float(a / b) }
690                        }
691                        ArithOp::Mod => {
692                            if b == 0.0 { Value::Null } else { Value::Float(a % b) }
693                        }
694                    }
695                }
696            }
697        }
698        Expr::Case { whens, else_expr } => {
699            for (condition, result) in whens {
700                if evaluate(condition, row) {
701                    return evaluate_expr(result, row);
702                }
703            }
704            match else_expr {
705                Some(e) => evaluate_expr(e, row),
706                None => Value::Null,
707            }
708        }
709        Expr::CurrentDate => {
710            Value::Date(chrono::Local::now().naive_local().date())
711        }
712        Expr::CurrentTimestamp => {
713            Value::DateTime(chrono::Local::now().naive_local())
714        }
715        Expr::DateAdd { date, days } => {
716            let date_val = evaluate_expr(date, row);
717            let days_val = evaluate_expr(days, row);
718            let n = match &days_val {
719                Value::Int(n) => *n,
720                Value::Float(f) => *f as i64,
721                _ => return Value::Null,
722            };
723            let duration = chrono::Duration::days(n);
724            match date_val {
725                Value::Date(d) => {
726                    match d.checked_add_signed(duration) {
727                        Some(result) => Value::Date(result),
728                        None => Value::Null,
729                    }
730                }
731                Value::DateTime(dt) => {
732                    match dt.checked_add_signed(duration) {
733                        Some(result) => Value::DateTime(result),
734                        None => Value::Null,
735                    }
736                }
737                _ => Value::Null,
738            }
739        }
740        Expr::DateDiff { left, right } => {
741            let lv = evaluate_expr(left, row);
742            let rv = evaluate_expr(right, row);
743            let left_date = match &lv {
744                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
745                Value::DateTime(dt) => *dt,
746                _ => return Value::Null,
747            };
748            let right_date = match &rv {
749                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
750                Value::DateTime(dt) => *dt,
751                _ => return Value::Null,
752            };
753            Value::Int((left_date - right_date).num_days())
754        }
755        Expr::Aggregate { func, arg, .. } => {
756            // Post-aggregation: look up the pre-computed column name
757            let func_name = match func {
758                AggFunc::Count => "COUNT",
759                AggFunc::Sum => "SUM",
760                AggFunc::Avg => "AVG",
761                AggFunc::Min => "MIN",
762                AggFunc::Max => "MAX",
763            };
764            let col = format!("{}({})", func_name, arg);
765            row.get(&col).cloned().unwrap_or(Value::Null)
766        }
767        Expr::Subquery(_) => Value::Null,
768        Expr::Window { .. } => {
769            let display = expr.display_name();
770            row.get(&display).cloned().unwrap_or(Value::Null)
771        }
772    }
773}
774
775fn evaluate_agg_expr(expr: &Expr, group_rows: &[&Row]) -> Value {
776    match expr {
777        Expr::Aggregate { func, arg, arg_expr } => {
778            compute_aggregate(func, arg, arg_expr.as_deref(), group_rows)
779        }
780        Expr::BinaryOp { left, op, right } => {
781            let lv = evaluate_agg_expr(left, group_rows);
782            let rv = evaluate_agg_expr(right, group_rows);
783            apply_arith_op(op, &lv, &rv)
784        }
785        Expr::UnaryMinus(inner) => {
786            match evaluate_agg_expr(inner, group_rows) {
787                Value::Int(n) => Value::Int(-n),
788                Value::Float(f) => Value::Float(-f),
789                _ => Value::Null,
790            }
791        }
792        other => {
793            if let Some(first) = group_rows.first() {
794                evaluate_expr(other, first)
795            } else {
796                Value::Null
797            }
798        }
799    }
800}
801
802fn apply_arith_op(op: &ArithOp, lv: &Value, rv: &Value) -> Value {
803    if lv.is_null() || rv.is_null() {
804        return Value::Null;
805    }
806    match (lv, rv) {
807        (Value::Int(a), Value::Int(b)) => match op {
808            ArithOp::Add => Value::Int(a.wrapping_add(*b)),
809            ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
810            ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
811            ArithOp::Div => if *b == 0 { Value::Null } else { Value::Int(a / b) },
812            ArithOp::Mod => if *b == 0 { Value::Null } else { Value::Int(a % b) },
813        },
814        _ => {
815            let a = match lv {
816                Value::Int(n) => *n as f64,
817                Value::Float(f) => *f,
818                _ => return Value::Null,
819            };
820            let b = match rv {
821                Value::Int(n) => *n as f64,
822                Value::Float(f) => *f,
823                _ => return Value::Null,
824            };
825            match op {
826                ArithOp::Add => Value::Float(a + b),
827                ArithOp::Sub => Value::Float(a - b),
828                ArithOp::Mul => Value::Float(a * b),
829                ArithOp::Div => if b == 0.0 { Value::Null } else { Value::Float(a / b) },
830                ArithOp::Mod => if b == 0.0 { Value::Null } else { Value::Float(a % b) },
831            }
832        }
833    }
834}
835
836fn evaluate_comparison(cmp: &Comparison, row: &Row) -> bool {
837    // If we have expression-based comparison (new path), use it for standard ops
838    if let (Some(left_expr), Some(right_expr)) = (&cmp.left_expr, &cmp.right_expr) {
839        if matches!(cmp.op, CmpOp::Eq | CmpOp::Ne | CmpOp::Lt | CmpOp::Gt | CmpOp::Le | CmpOp::Ge) {
840            let left_val = evaluate_expr(left_expr, row);
841            let right_val = evaluate_expr(right_expr, row);
842
843            // NULL comparison: always false (except IS NULL handled below)
844            if left_val.is_null() || right_val.is_null() {
845                return false;
846            }
847
848            // Coerce for comparison: if types differ, try int→float
849            let ord = compare_model_values(&left_val, &right_val);
850
851            return match cmp.op {
852                CmpOp::Eq => ord == Some(Ordering::Equal),
853                CmpOp::Ne => ord != Some(Ordering::Equal),
854                CmpOp::Lt => ord == Some(Ordering::Less),
855                CmpOp::Gt => ord == Some(Ordering::Greater),
856                CmpOp::Le => matches!(ord, Some(Ordering::Less | Ordering::Equal)),
857                CmpOp::Ge => matches!(ord, Some(Ordering::Greater | Ordering::Equal)),
858                _ => false,
859            };
860        }
861    }
862
863    // Fall back to legacy column-based comparison for IS NULL, IN, LIKE, etc.
864    let actual = row.get(&cmp.column);
865
866    if cmp.op == CmpOp::IsNull {
867        return actual.map_or(true, |v| v.is_null());
868    }
869    if cmp.op == CmpOp::IsNotNull {
870        return actual.map_or(false, |v| !v.is_null());
871    }
872
873    let actual = match actual {
874        Some(v) if !v.is_null() => v,
875        _ => return false,
876    };
877
878    let expected = match &cmp.value {
879        Some(v) => v,
880        None => return false,
881    };
882
883    match cmp.op {
884        CmpOp::Eq => eq_match(actual, expected),
885        CmpOp::Ne => !eq_match(actual, expected),
886        CmpOp::Lt => compare_values(actual, expected) == Some(Ordering::Less),
887        CmpOp::Gt => compare_values(actual, expected) == Some(Ordering::Greater),
888        CmpOp::Le => matches!(compare_values(actual, expected), Some(Ordering::Less | Ordering::Equal)),
889        CmpOp::Ge => matches!(compare_values(actual, expected), Some(Ordering::Greater | Ordering::Equal)),
890        CmpOp::Like => like_match(actual, expected),
891        CmpOp::NotLike => !like_match(actual, expected),
892        CmpOp::In => {
893            if let SqlValue::List(items) = expected {
894                items.iter().any(|v| eq_match(actual, v))
895            } else {
896                eq_match(actual, expected)
897            }
898        }
899        CmpOp::IsNull | CmpOp::IsNotNull => unreachable!(),
900    }
901}
902
903/// Compare two model::Value instances, with int↔float coercion.
904fn compare_model_values(a: &Value, b: &Value) -> Option<Ordering> {
905    match (a, b) {
906        (Value::Int(x), Value::Float(y)) => (*x as f64).partial_cmp(y),
907        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&(*y as f64)),
908        _ => a.partial_cmp(b),
909    }
910}
911
912fn coerce_sql_to_value(sql_val: &SqlValue, target: &Value) -> Value {
913    match sql_val {
914        SqlValue::Null => Value::Null,
915        SqlValue::String(s) => {
916            match target {
917                Value::Int(_) => s.parse::<i64>().map(Value::Int).unwrap_or(Value::String(s.clone())),
918                Value::Float(_) => s.parse::<f64>().map(Value::Float).unwrap_or(Value::String(s.clone())),
919                Value::Date(_) => {
920                    chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
921                        .map(Value::Date)
922                        .unwrap_or(Value::String(s.clone()))
923                }
924                Value::DateTime(_) => {
925                    chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
926                        .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
927                        .map(Value::DateTime)
928                        .unwrap_or(Value::String(s.clone()))
929                }
930                _ => Value::String(s.clone()),
931            }
932        }
933        SqlValue::Int(n) => {
934            match target {
935                Value::Float(_) => Value::Float(*n as f64),
936                _ => Value::Int(*n),
937            }
938        }
939        SqlValue::Float(f) => Value::Float(*f),
940        SqlValue::List(_) => Value::Null, // Lists handled separately
941    }
942}
943
944fn eq_match(actual: &Value, expected: &SqlValue) -> bool {
945    // Special handling for lists (e.g., categories)
946    if let Value::List(items) = actual {
947        if let SqlValue::String(s) = expected {
948            return items.contains(s);
949        }
950    }
951
952    let coerced = coerce_sql_to_value(expected, actual);
953    actual == &coerced
954}
955
956fn like_match(actual: &Value, pattern: &SqlValue) -> bool {
957    let pattern_str = match pattern {
958        SqlValue::String(s) => s,
959        _ => return false,
960    };
961
962    // Convert SQL LIKE to regex
963    let mut regex_str = String::from("(?is)^");
964    for ch in pattern_str.chars() {
965        match ch {
966            '%' => regex_str.push_str(".*"),
967            '_' => regex_str.push('.'),
968            c => {
969                if regex::escape(&c.to_string()) != c.to_string() {
970                    regex_str.push_str(&regex::escape(&c.to_string()));
971                } else {
972                    regex_str.push(c);
973                }
974            }
975        }
976    }
977    regex_str.push('$');
978
979    let re = match Regex::new(&regex_str) {
980        Ok(r) => r,
981        Err(_) => return false,
982    };
983
984    match actual {
985        Value::List(items) => items.iter().any(|item| re.is_match(item)),
986        _ => re.is_match(&actual.to_display_string()),
987    }
988}
989
990fn compare_values(actual: &Value, expected: &SqlValue) -> Option<Ordering> {
991    let coerced = coerce_sql_to_value(expected, actual);
992    actual.partial_cmp(&coerced)
993}
994
995/// Convert a SqlValue to a Value for index lookups (without a target type for coercion).
996fn sql_value_to_index_value(sv: &SqlValue) -> Value {
997    match sv {
998        SqlValue::String(s) => {
999            // Try datetime first (more specific)
1000            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1001                return Value::DateTime(dt);
1002            }
1003            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1004                return Value::DateTime(dt);
1005            }
1006            // Try date
1007            if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1008                return Value::Date(d);
1009            }
1010            Value::String(s.clone())
1011        }
1012        SqlValue::Int(n) => Value::Int(*n),
1013        SqlValue::Float(f) => Value::Float(*f),
1014        SqlValue::Null => Value::Null,
1015        SqlValue::List(_) => Value::Null,
1016    }
1017}
1018
1019/// Try to use B-tree indexes to narrow the candidate row set.
1020/// Returns Some(paths) if the entire WHERE clause could be resolved via index,
1021/// or None if a full scan is needed.
1022fn try_index_filter(
1023    clause: &WhereClause,
1024    index: &crate::index::TableIndex,
1025) -> Option<std::collections::HashSet<String>> {
1026    match clause {
1027        WhereClause::Comparison(cmp) => {
1028            if !index.has_index(&cmp.column) {
1029                return None;
1030            }
1031            match cmp.op {
1032                CmpOp::Eq => {
1033                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1034                    let paths = index.lookup_eq(&cmp.column, &val);
1035                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1036                }
1037                CmpOp::Lt => {
1038                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1039                    // exclusive upper bound: use range with max < val
1040                    // lookup_range is inclusive, so we get all <= val then remove exact matches
1041                    let range_paths = index.lookup_range(&cmp.column, None, Some(&val));
1042                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1043                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1044                }
1045                CmpOp::Gt => {
1046                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1047                    let range_paths = index.lookup_range(&cmp.column, Some(&val), None);
1048                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1049                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1050                }
1051                CmpOp::Le => {
1052                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1053                    let paths = index.lookup_range(&cmp.column, None, Some(&val));
1054                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1055                }
1056                CmpOp::Ge => {
1057                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1058                    let paths = index.lookup_range(&cmp.column, Some(&val), None);
1059                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1060                }
1061                CmpOp::In => {
1062                    if let Some(SqlValue::List(items)) = &cmp.value {
1063                        let vals: Vec<Value> = items.iter().map(sql_value_to_index_value).collect();
1064                        let paths = index.lookup_in(&cmp.column, &vals);
1065                        Some(paths.into_iter().map(|s| s.to_string()).collect())
1066                    } else {
1067                        None
1068                    }
1069                }
1070                _ => None, // LIKE, IS NULL, etc. can't use index
1071            }
1072        }
1073        WhereClause::BoolOp(bop) => {
1074            let left = try_index_filter(&bop.left, index);
1075            let right = try_index_filter(&bop.right, index);
1076            match bop.op {
1077                BoolOpKind::And => {
1078                    match (left, right) {
1079                        (Some(l), Some(r)) => Some(l.intersection(&r).cloned().collect()),
1080                        (Some(l), None) => Some(l), // narrow with left, scan-verify right
1081                        (None, Some(r)) => Some(r),
1082                        (None, None) => None,
1083                    }
1084                }
1085                BoolOpKind::Or => {
1086                    match (left, right) {
1087                        (Some(l), Some(r)) => Some(l.union(&r).cloned().collect()),
1088                        _ => None, // Can't use index if either side needs full scan
1089                    }
1090                }
1091            }
1092        }
1093    }
1094}
1095
1096/// If an ORDER BY column matches a SELECT alias, replace its expr with the
1097/// aliased expression so sorting uses the computed value.
1098fn resolve_order_aliases(specs: &[OrderSpec], columns: &ColumnList) -> Vec<OrderSpec> {
1099    let named = match columns {
1100        ColumnList::Named(exprs) => exprs,
1101        _ => return specs.to_vec(),
1102    };
1103
1104    // Build alias → expr map (skip window exprs — their values are already in rows)
1105    let alias_map: HashMap<String, &Expr> = named
1106        .iter()
1107        .filter_map(|se| match se {
1108            SelectExpr::Expr { expr, alias: Some(a) } if !expr.contains_window() => {
1109                Some((a.clone(), expr))
1110            }
1111            _ => None,
1112        })
1113        .collect();
1114
1115    specs
1116        .iter()
1117        .map(|spec| {
1118            // If the ORDER BY column name matches a SELECT alias, use that expression
1119            if let Some(expr) = alias_map.get(&spec.column) {
1120                OrderSpec {
1121                    column: spec.column.clone(),
1122                    expr: Some((*expr).clone()),
1123                    descending: spec.descending,
1124                }
1125            } else {
1126                spec.clone()
1127            }
1128        })
1129        .collect()
1130}
1131
1132fn sort_rows(rows: &mut Vec<Row>, specs: &[OrderSpec]) {
1133    rows.sort_by(|a, b| {
1134        for spec in specs {
1135            let (va, vb) = if let Some(ref expr) = spec.expr {
1136                (evaluate_expr(expr, a), evaluate_expr(expr, b))
1137            } else {
1138                (
1139                    a.get(&spec.column).cloned().unwrap_or(Value::Null),
1140                    b.get(&spec.column).cloned().unwrap_or(Value::Null),
1141                )
1142            };
1143
1144            // NULLs sort last
1145            let ordering = match (&va, &vb) {
1146                (Value::Null, Value::Null) => Ordering::Equal,
1147                (Value::Null, _) => Ordering::Greater,
1148                (_, Value::Null) => Ordering::Less,
1149                (a_val, b_val) => {
1150                    compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal)
1151                }
1152            };
1153
1154            let ordering = if spec.descending {
1155                ordering.reverse()
1156            } else {
1157                ordering
1158            };
1159
1160            if ordering != Ordering::Equal {
1161                return ordering;
1162            }
1163        }
1164        Ordering::Equal
1165    });
1166}
1167
1168/// Convert a SqlValue to our model Value (for use in insert/update).
1169pub(crate) fn sql_value_to_value(sql_val: &SqlValue) -> Value {
1170    match sql_val {
1171        SqlValue::Null => Value::Null,
1172        SqlValue::String(s) => Value::String(s.clone()),
1173        SqlValue::Int(n) => Value::Int(*n),
1174        SqlValue::Float(f) => Value::Float(*f),
1175        SqlValue::List(items) => {
1176            let strings: Vec<String> = items
1177                .iter()
1178                .filter_map(|v| match v {
1179                    SqlValue::String(s) => Some(s.clone()),
1180                    _ => None,
1181                })
1182                .collect();
1183            Value::List(strings)
1184        }
1185    }
1186}
1187
1188#[cfg(test)]
1189mod tests {
1190    use super::*;
1191
1192    fn make_rows() -> Vec<Row> {
1193        vec![
1194            Row::from([
1195                ("path".into(), Value::String("a.md".into())),
1196                ("title".into(), Value::String("Alpha".into())),
1197                ("count".into(), Value::Int(10)),
1198            ]),
1199            Row::from([
1200                ("path".into(), Value::String("b.md".into())),
1201                ("title".into(), Value::String("Beta".into())),
1202                ("count".into(), Value::Int(5)),
1203            ]),
1204            Row::from([
1205                ("path".into(), Value::String("c.md".into())),
1206                ("title".into(), Value::String("Gamma".into())),
1207                ("count".into(), Value::Int(20)),
1208            ]),
1209        ]
1210    }
1211
1212    #[test]
1213    fn test_select_all() {
1214        let q = SelectQuery {
1215            columns: ColumnList::All,
1216            table: "test".into(),
1217            table_alias: None,
1218            subquery: None,
1219            joins: vec![],
1220            where_clause: None,
1221            group_by: None,
1222            having: None,
1223            order_by: None,
1224            limit: None,
1225            ctes: vec![],
1226        };
1227        let (rows, _cols) = execute_inner(&q, &make_rows(), None).unwrap();
1228        assert_eq!(rows.len(), 3);
1229    }
1230
1231    #[test]
1232    fn test_where_gt() {
1233        let q = SelectQuery {
1234            columns: ColumnList::All,
1235            table: "test".into(),
1236            table_alias: None,
1237            subquery: None,
1238            joins: vec![],
1239            where_clause: Some(WhereClause::Comparison(Comparison {
1240                column: "count".into(),
1241                op: CmpOp::Gt,
1242                value: Some(SqlValue::Int(5)),
1243                left_expr: Some(Expr::Column("count".into())),
1244                right_expr: Some(Expr::Literal(SqlValue::Int(5))),
1245            })),
1246            group_by: None,
1247            having: None,
1248            order_by: None,
1249            limit: None,
1250            ctes: vec![],
1251        };
1252        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1253        assert_eq!(rows.len(), 2);
1254    }
1255
1256    #[test]
1257    fn test_order_by_desc() {
1258        let q = SelectQuery {
1259            columns: ColumnList::All,
1260            table: "test".into(),
1261            table_alias: None,
1262            subquery: None,
1263            joins: vec![],
1264            where_clause: None,
1265            group_by: None,
1266            having: None,
1267            order_by: Some(vec![OrderSpec {
1268                column: "count".into(),
1269                expr: Some(Expr::Column("count".into())),
1270                descending: true,
1271            }]),
1272            limit: None,
1273            ctes: vec![],
1274        };
1275        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1276        assert_eq!(rows[0]["count"], Value::Int(20));
1277        assert_eq!(rows[2]["count"], Value::Int(5));
1278    }
1279
1280    #[test]
1281    fn test_limit() {
1282        let q = SelectQuery {
1283            columns: ColumnList::All,
1284            table: "test".into(),
1285            table_alias: None,
1286            subquery: None,
1287            joins: vec![],
1288            where_clause: None,
1289            group_by: None,
1290            having: None,
1291            order_by: None,
1292            limit: Some(2),
1293            ctes: vec![],
1294        };
1295        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1296        assert_eq!(rows.len(), 2);
1297    }
1298
1299    #[test]
1300    fn test_like() {
1301        let q = SelectQuery {
1302            columns: ColumnList::All,
1303            table: "test".into(),
1304            table_alias: None,
1305            subquery: None,
1306            joins: vec![],
1307            where_clause: Some(WhereClause::Comparison(Comparison {
1308                column: "title".into(),
1309                op: CmpOp::Like,
1310                value: Some(SqlValue::String("%lph%".into())),
1311                left_expr: Some(Expr::Column("title".into())),
1312                right_expr: None,
1313            })),
1314            group_by: None,
1315            having: None,
1316            order_by: None,
1317            limit: None,
1318            ctes: vec![],
1319        };
1320        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1321        assert_eq!(rows.len(), 1);
1322        assert_eq!(rows[0]["title"], Value::String("Alpha".into()));
1323    }
1324
1325    #[test]
1326    fn test_is_null() {
1327        let mut rows = make_rows();
1328        rows[1].insert("optional".into(), Value::Null);
1329
1330        let q = SelectQuery {
1331            columns: ColumnList::All,
1332            table: "test".into(),
1333            table_alias: None,
1334            subquery: None,
1335            joins: vec![],
1336            where_clause: Some(WhereClause::Comparison(Comparison {
1337                column: "optional".into(),
1338                op: CmpOp::IsNull,
1339                value: None,
1340                left_expr: Some(Expr::Column("optional".into())),
1341                right_expr: None,
1342            })),
1343            group_by: None,
1344            having: None,
1345            order_by: None,
1346            limit: None,
1347            ctes: vec![],
1348        };
1349        let (result, _) = execute_inner(&q, &rows, None).unwrap();
1350        // All rows where optional is NULL or missing
1351        assert_eq!(result.len(), 3);
1352    }
1353
    // ── Expression evaluation tests ───────────────────────────────
1355
1356    #[test]
1357    fn test_evaluate_expr_literal() {
1358        let row = Row::new();
1359        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Int(42)), &row), Value::Int(42));
1360        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Float(3.14)), &row), Value::Float(3.14));
1361        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Null), &row), Value::Null);
1362    }
1363
1364    #[test]
1365    fn test_evaluate_expr_column() {
1366        let row = Row::from([("x".into(), Value::Int(10))]);
1367        assert_eq!(evaluate_expr(&Expr::Column("x".into()), &row), Value::Int(10));
1368        assert_eq!(evaluate_expr(&Expr::Column("missing".into()), &row), Value::Null);
1369    }
1370
1371    #[test]
1372    fn test_evaluate_expr_int_arithmetic() {
1373        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(3))]);
1374        let add = Expr::BinaryOp {
1375            left: Box::new(Expr::Column("a".into())),
1376            op: ArithOp::Add,
1377            right: Box::new(Expr::Column("b".into())),
1378        };
1379        assert_eq!(evaluate_expr(&add, &row), Value::Int(13));
1380
1381        let sub = Expr::BinaryOp {
1382            left: Box::new(Expr::Column("a".into())),
1383            op: ArithOp::Sub,
1384            right: Box::new(Expr::Column("b".into())),
1385        };
1386        assert_eq!(evaluate_expr(&sub, &row), Value::Int(7));
1387
1388        let mul = Expr::BinaryOp {
1389            left: Box::new(Expr::Column("a".into())),
1390            op: ArithOp::Mul,
1391            right: Box::new(Expr::Column("b".into())),
1392        };
1393        assert_eq!(evaluate_expr(&mul, &row), Value::Int(30));
1394
1395        let div = Expr::BinaryOp {
1396            left: Box::new(Expr::Column("a".into())),
1397            op: ArithOp::Div,
1398            right: Box::new(Expr::Column("b".into())),
1399        };
1400        assert_eq!(evaluate_expr(&div, &row), Value::Int(3)); // integer division
1401
1402        let modulo = Expr::BinaryOp {
1403            left: Box::new(Expr::Column("a".into())),
1404            op: ArithOp::Mod,
1405            right: Box::new(Expr::Column("b".into())),
1406        };
1407        assert_eq!(evaluate_expr(&modulo, &row), Value::Int(1));
1408    }
1409
1410    #[test]
1411    fn test_evaluate_expr_float_coercion() {
1412        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Float(3.0))]);
1413        let add = Expr::BinaryOp {
1414            left: Box::new(Expr::Column("a".into())),
1415            op: ArithOp::Add,
1416            right: Box::new(Expr::Column("b".into())),
1417        };
1418        assert_eq!(evaluate_expr(&add, &row), Value::Float(13.0));
1419    }
1420
1421    #[test]
1422    fn test_evaluate_expr_null_propagation() {
1423        let row = Row::from([("a".into(), Value::Int(10))]);
1424        let add = Expr::BinaryOp {
1425            left: Box::new(Expr::Column("a".into())),
1426            op: ArithOp::Add,
1427            right: Box::new(Expr::Column("missing".into())),
1428        };
1429        assert_eq!(evaluate_expr(&add, &row), Value::Null);
1430    }
1431
1432    #[test]
1433    fn test_evaluate_expr_div_by_zero() {
1434        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(0))]);
1435        let div = Expr::BinaryOp {
1436            left: Box::new(Expr::Column("a".into())),
1437            op: ArithOp::Div,
1438            right: Box::new(Expr::Column("b".into())),
1439        };
1440        assert_eq!(evaluate_expr(&div, &row), Value::Null);
1441    }
1442
1443    #[test]
1444    fn test_evaluate_expr_unary_minus() {
1445        let row = Row::from([("x".into(), Value::Int(5))]);
1446        let neg = Expr::UnaryMinus(Box::new(Expr::Column("x".into())));
1447        assert_eq!(evaluate_expr(&neg, &row), Value::Int(-5));
1448    }
1449
1450    #[test]
1451    fn test_select_with_expression() {
1452        // Integration test: SELECT count * 2 AS doubled FROM test
1453        let stmt = crate::query_parser::parse_query(
1454            "SELECT count * 2 AS doubled FROM test"
1455        ).unwrap();
1456        if let crate::query_parser::Statement::Select(q) = stmt {
1457            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1458            assert_eq!(cols, vec!["doubled"]);
1459            assert_eq!(rows.len(), 3);
1460            // Rows are: count=10, count=5, count=20
1461            let values: Vec<Value> = rows.iter().map(|r| r["doubled"].clone()).collect();
1462            assert!(values.contains(&Value::Int(20)));
1463            assert!(values.contains(&Value::Int(10)));
1464            assert!(values.contains(&Value::Int(40)));
1465        } else {
1466            panic!("Expected Select");
1467        }
1468    }
1469
1470    #[test]
1471    fn test_where_with_expression() {
1472        // SELECT * FROM test WHERE count * 2 > 15
1473        let stmt = crate::query_parser::parse_query(
1474            "SELECT * FROM test WHERE count * 2 > 15"
1475        ).unwrap();
1476        if let crate::query_parser::Statement::Select(q) = stmt {
1477            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1478            // count=10 → 20 > 15 ✓, count=5 → 10 > 15 ✗, count=20 → 40 > 15 ✓
1479            assert_eq!(rows.len(), 2);
1480        } else {
1481            panic!("Expected Select");
1482        }
1483    }
1484
1485    #[test]
1486    fn test_order_by_expression() {
1487        // SELECT * FROM test ORDER BY count * -1 ASC (effectively DESC by count)
1488        let stmt = crate::query_parser::parse_query(
1489            "SELECT title, count FROM test ORDER BY count * -1 ASC"
1490        ).unwrap();
1491        if let crate::query_parser::Statement::Select(q) = stmt {
1492            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1493            // count: 20 → -20, 10 → -10, 5 → -5, ASC means -20, -10, -5
1494            assert_eq!(rows[0]["count"], Value::Int(20));
1495            assert_eq!(rows[1]["count"], Value::Int(10));
1496            assert_eq!(rows[2]["count"], Value::Int(5));
1497        } else {
1498            panic!("Expected Select");
1499        }
1500    }
1501
1502    // ── CASE WHEN evaluation tests ────────────────────────────────
1503
1504    #[test]
1505    fn test_case_when_eval_basic() {
1506        let row = Row::from([("status".into(), Value::String("ACTIVE".into()))]);
1507        let expr = Expr::Case {
1508            whens: vec![(
1509                WhereClause::Comparison(Comparison {
1510                    column: "status".into(),
1511                    op: CmpOp::Eq,
1512                    value: Some(SqlValue::String("ACTIVE".into())),
1513                    left_expr: Some(Expr::Column("status".into())),
1514                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1515                }),
1516                Box::new(Expr::Literal(SqlValue::Int(1))),
1517            )],
1518            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1519        };
1520        assert_eq!(evaluate_expr(&expr, &row), Value::Int(1));
1521    }
1522
1523    #[test]
1524    fn test_case_when_eval_else() {
1525        let row = Row::from([("status".into(), Value::String("KILLED".into()))]);
1526        let expr = Expr::Case {
1527            whens: vec![(
1528                WhereClause::Comparison(Comparison {
1529                    column: "status".into(),
1530                    op: CmpOp::Eq,
1531                    value: Some(SqlValue::String("ACTIVE".into())),
1532                    left_expr: Some(Expr::Column("status".into())),
1533                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1534                }),
1535                Box::new(Expr::Literal(SqlValue::Int(1))),
1536            )],
1537            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1538        };
1539        assert_eq!(evaluate_expr(&expr, &row), Value::Int(0));
1540    }
1541
1542    #[test]
1543    fn test_case_when_eval_no_else_null() {
1544        let row = Row::from([("x".into(), Value::Int(99))]);
1545        let expr = Expr::Case {
1546            whens: vec![(
1547                WhereClause::Comparison(Comparison {
1548                    column: "x".into(),
1549                    op: CmpOp::Eq,
1550                    value: Some(SqlValue::Int(1)),
1551                    left_expr: Some(Expr::Column("x".into())),
1552                    right_expr: Some(Expr::Literal(SqlValue::Int(1))),
1553                }),
1554                Box::new(Expr::Literal(SqlValue::String("one".into()))),
1555            )],
1556            else_expr: None,
1557        };
1558        assert_eq!(evaluate_expr(&expr, &row), Value::Null);
1559    }
1560
#[test]
fn test_case_when_in_aggregate_query() {
    // The CASE arm replaces every count that is not above 5 with 0, so for
    // fixture counts 10, 5 and 20 the expected sum is 10 + 0 + 20 = 30.
    let sql = "SELECT SUM(CASE WHEN count > 5 THEN count ELSE 0 END) AS total FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, columns) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(columns, vec!["total"]);
    assert_eq!(result.len(), 1);
    assert_eq!(result[0]["total"], Value::Float(30.0));
}
1577
#[test]
fn test_case_when_with_unary_minus_in_aggregate() {
    // Alpha contributes +count and every other title contributes -count:
    // Alpha 10, Beta -5, Gamma -20, for an expected net of -15.
    let sql =
        "SELECT SUM(CASE WHEN title = 'Alpha' THEN count ELSE -count END) AS net FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(result.len(), 1);
    assert_eq!(result[0]["net"], Value::Float(-15.0));
}
1593
#[test]
fn test_dateadd_with_dict_in_group_by() {
    // Rows shaped like the output of a join: `s.params` is a dict column, and
    // the DateAdd expression reads its `exit_days` entry while the query
    // groups by token and event date.
    use indexmap::IndexMap;

    let mut strategy_params = IndexMap::new();
    strategy_params.insert("exit_days".to_string(), Value::Int(21));

    // Both input rows share token, date and params; only the size differs.
    let event_date = chrono::NaiveDate::from_ymd_opt(2026, 1, 1).unwrap();
    let order_row = |size: i64| {
        Row::from([
            ("o.token".into(), Value::String("BTC".into())),
            ("o.event_date".into(), Value::Date(event_date)),
            ("o.size".into(), Value::Int(size)),
            ("s.params".into(), Value::Dict(strategy_params.clone())),
        ])
    };
    let input = vec![order_row(100), order_row(50)];

    let select_list = vec![
        SelectExpr::Column("o.token".into()),
        SelectExpr::Column("o.event_date".into()),
        SelectExpr::Expr {
            expr: Expr::DateAdd {
                date: Box::new(Expr::Column("o.event_date".into())),
                days: Box::new(Expr::Column("s.params.exit_days".into())),
            },
            alias: Some("exit_date".into()),
        },
        SelectExpr::Aggregate {
            func: AggFunc::Sum,
            arg: "o.size".into(),
            arg_expr: Some(Expr::Column("o.size".into())),
            alias: Some("total".into()),
        },
    ];
    let query = SelectQuery {
        columns: ColumnList::Named(select_list),
        table: "orders".into(),
        table_alias: None,
        subquery: None,
        joins: vec![],
        where_clause: None,
        group_by: Some(vec!["o.token".into(), "o.event_date".into()]),
        having: None,
        order_by: None,
        limit: None,
        ctes: vec![],
    };

    let (grouped, columns) = execute_inner(&query, &input, None).unwrap();
    assert_eq!(grouped.len(), 1);
    assert!(columns.contains(&"exit_date".to_string()));
    assert_eq!(grouped[0]["total"], Value::Float(150.0));

    // The key check: 2026-01-01 plus the 21 days held in the dict must come
    // out as 2026-01-22 rather than Null.
    let expected_exit = chrono::NaiveDate::from_ymd_opt(2026, 1, 22).unwrap();
    assert_eq!(grouped[0]["exit_date"], Value::Date(expected_exit));
}
1660
#[test]
fn test_aggregate_arithmetic() {
    // SUM(count) over the fixture is 10 + 5 + 20 = 35 and COUNT(*) is 3.
    // SUM yields a Float and COUNT an Int, so the quotient is expected to be
    // a Float close to 35 / 3.
    let sql = "SELECT SUM(count) / COUNT(*) AS avg_count FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, columns) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(columns, vec!["avg_count"]);
    assert_eq!(result.len(), 1);

    let average = match &result[0]["avg_count"] {
        Value::Float(f) => *f,
        other => panic!("Expected Float, got {:?}", other),
    };
    assert!((average - 11.666666666666666).abs() < 0.001);
}
1681
#[test]
fn test_aggregate_subtraction_with_group_by() {
    // One BTC buy of 100 and one BTC sell of 60. The query subtracts the
    // SELL-only sum from the BUY-only sum inside the single token group,
    // so the expected net is 40.
    let buy = Row::from([
        ("token".into(), Value::String("BTC".into())),
        ("side".into(), Value::String("BUY".into())),
        ("size".into(), Value::Float(100.0)),
    ]);
    let sell = Row::from([
        ("token".into(), Value::String("BTC".into())),
        ("side".into(), Value::String("SELL".into())),
        ("size".into(), Value::Float(60.0)),
    ]);
    let input = vec![buy, sell];

    let sql = "SELECT token, SUM(CASE WHEN side = 'BUY' THEN size ELSE 0 END) - SUM(CASE WHEN side = 'SELL' THEN size ELSE 0 END) AS net FROM test GROUP BY token";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (grouped, _) = execute_inner(&query, &input, None).unwrap();
    assert_eq!(grouped.len(), 1);
    assert_eq!(grouped[0]["net"], Value::Float(40.0));
}
1711
1712    // ── Issue #42: Aggregate subtraction without GROUP BY ──
1713
#[test]
fn test_aggregate_subtraction_no_group() {
    // Without GROUP BY the whole fixture is one group:
    // SUM(count) = 10 + 5 + 20 = 35, COUNT(*) = 3, so diff = 32.
    let sql = "SELECT SUM(count) - COUNT(*) as diff FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, columns) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(columns, vec!["diff"]);
    assert_eq!(result.len(), 1);
    assert_eq!(result[0]["diff"], Value::Float(32.0));
}
1729
1730    // ── Issue #42: Aggregate division with GROUP BY ──
1731
#[test]
fn test_aggregate_division_with_group_by() {
    // Category A holds counts 10 and 20, category B holds a single count of 6.
    let input = vec![
        Row::from([
            ("category".into(), Value::String("A".into())),
            ("count".into(), Value::Int(10)),
        ]),
        Row::from([
            ("category".into(), Value::String("A".into())),
            ("count".into(), Value::Int(20)),
        ]),
        Row::from([
            ("category".into(), Value::String("B".into())),
            ("count".into(), Value::Int(6)),
        ]),
    ];

    // Expected per group:
    //   A: SUM(count) = 30, COUNT(*) = 2, ratio = 15.0
    //   B: SUM(count) = 6,  COUNT(*) = 1, ratio = 6.0
    let sql = "SELECT category, SUM(count) / COUNT(*) as ratio FROM test GROUP BY category";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (grouped, columns) = execute_inner(&query, &input, None).unwrap();
    assert!(columns.contains(&"ratio".to_string()));
    assert_eq!(grouped.len(), 2);

    // Locate each group by its category value rather than relying on output order.
    let category_a = Value::String("A".into());
    let category_b = Value::String("B".into());
    let group_a = grouped.iter().find(|r| r["category"] == category_a).unwrap();
    let group_b = grouped.iter().find(|r| r["category"] == category_b).unwrap();

    let ratio_a = match &group_a["ratio"] {
        Value::Float(f) => *f,
        other => panic!("Expected Float for group A ratio, got {:?}", other),
    };
    assert!((ratio_a - 15.0).abs() < 0.001);

    let ratio_b = match &group_b["ratio"] {
        Value::Float(f) => *f,
        other => panic!("Expected Float for group B ratio, got {:?}", other),
    };
    assert!((ratio_b - 6.0).abs() < 0.001);
}
1778
1779    // ── Window function tests ────────────────────────────────────
1780
#[test]
fn test_window_row_number() {
    // ROW_NUMBER over count DESC: the row with the largest count gets 1.
    let sql = "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, columns) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(columns, vec!["title", "rn"]);
    assert_eq!(result.len(), 3);

    // Index the row numbers by title so the checks do not depend on output order.
    let mut row_number_by_title: HashMap<String, i64> = HashMap::new();
    for r in &result {
        let title = r["title"].to_display_string();
        let rn = match &r["rn"] {
            Value::Int(n) => *n,
            _ => panic!("Expected Int"),
        };
        row_number_by_title.insert(title, rn);
    }

    assert_eq!(row_number_by_title["Gamma"], 1); // count=20
    assert_eq!(row_number_by_title["Alpha"], 2); // count=10
    assert_eq!(row_number_by_title["Beta"], 3); // count=5
}
1800
#[test]
fn test_window_rank_with_ties() {
    // Overwrite the fixture counts so the first two rows tie at 10 and the
    // third row has 5.
    let mut input = make_rows();
    for (idx, count) in vec![10, 10, 5].into_iter().enumerate() {
        input[idx].insert("count".into(), Value::Int(count));
    }

    let sql = "SELECT title, RANK() OVER (ORDER BY count DESC) AS rnk FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &input, None).unwrap();
    let mut ranks: Vec<i64> = Vec::new();
    for r in &result {
        match &r["rnk"] {
            Value::Int(n) => ranks.push(*n),
            _ => panic!("Expected Int"),
        }
    }

    // Two rows share rank 1, and RANK leaves a gap so the next rank is 3.
    assert!(ranks.contains(&1));
    let tied_for_first = ranks.iter().filter(|&&r| r == 1).count();
    assert!(tied_for_first == 2);
    assert!(ranks.contains(&3));
}
1823
#[test]
fn test_window_dense_rank() {
    // Overwrite the fixture counts so the first two rows tie at 10 and the
    // third row has 5.
    let mut input = make_rows();
    for (idx, count) in vec![10, 10, 5].into_iter().enumerate() {
        input[idx].insert("count".into(), Value::Int(count));
    }

    let sql = "SELECT title, DENSE_RANK() OVER (ORDER BY count DESC) AS dr FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &input, None).unwrap();
    let mut ranks: Vec<i64> = Vec::new();
    for r in &result {
        match &r["dr"] {
            Value::Int(n) => ranks.push(*n),
            _ => panic!("Expected Int"),
        }
    }

    // Two rows share rank 1; DENSE_RANK leaves no gap, so the next rank is 2
    // and rank 3 never appears.
    let tied_for_first = ranks.iter().filter(|&&r| r == 1).count();
    assert!(tied_for_first == 2);
    assert!(ranks.contains(&2));
    assert!(!ranks.contains(&3));
}
1846
#[test]
fn test_window_lag() {
    let sql = "SELECT title, LAG(count, 1) OVER (ORDER BY count ASC) AS prev FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &make_rows(), None).unwrap();

    // Sorted ASC the rows are Beta(5), Alpha(10), Gamma(20), so LAG gives
    // NULL, 5, 10 in that order.
    let expected_prev = vec![
        ("Beta", Value::Null),
        ("Alpha", Value::Int(5)),
        ("Gamma", Value::Int(10)),
    ];
    for (title, prev) in expected_prev {
        let row = result
            .iter()
            .find(|r| r["title"] == Value::String(title.into()))
            .unwrap();
        assert_eq!(row["prev"], prev);
    }
}
1866
#[test]
fn test_window_lead() {
    let sql = "SELECT title, LEAD(count, 1) OVER (ORDER BY count ASC) AS next FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &make_rows(), None).unwrap();

    // Sorted ASC the rows are Beta(5), Alpha(10), Gamma(20), so LEAD gives
    // 10, 20, NULL in that order. The middle row is checked as well as the
    // two boundary rows, mirroring the LAG test, so that a wrong offset on
    // interior rows cannot slip through.
    let expected_next = vec![
        ("Beta", Value::Int(10)),
        ("Alpha", Value::Int(20)),
        ("Gamma", Value::Null),
    ];
    for (title, next) in expected_next {
        let row = result
            .iter()
            .find(|r| r["title"] == Value::String(title.into()))
            .unwrap();
        assert_eq!(row["next"], next);
    }
}
1882
#[test]
fn test_window_sum_partition() {
    // Two rows in partition A (10 and 20) and one row in partition B (5).
    let row_of = |cat: &'static str, val: i64| {
        Row::from([
            ("cat".into(), Value::String(cat.into())),
            ("val".into(), Value::Int(val)),
        ])
    };
    let input = vec![row_of("A", 10), row_of("A", 20), row_of("B", 5)];

    let sql = "SELECT cat, val, SUM(val) OVER (PARTITION BY cat) AS cat_total FROM test";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, columns) = execute_inner(&query, &input, None).unwrap();
    assert_eq!(columns, vec!["cat", "val", "cat_total"]);
    assert_eq!(result.len(), 3);

    // Every row keeps its own identity but carries its partition's total.
    let category_a = Value::String("A".into());
    let partition_a: Vec<_> = result.iter().filter(|r| r["cat"] == category_a).collect();
    assert_eq!(partition_a.len(), 2);
    partition_a
        .iter()
        .for_each(|r| assert_eq!(r["cat_total"], Value::Float(30.0)));

    let category_b = Value::String("B".into());
    let row_b = result.iter().find(|r| r["cat"] == category_b).unwrap();
    assert_eq!(row_b["cat_total"], Value::Float(5.0));
}
1917
#[test]
fn test_window_with_where_order_limit() {
    // Combines a window function with WHERE, an ORDER BY on the window alias
    // and LIMIT; the two surviving rows must be numbered 1 and 2 in order.
    let sql = "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test WHERE count > 4 ORDER BY rn LIMIT 2";
    let query = match crate::query_parser::parse_query(sql).unwrap() {
        crate::query_parser::Statement::Select(q) => q,
        _ => panic!("Expected Select"),
    };

    let (result, _) = execute_inner(&query, &make_rows(), None).unwrap();
    assert_eq!(result.len(), 2);
    for (idx, expected_rn) in vec![1, 2].into_iter().enumerate() {
        assert_eq!(result[idx]["rn"], Value::Int(expected_rn));
    }
}
1932}