Skip to main content

mdql_core/
query_engine.rs

1//! Execute parsed queries over in-memory rows.
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5
6use regex::Regex;
7
8use crate::errors::MdqlError;
9use crate::model::{Row, Value};
10use crate::query_parser::*;
11use crate::schema::Schema;
12
13pub fn execute_query(
14    query: &SelectQuery,
15    rows: &[Row],
16    _schema: &Schema,
17) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
18    if let Some(ref sub) = query.subquery {
19        let (sub_rows, _sub_cols) = execute_inner(sub, rows, None)?;
20        return execute_inner(query, &sub_rows, None);
21    }
22    execute_inner(query, rows, None)
23}
24
25#[allow(dead_code)]
26pub(crate) fn execute_query_indexed(
27    query: &SelectQuery,
28    rows: &[Row],
29    schema: &Schema,
30    index: Option<&crate::index::TableIndex>,
31    searcher: Option<&crate::search::TableSearcher>,
32) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
33    // Pre-compute FTS results for any LIKE clauses on section columns
34    let fts_results = if let (Some(ref wc), Some(searcher)) = (&query.where_clause, searcher) {
35        collect_fts_results(wc, schema, searcher)
36    } else {
37        HashMap::new()
38    };
39
40    execute_with_fts(query, rows, index, &fts_results)
41}
42
43#[allow(dead_code)]
44fn collect_fts_results(
45    clause: &WhereClause,
46    schema: &Schema,
47    searcher: &crate::search::TableSearcher,
48) -> HashMap<(String, String), std::collections::HashSet<String>> {
49    let mut results = HashMap::new();
50    collect_fts_results_inner(clause, schema, searcher, &mut results);
51    results
52}
53
54#[allow(dead_code)]
55fn collect_fts_results_inner(
56    clause: &WhereClause,
57    schema: &Schema,
58    searcher: &crate::search::TableSearcher,
59    results: &mut HashMap<(String, String), std::collections::HashSet<String>>,
60) {
61    match clause {
62        WhereClause::Comparison(cmp) => {
63            if (cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike) && schema.sections.contains_key(&cmp.column) {
64                if let Some(SqlValue::String(pattern)) = &cmp.value {
65                    // Strip SQL wildcards for Tantivy query
66                    let search_term = pattern.replace('%', " ").replace('_', " ").trim().to_string();
67                    if !search_term.is_empty() {
68                        if let Ok(paths) = searcher.search(&search_term, Some(&cmp.column)) {
69                            let key = (cmp.column.clone(), pattern.clone());
70                            results.insert(key, paths.into_iter().collect());
71                        }
72                    }
73                }
74            }
75        }
76        WhereClause::BoolOp(bop) => {
77            collect_fts_results_inner(&bop.left, schema, searcher, results);
78            collect_fts_results_inner(&bop.right, schema, searcher, results);
79        }
80    }
81}
82
83type FtsResults = HashMap<(String, String), std::collections::HashSet<String>>;
84
85fn execute_with_fts(
86    query: &SelectQuery,
87    rows: &[Row],
88    index: Option<&crate::index::TableIndex>,
89    fts: &FtsResults,
90) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
91    // Determine available columns
92    let mut all_columns: Vec<String> = Vec::new();
93    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
94    for r in rows {
95        for k in r.keys() {
96            if seen.insert(k.clone()) {
97                all_columns.push(k.clone());
98            }
99        }
100    }
101
102    // Check if query has aggregates
103    let has_aggregates = match &query.columns {
104        ColumnList::Named(exprs) => exprs.iter().any(|e| e.is_aggregate()),
105        _ => false,
106    };
107
108    // Output column names
109    let columns: Vec<String> = match &query.columns {
110        ColumnList::All => all_columns,
111        ColumnList::Named(exprs) => exprs.iter().map(|e| e.output_name()).collect(),
112    };
113
114    // Reject duplicate output names. A result row is a key->value map, so two
115    // columns with the same output name cannot both be represented — the dict
116    // would collapse them and the header/row lengths would silently disagree.
117    // Require the caller to disambiguate with AS.
118    if let ColumnList::Named(_) = &query.columns {
119        let mut seen = std::collections::HashSet::new();
120        for c in &columns {
121            if !seen.insert(c.as_str()) {
122                return Err(MdqlError::QueryExecution(format!(
123                    "duplicate output column '{}' — give each projection a unique name with AS",
124                    c
125                )));
126            }
127        }
128    }
129
130    // Filter — try index first, fall back to full scan
131    let filtered: Vec<Row> = if let Some(ref wc) = query.where_clause {
132        let candidate_paths = index.and_then(|idx| try_index_filter(wc, idx));
133        if let Some(paths) = candidate_paths {
134            rows.iter()
135                .filter(|r| {
136                    r.get("path")
137                        .and_then(|v| v.as_str())
138                        .map_or(false, |p| paths.contains(p))
139                })
140                .filter(|r| evaluate_with_fts(wc, r, fts))
141                .cloned()
142                .collect()
143        } else {
144            rows.iter()
145                .filter(|r| evaluate_with_fts(wc, r, fts))
146                .cloned()
147                .collect()
148        }
149    } else {
150        rows.to_vec()
151    };
152
153    // Aggregate if needed
154    let mut result = if has_aggregates || query.group_by.is_some() {
155        let exprs = match &query.columns {
156            ColumnList::Named(exprs) => exprs.clone(),
157            _ => return Err(MdqlError::QueryExecution(
158                "SELECT * with GROUP BY is not supported".into(),
159            )),
160        };
161        let group_keys = query.group_by.as_deref().unwrap_or(&[]);
162        aggregate_rows(&filtered, &exprs, group_keys)?
163    } else {
164        filtered
165    };
166
167    // HAVING filter — apply after aggregation
168    if let Some(ref having) = query.having {
169        result.retain(|row| evaluate(having, row));
170    }
171
172    // Window functions — compute after aggregation/HAVING, before ORDER BY
173    let has_windows = match &query.columns {
174        ColumnList::Named(exprs) => exprs.iter().any(|e| match e {
175            SelectExpr::Expr { expr, .. } => expr.contains_window(),
176            _ => false,
177        }),
178        _ => false,
179    };
180    if has_windows {
181        if let ColumnList::Named(ref exprs) = query.columns {
182            compute_windows(&mut result, exprs)?;
183        }
184    }
185
186    // DISTINCT — dedupe on the projected output values, keeping the first
187    // occurrence. Runs before ORDER BY and LIMIT per SQL semantics.
188    if query.distinct {
189        let mut seen = std::collections::HashSet::new();
190        result.retain(|row| seen.insert(distinct_key(row, &query.columns, &columns)));
191    }
192
193    // Sort — resolve ORDER BY aliases against SELECT list
194    if let Some(ref order_by) = query.order_by {
195        let resolved = resolve_order_aliases(order_by, &query.columns);
196        sort_rows(&mut result, &resolved);
197    }
198
199    // Limit
200    if let Some(limit) = query.limit {
201        result.truncate(limit as usize);
202    }
203
204    // Project — evaluate expressions and strip to requested columns
205    if !matches!(query.columns, ColumnList::All) {
206        let named_exprs = match &query.columns {
207            ColumnList::Named(exprs) => exprs,
208            _ => unreachable!(),
209        };
210
211        // Compute expression columns first, then retain only requested columns.
212        // Skip if aggregation already computed them (re-evaluating would lose
213        // columns that only existed in pre-aggregation rows, e.g. dict fields).
214        let has_expr_cols = named_exprs.iter().any(|e| matches!(e, SelectExpr::Expr { .. }));
215        let already_aggregated = has_aggregates || query.group_by.is_some();
216        if has_expr_cols && !already_aggregated {
217            for row in &mut result {
218                for expr in named_exprs {
219                    if let SelectExpr::Expr { expr: e, alias } = expr {
220                        if e.contains_window() { continue; }
221                        let name = alias.clone().unwrap_or_else(|| e.display_name());
222                        let val = evaluate_expr(e, row);
223                        row.insert(name, val);
224                    }
225                }
226            }
227        }
228
229        let col_set: std::collections::HashSet<&str> =
230            columns.iter().map(|s| s.as_str()).collect();
231        for row in &mut result {
232            row.retain(|k, _| col_set.contains(k.as_str()));
233        }
234    }
235
236    // Null-fill so every result row carries every header column as a key.
237    // A requested column may be absent from a row because it does not exist
238    // on the table at all, or because it is an optional field/section missing
239    // on that row (and SELECT * unions keys across rows). Inserting Null keeps
240    // the column header aligned with each row dict, so consumers that zip
241    // `columns` with row values stay in sync.
242    for row in &mut result {
243        for col in &columns {
244            if !row.contains_key(col) {
245                row.insert(col.clone(), Value::Null);
246            }
247        }
248    }
249
250    Ok((result, columns))
251}
252
253/// Build a dedup key for SELECT DISTINCT from a row's projected output values.
254///
255/// Mirrors how projection resolves each output column: plain columns and
256/// already-computed aggregate/window outputs read from the row; other
257/// expressions are evaluated. Values are serialized via Debug, which
258/// distinguishes variants (Null vs empty string, Int(1) vs Bool(true)).
259fn distinct_key(row: &Row, column_spec: &ColumnList, header: &[String]) -> String {
260    let mut parts: Vec<String> = Vec::new();
261    match column_spec {
262        ColumnList::All => {
263            for col in header {
264                parts.push(format!("{:?}", row.get(col).unwrap_or(&Value::Null)));
265            }
266        }
267        ColumnList::Named(exprs) => {
268            for se in exprs {
269                let val = match se {
270                    SelectExpr::Column(name) => {
271                        row.get(name).cloned().unwrap_or(Value::Null)
272                    }
273                    SelectExpr::Aggregate { .. } => {
274                        row.get(&se.output_name()).cloned().unwrap_or(Value::Null)
275                    }
276                    SelectExpr::Expr { expr, .. } => {
277                        if expr.contains_window() {
278                            row.get(&se.output_name()).cloned().unwrap_or(Value::Null)
279                        } else {
280                            evaluate_expr(expr, row)
281                        }
282                    }
283                };
284                parts.push(format!("{:?}", val));
285            }
286        }
287    }
288    parts.join("\u{1f}")
289}
290
291fn aggregate_rows(
292    rows: &[Row],
293    exprs: &[SelectExpr],
294    group_keys: &[String],
295) -> crate::errors::Result<Vec<Row>> {
296    // Group rows by group_keys
297    let mut groups: Vec<(Vec<Value>, Vec<&Row>)> = Vec::new();
298    let mut key_index: HashMap<Vec<String>, usize> = HashMap::new();
299
300    if group_keys.is_empty() {
301        // No GROUP BY — all rows are one group
302        let all_refs: Vec<&Row> = rows.iter().collect();
303        groups.push((vec![], all_refs));
304    } else {
305        for row in rows {
306            let key: Vec<String> = group_keys
307                .iter()
308                .map(|k| {
309                    row.get(k)
310                        .map(|v| v.to_display_string())
311                        .unwrap_or_default()
312                })
313                .collect();
314            let key_vals: Vec<Value> = group_keys
315                .iter()
316                .map(|k| row.get(k).cloned().unwrap_or(Value::Null))
317                .collect();
318            if let Some(&idx) = key_index.get(&key) {
319                groups[idx].1.push(row);
320            } else {
321                let idx = groups.len();
322                key_index.insert(key, idx);
323                groups.push((key_vals, vec![row]));
324            }
325        }
326    }
327
328    // Compute aggregates per group
329    let mut result = Vec::new();
330    for (key_vals, group_rows) in &groups {
331        let mut out = Row::new();
332
333        // Fill in group key values
334        for (i, k) in group_keys.iter().enumerate() {
335            out.insert(k.clone(), key_vals[i].clone());
336        }
337
338        // Compute each expression
339        for expr in exprs {
340            match expr {
341                SelectExpr::Column(name) => {
342                    // Already filled if it's a group key; otherwise take first row's value
343                    if !out.contains_key(name) {
344                        if let Some(first) = group_rows.first() {
345                            out.insert(
346                                name.clone(),
347                                first.get(name).cloned().unwrap_or(Value::Null),
348                            );
349                        }
350                    }
351                }
352                SelectExpr::Aggregate { func, arg, arg_expr, alias } => {
353                    let out_name = alias
354                        .clone()
355                        .unwrap_or_else(|| expr.output_name());
356                    let val = compute_aggregate(func, arg, arg_expr.as_ref(), group_rows);
357                    out.insert(out_name, val);
358                }
359                SelectExpr::Expr { expr: e, alias } => {
360                    let out_name = alias.clone().unwrap_or_else(|| e.display_name());
361                    if e.contains_aggregate() {
362                        let val = evaluate_agg_expr(e, group_rows);
363                        out.insert(out_name, val);
364                    } else if let Some(first) = group_rows.first() {
365                        let val = evaluate_expr(e, first);
366                        out.insert(out_name, val);
367                    }
368                }
369            }
370        }
371
372        result.push(out);
373    }
374
375    Ok(result)
376}
377
378/// Resolve a per-row value for an aggregate argument.
379/// If `arg_expr` is set, evaluate it; otherwise look up `arg` as a column name.
380fn resolve_agg_value<'a>(arg: &str, arg_expr: Option<&Expr>, row: &'a Row) -> Value {
381    if let Some(expr) = arg_expr {
382        evaluate_expr(expr, row)
383    } else {
384        row.get(arg).cloned().unwrap_or(Value::Null)
385    }
386}
387
388fn compute_aggregate(func: &AggFunc, arg: &str, arg_expr: Option<&Expr>, rows: &[&Row]) -> Value {
389    match func {
390        AggFunc::Count => {
391            if arg == "*" && arg_expr.is_none() {
392                Value::Int(rows.len() as i64)
393            } else {
394                let count = rows
395                    .iter()
396                    .filter(|r| {
397                        let v = resolve_agg_value(arg, arg_expr, r);
398                        !v.is_null()
399                    })
400                    .count();
401                Value::Int(count as i64)
402            }
403        }
404        AggFunc::Sum => {
405            let mut total = 0.0f64;
406            let mut has_any = false;
407            for r in rows {
408                let v = resolve_agg_value(arg, arg_expr, r);
409                match v {
410                    Value::Int(n) => { total += n as f64; has_any = true; }
411                    Value::Float(f) => { total += f; has_any = true; }
412                    _ => {}
413                }
414            }
415            if has_any { Value::Float(total) } else { Value::Null }
416        }
417        AggFunc::Avg => {
418            let mut total = 0.0f64;
419            let mut count = 0usize;
420            for r in rows {
421                let v = resolve_agg_value(arg, arg_expr, r);
422                match v {
423                    Value::Int(n) => { total += n as f64; count += 1; }
424                    Value::Float(f) => { total += f; count += 1; }
425                    _ => {}
426                }
427            }
428            if count > 0 { Value::Float(total / count as f64) } else { Value::Null }
429        }
430        AggFunc::Min => {
431            let mut min_val: Option<Value> = None;
432            for r in rows {
433                let v = resolve_agg_value(arg, arg_expr, r);
434                if v.is_null() { continue; }
435                min_val = Some(match min_val {
436                    None => v,
437                    Some(ref current) => {
438                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Less) {
439                            v
440                        } else {
441                            current.clone()
442                        }
443                    }
444                });
445            }
446            min_val.unwrap_or(Value::Null)
447        }
448        AggFunc::Max => {
449            let mut max_val: Option<Value> = None;
450            for r in rows {
451                let v = resolve_agg_value(arg, arg_expr, r);
452                if v.is_null() { continue; }
453                max_val = Some(match max_val {
454                    None => v,
455                    Some(ref current) => {
456                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Greater) {
457                            v
458                        } else {
459                            current.clone()
460                        }
461                    }
462                });
463            }
464            max_val.unwrap_or(Value::Null)
465        }
466    }
467}
468
469fn compute_windows(rows: &mut Vec<Row>, select_exprs: &[SelectExpr]) -> crate::errors::Result<()> {
470    for se in select_exprs {
471        if let SelectExpr::Expr { expr, alias } = se {
472            if let Expr::Window { func, args, over } = expr {
473                let col_name = alias.clone().unwrap_or_else(|| expr.display_name());
474                compute_single_window(rows, func, args, over, &col_name)?;
475            }
476        }
477    }
478    Ok(())
479}
480
481fn compute_single_window(
482    rows: &mut Vec<Row>,
483    func: &WindowFunc,
484    args: &[Expr],
485    over: &WindowSpec,
486    col_name: &str,
487) -> crate::errors::Result<()> {
488    let mut partitions: Vec<Vec<usize>> = Vec::new();
489    let mut partition_map: HashMap<Vec<String>, usize> = HashMap::new();
490
491    for (i, row) in rows.iter().enumerate() {
492        let key: Vec<String> = over.partition_by.iter()
493            .map(|col| row.get(col).map(|v| v.to_display_string()).unwrap_or_default())
494            .collect();
495        if let Some(&idx) = partition_map.get(&key) {
496            partitions[idx].push(i);
497        } else {
498            let idx = partitions.len();
499            partition_map.insert(key, idx);
500            partitions.push(vec![i]);
501        }
502    }
503
504    for partition in &mut partitions {
505        if !over.order_by.is_empty() {
506            partition.sort_by(|&a, &b| {
507                for spec in &over.order_by {
508                    let (va, vb) = if let Some(ref expr) = spec.expr {
509                        (evaluate_expr(expr, &rows[a]), evaluate_expr(expr, &rows[b]))
510                    } else {
511                        (
512                            rows[a].get(&spec.column).cloned().unwrap_or(Value::Null),
513                            rows[b].get(&spec.column).cloned().unwrap_or(Value::Null),
514                        )
515                    };
516                    let ordering = match (&va, &vb) {
517                        (Value::Null, Value::Null) => Ordering::Equal,
518                        (Value::Null, _) => Ordering::Greater,
519                        (_, Value::Null) => Ordering::Less,
520                        (a_val, b_val) => compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal),
521                    };
522                    let ordering = if spec.descending { ordering.reverse() } else { ordering };
523                    if ordering != Ordering::Equal {
524                        return ordering;
525                    }
526                }
527                Ordering::Equal
528            });
529        }
530    }
531
532    let mut values: Vec<(usize, Value)> = Vec::new();
533
534    for partition in &partitions {
535        match func {
536            WindowFunc::RowNumber => {
537                for (i, &row_idx) in partition.iter().enumerate() {
538                    values.push((row_idx, Value::Int((i + 1) as i64)));
539                }
540            }
541            WindowFunc::Rank => {
542                let mut rank = 1usize;
543                for (i, &row_idx) in partition.iter().enumerate() {
544                    if i > 0 {
545                        let prev_idx = partition[i - 1];
546                        let same = over.order_by.iter().all(|spec| {
547                            let va = if let Some(ref expr) = spec.expr {
548                                evaluate_expr(expr, &rows[prev_idx])
549                            } else {
550                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
551                            };
552                            let vb = if let Some(ref expr) = spec.expr {
553                                evaluate_expr(expr, &rows[row_idx])
554                            } else {
555                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
556                            };
557                            va == vb
558                        });
559                        if !same {
560                            rank = i + 1;
561                        }
562                    }
563                    values.push((row_idx, Value::Int(rank as i64)));
564                }
565            }
566            WindowFunc::DenseRank => {
567                let mut rank = 1usize;
568                for (i, &row_idx) in partition.iter().enumerate() {
569                    if i > 0 {
570                        let prev_idx = partition[i - 1];
571                        let same = over.order_by.iter().all(|spec| {
572                            let va = if let Some(ref expr) = spec.expr {
573                                evaluate_expr(expr, &rows[prev_idx])
574                            } else {
575                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
576                            };
577                            let vb = if let Some(ref expr) = spec.expr {
578                                evaluate_expr(expr, &rows[row_idx])
579                            } else {
580                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
581                            };
582                            va == vb
583                        });
584                        if !same {
585                            rank += 1;
586                        }
587                    }
588                    values.push((row_idx, Value::Int(rank as i64)));
589                }
590            }
591            WindowFunc::Lag => {
592                let offset = if args.len() > 1 {
593                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
594                } else {
595                    1
596                };
597                for (i, &row_idx) in partition.iter().enumerate() {
598                    let val = if i >= offset && !args.is_empty() {
599                        evaluate_expr(&args[0], &rows[partition[i - offset]])
600                    } else {
601                        Value::Null
602                    };
603                    values.push((row_idx, val));
604                }
605            }
606            WindowFunc::Lead => {
607                let offset = if args.len() > 1 {
608                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
609                } else {
610                    1
611                };
612                for (i, &row_idx) in partition.iter().enumerate() {
613                    let val = if i + offset < partition.len() && !args.is_empty() {
614                        evaluate_expr(&args[0], &rows[partition[i + offset]])
615                    } else {
616                        Value::Null
617                    };
618                    values.push((row_idx, val));
619                }
620            }
621            WindowFunc::Agg(agg_func) => {
622                let partition_rows: Vec<&Row> = partition.iter().map(|&i| &rows[i]).collect();
623                let (arg_name, arg_expr_opt) = if args.is_empty() {
624                    ("*".to_string(), None)
625                } else {
626                    (args[0].display_name(), Some(&args[0]))
627                };
628                let agg_val = compute_aggregate(agg_func, &arg_name, arg_expr_opt, &partition_rows);
629                for &row_idx in partition {
630                    values.push((row_idx, agg_val.clone()));
631                }
632            }
633        }
634    }
635
636    for (row_idx, val) in values {
637        rows[row_idx].insert(col_name.to_string(), val);
638    }
639
640    Ok(())
641}
642
643fn evaluate_with_fts(clause: &WhereClause, row: &Row, fts: &FtsResults) -> bool {
644    match clause {
645        WhereClause::BoolOp(bop) => {
646            let left = evaluate_with_fts(&bop.left, row, fts);
647            match bop.op {
648                BoolOpKind::And => left && evaluate_with_fts(&bop.right, row, fts),
649                BoolOpKind::Or => left || evaluate_with_fts(&bop.right, row, fts),
650            }
651        }
652        WhereClause::Comparison(cmp) => {
653            // Check if we have FTS results for this comparison
654            if cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike {
655                if let Some(SqlValue::String(pattern)) = &cmp.value {
656                    let key = (cmp.column.clone(), pattern.clone());
657                    if let Some(matching_paths) = fts.get(&key) {
658                        let row_path = row.get("path").and_then(|v| v.as_str()).unwrap_or("");
659                        let matched = matching_paths.contains(row_path);
660                        return if cmp.op == CmpOp::Like { matched } else { !matched };
661                    }
662                }
663            }
664            evaluate_comparison(cmp, row)
665        }
666    }
667}
668
669pub use crate::query_join::execute_join_query;
670
671pub(crate) fn execute_inner(
672    query: &SelectQuery,
673    rows: &[Row],
674    index: Option<&crate::index::TableIndex>,
675) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
676    let empty_fts = HashMap::new();
677    execute_with_fts(query, rows, index, &empty_fts)
678}
679
680pub fn evaluate(clause: &WhereClause, row: &Row) -> bool {
681    match clause {
682        WhereClause::BoolOp(bop) => {
683            let left = evaluate(&bop.left, row);
684            match bop.op {
685                BoolOpKind::And => left && evaluate(&bop.right, row),
686                BoolOpKind::Or => left || evaluate(&bop.right, row),
687            }
688        }
689        WhereClause::Comparison(cmp) => evaluate_comparison(cmp, row),
690    }
691}
692
693/// Evaluate an Expr against a row, returning a Value.
694pub(crate) fn evaluate_expr(expr: &Expr, row: &Row) -> Value {
695    match expr {
696        Expr::Literal(SqlValue::Int(n)) => Value::Int(*n),
697        Expr::Literal(SqlValue::Float(f)) => Value::Float(*f),
698        Expr::Literal(SqlValue::String(s)) => Value::String(s.clone()),
699        Expr::Literal(SqlValue::Bool(b)) => Value::Bool(*b),
700        Expr::Literal(SqlValue::Null) => Value::Null,
701        Expr::Literal(SqlValue::List(_)) => Value::Null,
702        Expr::Column(name) => {
703            if let Some(val) = row.get(name) {
704                return val.clone();
705            }
706            // Try all possible dot splits for dict access (e.g. "s.params.key")
707            for (i, _) in name.match_indices('.') {
708                let dict_col = &name[..i];
709                let dict_key = &name[i + 1..];
710                if let Some(Value::Dict(map)) = row.get(dict_col) {
711                    return map.get(dict_key).cloned().unwrap_or(Value::Null);
712                }
713            }
714            Value::Null
715        }
716        Expr::UnaryMinus(inner) => {
717            match evaluate_expr(inner, row) {
718                Value::Int(n) => Value::Int(-n),
719                Value::Float(f) => Value::Float(-f),
720                Value::Null => Value::Null,
721                _ => Value::Null, // non-numeric → NULL
722            }
723        }
724        Expr::BinaryOp { left, op, right } => {
725            let lv = evaluate_expr(left, row);
726            let rv = evaluate_expr(right, row);
727
728            // NULL propagation: any NULL operand → NULL
729            if lv.is_null() || rv.is_null() {
730                return Value::Null;
731            }
732
733            // Extract numeric values with int→float coercion
734            match (&lv, &rv) {
735                (Value::Int(a), Value::Int(b)) => {
736                    match op {
737                        ArithOp::Add => Value::Int(a.wrapping_add(*b)),
738                        ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
739                        ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
740                        ArithOp::Div => {
741                            if *b == 0 { Value::Null } else { Value::Int(a / b) }
742                        }
743                        ArithOp::Mod => {
744                            if *b == 0 { Value::Null } else { Value::Int(a % b) }
745                        }
746                    }
747                }
748                _ => {
749                    // Coerce to float
750                    let a = match &lv {
751                        Value::Int(n) => *n as f64,
752                        Value::Float(f) => *f,
753                        _ => return Value::Null,
754                    };
755                    let b = match &rv {
756                        Value::Int(n) => *n as f64,
757                        Value::Float(f) => *f,
758                        _ => return Value::Null,
759                    };
760                    match op {
761                        ArithOp::Add => Value::Float(a + b),
762                        ArithOp::Sub => Value::Float(a - b),
763                        ArithOp::Mul => Value::Float(a * b),
764                        ArithOp::Div => {
765                            if b == 0.0 { Value::Null } else { Value::Float(a / b) }
766                        }
767                        ArithOp::Mod => {
768                            if b == 0.0 { Value::Null } else { Value::Float(a % b) }
769                        }
770                    }
771                }
772            }
773        }
774        Expr::Case { whens, else_expr } => {
775            for (condition, result) in whens {
776                if evaluate(condition, row) {
777                    return evaluate_expr(result, row);
778                }
779            }
780            match else_expr {
781                Some(e) => evaluate_expr(e, row),
782                None => Value::Null,
783            }
784        }
785        Expr::CurrentDate => {
786            Value::Date(chrono::Local::now().naive_local().date())
787        }
788        Expr::CurrentTimestamp => {
789            Value::DateTime(chrono::Local::now().naive_local())
790        }
791        Expr::DateAdd { date, days } => {
792            let date_val = evaluate_expr(date, row);
793            let days_val = evaluate_expr(days, row);
794            let n = match &days_val {
795                Value::Int(n) => *n,
796                Value::Float(f) => *f as i64,
797                _ => return Value::Null,
798            };
799            let duration = chrono::Duration::days(n);
800            match date_val {
801                Value::Date(d) => {
802                    match d.checked_add_signed(duration) {
803                        Some(result) => Value::Date(result),
804                        None => Value::Null,
805                    }
806                }
807                Value::DateTime(dt) => {
808                    match dt.checked_add_signed(duration) {
809                        Some(result) => Value::DateTime(result),
810                        None => Value::Null,
811                    }
812                }
813                _ => Value::Null,
814            }
815        }
816        Expr::DateDiff { left, right } => {
817            let lv = evaluate_expr(left, row);
818            let rv = evaluate_expr(right, row);
819            let left_date = match &lv {
820                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
821                Value::DateTime(dt) => *dt,
822                _ => return Value::Null,
823            };
824            let right_date = match &rv {
825                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
826                Value::DateTime(dt) => *dt,
827                _ => return Value::Null,
828            };
829            Value::Int((left_date - right_date).num_days())
830        }
831        Expr::Aggregate { func, arg, .. } => {
832            // Post-aggregation: look up the pre-computed column name
833            let func_name = match func {
834                AggFunc::Count => "COUNT",
835                AggFunc::Sum => "SUM",
836                AggFunc::Avg => "AVG",
837                AggFunc::Min => "MIN",
838                AggFunc::Max => "MAX",
839            };
840            let col = format!("{}({})", func_name, arg);
841            row.get(&col).cloned().unwrap_or(Value::Null)
842        }
843        Expr::Subquery(_) => Value::Null,
844        Expr::Window { .. } => {
845            let display = expr.display_name();
846            row.get(&display).cloned().unwrap_or(Value::Null)
847        }
848    }
849}
850
851fn evaluate_agg_expr(expr: &Expr, group_rows: &[&Row]) -> Value {
852    match expr {
853        Expr::Aggregate { func, arg, arg_expr } => {
854            compute_aggregate(func, arg, arg_expr.as_deref(), group_rows)
855        }
856        Expr::BinaryOp { left, op, right } => {
857            let lv = evaluate_agg_expr(left, group_rows);
858            let rv = evaluate_agg_expr(right, group_rows);
859            apply_arith_op(op, &lv, &rv)
860        }
861        Expr::UnaryMinus(inner) => {
862            match evaluate_agg_expr(inner, group_rows) {
863                Value::Int(n) => Value::Int(-n),
864                Value::Float(f) => Value::Float(-f),
865                _ => Value::Null,
866            }
867        }
868        other => {
869            if let Some(first) = group_rows.first() {
870                evaluate_expr(other, first)
871            } else {
872                Value::Null
873            }
874        }
875    }
876}
877
878fn apply_arith_op(op: &ArithOp, lv: &Value, rv: &Value) -> Value {
879    if lv.is_null() || rv.is_null() {
880        return Value::Null;
881    }
882    match (lv, rv) {
883        (Value::Int(a), Value::Int(b)) => match op {
884            ArithOp::Add => Value::Int(a.wrapping_add(*b)),
885            ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
886            ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
887            ArithOp::Div => if *b == 0 { Value::Null } else { Value::Int(a / b) },
888            ArithOp::Mod => if *b == 0 { Value::Null } else { Value::Int(a % b) },
889        },
890        _ => {
891            let a = match lv {
892                Value::Int(n) => *n as f64,
893                Value::Float(f) => *f,
894                _ => return Value::Null,
895            };
896            let b = match rv {
897                Value::Int(n) => *n as f64,
898                Value::Float(f) => *f,
899                _ => return Value::Null,
900            };
901            match op {
902                ArithOp::Add => Value::Float(a + b),
903                ArithOp::Sub => Value::Float(a - b),
904                ArithOp::Mul => Value::Float(a * b),
905                ArithOp::Div => if b == 0.0 { Value::Null } else { Value::Float(a / b) },
906                ArithOp::Mod => if b == 0.0 { Value::Null } else { Value::Float(a % b) },
907            }
908        }
909    }
910}
911
912fn evaluate_comparison(cmp: &Comparison, row: &Row) -> bool {
913    // If we have expression-based comparison (new path), use it for standard ops
914    if let (Some(left_expr), Some(right_expr)) = (&cmp.left_expr, &cmp.right_expr) {
915        if matches!(cmp.op, CmpOp::Eq | CmpOp::Ne | CmpOp::Lt | CmpOp::Gt | CmpOp::Le | CmpOp::Ge) {
916            let left_val = evaluate_expr(left_expr, row);
917            let right_val = evaluate_expr(right_expr, row);
918
919            // NULL comparison: always false (except IS NULL handled below)
920            if left_val.is_null() || right_val.is_null() {
921                return false;
922            }
923
924            // Coerce for comparison: if types differ, try int→float
925            let ord = compare_model_values(&left_val, &right_val);
926
927            return match cmp.op {
928                CmpOp::Eq => ord == Some(Ordering::Equal),
929                CmpOp::Ne => ord != Some(Ordering::Equal),
930                CmpOp::Lt => ord == Some(Ordering::Less),
931                CmpOp::Gt => ord == Some(Ordering::Greater),
932                CmpOp::Le => matches!(ord, Some(Ordering::Less | Ordering::Equal)),
933                CmpOp::Ge => matches!(ord, Some(Ordering::Greater | Ordering::Equal)),
934                _ => false,
935            };
936        }
937    }
938
939    // Fall back to legacy column-based comparison for IS NULL, IN, LIKE, etc.
940    let actual = row.get(&cmp.column);
941
942    if cmp.op == CmpOp::IsNull {
943        return actual.map_or(true, |v| v.is_null());
944    }
945    if cmp.op == CmpOp::IsNotNull {
946        return actual.map_or(false, |v| !v.is_null());
947    }
948
949    let actual = match actual {
950        Some(v) if !v.is_null() => v,
951        _ => return false,
952    };
953
954    let expected = match &cmp.value {
955        Some(v) => v,
956        None => return false,
957    };
958
959    match cmp.op {
960        CmpOp::Eq => eq_match(actual, expected),
961        CmpOp::Ne => !eq_match(actual, expected),
962        CmpOp::Lt => compare_values(actual, expected) == Some(Ordering::Less),
963        CmpOp::Gt => compare_values(actual, expected) == Some(Ordering::Greater),
964        CmpOp::Le => matches!(compare_values(actual, expected), Some(Ordering::Less | Ordering::Equal)),
965        CmpOp::Ge => matches!(compare_values(actual, expected), Some(Ordering::Greater | Ordering::Equal)),
966        CmpOp::Like => like_match(actual, expected),
967        CmpOp::NotLike => !like_match(actual, expected),
968        CmpOp::In => {
969            if let SqlValue::List(items) = expected {
970                items.iter().any(|v| eq_match(actual, v))
971            } else {
972                eq_match(actual, expected)
973            }
974        }
975        CmpOp::IsNull | CmpOp::IsNotNull => unreachable!(),
976    }
977}
978
979/// Compare two model::Value instances, with int↔float coercion.
980fn compare_model_values(a: &Value, b: &Value) -> Option<Ordering> {
981    match (a, b) {
982        (Value::Int(x), Value::Float(y)) => (*x as f64).partial_cmp(y),
983        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&(*y as f64)),
984        _ => a.partial_cmp(b),
985    }
986}
987
988fn coerce_sql_to_value(sql_val: &SqlValue, target: &Value) -> Value {
989    match sql_val {
990        SqlValue::Null => Value::Null,
991        SqlValue::String(s) => {
992            match target {
993                Value::Int(_) => s.parse::<i64>().map(Value::Int).unwrap_or(Value::String(s.clone())),
994                Value::Float(_) => s.parse::<f64>().map(Value::Float).unwrap_or(Value::String(s.clone())),
995                Value::Date(_) => {
996                    chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
997                        .map(Value::Date)
998                        .unwrap_or(Value::String(s.clone()))
999                }
1000                Value::DateTime(_) => {
1001                    chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
1002                        .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
1003                        .map(Value::DateTime)
1004                        .unwrap_or(Value::String(s.clone()))
1005                }
1006                _ => Value::String(s.clone()),
1007            }
1008        }
1009        SqlValue::Int(n) => {
1010            match target {
1011                Value::Float(_) => Value::Float(*n as f64),
1012                _ => Value::Int(*n),
1013            }
1014        }
1015        SqlValue::Float(f) => Value::Float(*f),
1016        SqlValue::Bool(b) => Value::Bool(*b),
1017        SqlValue::List(_) => Value::Null, // Lists handled separately
1018    }
1019}
1020
1021fn eq_match(actual: &Value, expected: &SqlValue) -> bool {
1022    // Special handling for lists (e.g., categories)
1023    if let Value::List(items) = actual {
1024        if let SqlValue::String(s) = expected {
1025            return items.contains(s);
1026        }
1027    }
1028
1029    let coerced = coerce_sql_to_value(expected, actual);
1030    actual == &coerced
1031}
1032
1033fn like_match(actual: &Value, pattern: &SqlValue) -> bool {
1034    let pattern_str = match pattern {
1035        SqlValue::String(s) => s,
1036        _ => return false,
1037    };
1038
1039    // Convert SQL LIKE to regex
1040    let mut regex_str = String::from("(?is)^");
1041    for ch in pattern_str.chars() {
1042        match ch {
1043            '%' => regex_str.push_str(".*"),
1044            '_' => regex_str.push('.'),
1045            c => {
1046                if regex::escape(&c.to_string()) != c.to_string() {
1047                    regex_str.push_str(&regex::escape(&c.to_string()));
1048                } else {
1049                    regex_str.push(c);
1050                }
1051            }
1052        }
1053    }
1054    regex_str.push('$');
1055
1056    let re = match Regex::new(&regex_str) {
1057        Ok(r) => r,
1058        Err(_) => return false,
1059    };
1060
1061    match actual {
1062        Value::List(items) => items.iter().any(|item| re.is_match(item)),
1063        _ => re.is_match(&actual.to_display_string()),
1064    }
1065}
1066
1067fn compare_values(actual: &Value, expected: &SqlValue) -> Option<Ordering> {
1068    let coerced = coerce_sql_to_value(expected, actual);
1069    actual.partial_cmp(&coerced)
1070}
1071
1072/// Convert a SqlValue to a Value for index lookups (without a target type for coercion).
1073fn sql_value_to_index_value(sv: &SqlValue) -> Value {
1074    match sv {
1075        SqlValue::String(s) => {
1076            // Try datetime first (more specific)
1077            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1078                return Value::DateTime(dt);
1079            }
1080            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1081                return Value::DateTime(dt);
1082            }
1083            // Try date
1084            if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1085                return Value::Date(d);
1086            }
1087            Value::String(s.clone())
1088        }
1089        SqlValue::Int(n) => Value::Int(*n),
1090        SqlValue::Float(f) => Value::Float(*f),
1091        SqlValue::Bool(b) => Value::Bool(*b),
1092        SqlValue::Null => Value::Null,
1093        SqlValue::List(_) => Value::Null,
1094    }
1095}
1096
1097/// Try to use B-tree indexes to narrow the candidate row set.
1098/// Returns Some(paths) if the entire WHERE clause could be resolved via index,
1099/// or None if a full scan is needed.
1100fn try_index_filter(
1101    clause: &WhereClause,
1102    index: &crate::index::TableIndex,
1103) -> Option<std::collections::HashSet<String>> {
1104    match clause {
1105        WhereClause::Comparison(cmp) => {
1106            if !index.has_index(&cmp.column) {
1107                return None;
1108            }
1109            match cmp.op {
1110                CmpOp::Eq => {
1111                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1112                    let paths = index.lookup_eq(&cmp.column, &val);
1113                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1114                }
1115                CmpOp::Lt => {
1116                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1117                    // exclusive upper bound: use range with max < val
1118                    // lookup_range is inclusive, so we get all <= val then remove exact matches
1119                    let range_paths = index.lookup_range(&cmp.column, None, Some(&val));
1120                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1121                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1122                }
1123                CmpOp::Gt => {
1124                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1125                    let range_paths = index.lookup_range(&cmp.column, Some(&val), None);
1126                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1127                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1128                }
1129                CmpOp::Le => {
1130                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1131                    let paths = index.lookup_range(&cmp.column, None, Some(&val));
1132                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1133                }
1134                CmpOp::Ge => {
1135                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1136                    let paths = index.lookup_range(&cmp.column, Some(&val), None);
1137                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1138                }
1139                CmpOp::In => {
1140                    if let Some(SqlValue::List(items)) = &cmp.value {
1141                        let vals: Vec<Value> = items.iter().map(sql_value_to_index_value).collect();
1142                        let paths = index.lookup_in(&cmp.column, &vals);
1143                        Some(paths.into_iter().map(|s| s.to_string()).collect())
1144                    } else {
1145                        None
1146                    }
1147                }
1148                _ => None, // LIKE, IS NULL, etc. can't use index
1149            }
1150        }
1151        WhereClause::BoolOp(bop) => {
1152            let left = try_index_filter(&bop.left, index);
1153            let right = try_index_filter(&bop.right, index);
1154            match bop.op {
1155                BoolOpKind::And => {
1156                    match (left, right) {
1157                        (Some(l), Some(r)) => Some(l.intersection(&r).cloned().collect()),
1158                        (Some(l), None) => Some(l), // narrow with left, scan-verify right
1159                        (None, Some(r)) => Some(r),
1160                        (None, None) => None,
1161                    }
1162                }
1163                BoolOpKind::Or => {
1164                    match (left, right) {
1165                        (Some(l), Some(r)) => Some(l.union(&r).cloned().collect()),
1166                        _ => None, // Can't use index if either side needs full scan
1167                    }
1168                }
1169            }
1170        }
1171    }
1172}
1173
1174/// If an ORDER BY column matches a SELECT alias, replace its expr with the
1175/// aliased expression so sorting uses the computed value.
1176fn resolve_order_aliases(specs: &[OrderSpec], columns: &ColumnList) -> Vec<OrderSpec> {
1177    let named = match columns {
1178        ColumnList::Named(exprs) => exprs,
1179        _ => return specs.to_vec(),
1180    };
1181
1182    // Build alias → expr map (skip window exprs — their values are already in rows)
1183    let alias_map: HashMap<String, &Expr> = named
1184        .iter()
1185        .filter_map(|se| match se {
1186            SelectExpr::Expr { expr, alias: Some(a) } if !expr.contains_window() => {
1187                Some((a.clone(), expr))
1188            }
1189            _ => None,
1190        })
1191        .collect();
1192
1193    specs
1194        .iter()
1195        .map(|spec| {
1196            // If the ORDER BY column name matches a SELECT alias, use that expression
1197            if let Some(expr) = alias_map.get(&spec.column) {
1198                OrderSpec {
1199                    column: spec.column.clone(),
1200                    expr: Some((*expr).clone()),
1201                    descending: spec.descending,
1202                }
1203            } else {
1204                spec.clone()
1205            }
1206        })
1207        .collect()
1208}
1209
1210fn sort_rows(rows: &mut Vec<Row>, specs: &[OrderSpec]) {
1211    rows.sort_by(|a, b| {
1212        for spec in specs {
1213            let (va, vb) = if let Some(ref expr) = spec.expr {
1214                (evaluate_expr(expr, a), evaluate_expr(expr, b))
1215            } else {
1216                (
1217                    a.get(&spec.column).cloned().unwrap_or(Value::Null),
1218                    b.get(&spec.column).cloned().unwrap_or(Value::Null),
1219                )
1220            };
1221
1222            // NULLs sort last
1223            let ordering = match (&va, &vb) {
1224                (Value::Null, Value::Null) => Ordering::Equal,
1225                (Value::Null, _) => Ordering::Greater,
1226                (_, Value::Null) => Ordering::Less,
1227                (a_val, b_val) => {
1228                    compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal)
1229                }
1230            };
1231
1232            let ordering = if spec.descending {
1233                ordering.reverse()
1234            } else {
1235                ordering
1236            };
1237
1238            if ordering != Ordering::Equal {
1239                return ordering;
1240            }
1241        }
1242        Ordering::Equal
1243    });
1244}
1245
1246/// Convert a SqlValue to our model Value (for use in insert/update).
1247pub(crate) fn sql_value_to_value(sql_val: &SqlValue) -> Value {
1248    match sql_val {
1249        SqlValue::Null => Value::Null,
1250        SqlValue::String(s) => Value::String(s.clone()),
1251        SqlValue::Int(n) => Value::Int(*n),
1252        SqlValue::Float(f) => Value::Float(*f),
1253        SqlValue::Bool(b) => Value::Bool(*b),
1254        SqlValue::List(items) => {
1255            let strings: Vec<String> = items
1256                .iter()
1257                .filter_map(|v| match v {
1258                    SqlValue::String(s) => Some(s.clone()),
1259                    _ => None,
1260                })
1261                .collect();
1262            Value::List(strings)
1263        }
1264    }
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269    use super::*;
1270
1271    fn make_rows() -> Vec<Row> {
1272        vec![
1273            Row::from([
1274                ("path".into(), Value::String("a.md".into())),
1275                ("title".into(), Value::String("Alpha".into())),
1276                ("count".into(), Value::Int(10)),
1277            ]),
1278            Row::from([
1279                ("path".into(), Value::String("b.md".into())),
1280                ("title".into(), Value::String("Beta".into())),
1281                ("count".into(), Value::Int(5)),
1282            ]),
1283            Row::from([
1284                ("path".into(), Value::String("c.md".into())),
1285                ("title".into(), Value::String("Gamma".into())),
1286                ("count".into(), Value::Int(20)),
1287            ]),
1288        ]
1289    }
1290
1291    #[test]
1292    fn test_select_all() {
1293        let q = SelectQuery {
1294            distinct: false,
1295            columns: ColumnList::All,
1296            table: "test".into(),
1297            table_alias: None,
1298            subquery: None,
1299            joins: vec![],
1300            where_clause: None,
1301            group_by: None,
1302            having: None,
1303            order_by: None,
1304            limit: None,
1305            ctes: vec![],
1306        };
1307        let (rows, _cols) = execute_inner(&q, &make_rows(), None).unwrap();
1308        assert_eq!(rows.len(), 3);
1309    }
1310
1311    #[test]
1312    fn test_select_nonexistent_column_null_filled() {
1313        // SELECT naming a column absent from the table must keep header and
1314        // rows aligned: the unknown column appears in every row as Null.
1315        let q = parse_query("SELECT title, missing_col, count FROM test").unwrap();
1316        let q = match q {
1317            Statement::Select(s) => s,
1318            _ => panic!("expected SELECT"),
1319        };
1320        let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1321        assert_eq!(cols, vec!["title", "missing_col", "count"]);
1322        assert_eq!(rows.len(), 3);
1323        for row in &rows {
1324            assert_eq!(row.len(), cols.len(), "row keys must match header length");
1325            for c in &cols {
1326                assert!(row.contains_key(c), "row missing header column {c}");
1327            }
1328            assert_eq!(row.get("missing_col"), Some(&Value::Null));
1329        }
1330    }
1331
1332    #[test]
1333    fn test_select_duplicate_output_column_errors() {
1334        // A result row is keyed by output name; two columns with the same name
1335        // cannot both be represented, so the query must be rejected.
1336        let q = parse_query("SELECT title, title FROM test").unwrap();
1337        let q = match q {
1338            Statement::Select(s) => s,
1339            _ => panic!("expected SELECT"),
1340        };
1341        let err = execute_inner(&q, &make_rows(), None);
1342        assert!(err.is_err());
1343        let msg = err.unwrap_err().to_string();
1344        assert!(msg.contains("duplicate output column"), "got: {msg}");
1345    }
1346
1347    #[test]
1348    fn test_select_all_sparse_rows_aligned() {
1349        // SELECT * unions keys across rows; a row missing an optional field
1350        // must still carry every header column (as Null) so header/rows align.
1351        let rows = vec![
1352            Row::from([
1353                ("path".into(), Value::String("a.md".into())),
1354                ("title".into(), Value::String("Alpha".into())),
1355                ("kill_reason".into(), Value::String("no edge".into())),
1356            ]),
1357            Row::from([
1358                ("path".into(), Value::String("b.md".into())),
1359                ("title".into(), Value::String("Beta".into())),
1360            ]),
1361        ];
1362        let q = parse_query("SELECT * FROM test").unwrap();
1363        let q = match q {
1364            Statement::Select(s) => s,
1365            _ => panic!("expected SELECT"),
1366        };
1367        let (result, cols) = execute_inner(&q, &rows, None).unwrap();
1368        assert!(cols.contains(&"kill_reason".to_string()));
1369        for row in &result {
1370            assert_eq!(row.len(), cols.len(), "row keys must match header length");
1371            for c in &cols {
1372                assert!(row.contains_key(c), "row missing header column {c}");
1373            }
1374        }
1375        // The row that lacked kill_reason now carries it as Null.
1376        let beta = result.iter().find(|r| r.get("path") == Some(&Value::String("b.md".into()))).unwrap();
1377        assert_eq!(beta.get("kill_reason"), Some(&Value::Null));
1378    }
1379
1380    #[test]
1381    fn test_where_gt() {
1382        let q = SelectQuery {
1383            distinct: false,
1384            columns: ColumnList::All,
1385            table: "test".into(),
1386            table_alias: None,
1387            subquery: None,
1388            joins: vec![],
1389            where_clause: Some(WhereClause::Comparison(Comparison {
1390                column: "count".into(),
1391                op: CmpOp::Gt,
1392                value: Some(SqlValue::Int(5)),
1393                left_expr: Some(Expr::Column("count".into())),
1394                right_expr: Some(Expr::Literal(SqlValue::Int(5))),
1395            })),
1396            group_by: None,
1397            having: None,
1398            order_by: None,
1399            limit: None,
1400            ctes: vec![],
1401        };
1402        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1403        assert_eq!(rows.len(), 2);
1404    }
1405
1406    #[test]
1407    fn test_order_by_desc() {
1408        let q = SelectQuery {
1409            distinct: false,
1410            columns: ColumnList::All,
1411            table: "test".into(),
1412            table_alias: None,
1413            subquery: None,
1414            joins: vec![],
1415            where_clause: None,
1416            group_by: None,
1417            having: None,
1418            order_by: Some(vec![OrderSpec {
1419                column: "count".into(),
1420                expr: Some(Expr::Column("count".into())),
1421                descending: true,
1422            }]),
1423            limit: None,
1424            ctes: vec![],
1425        };
1426        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1427        assert_eq!(rows[0]["count"], Value::Int(20));
1428        assert_eq!(rows[2]["count"], Value::Int(5));
1429    }
1430
1431    #[test]
1432    fn test_limit() {
1433        let q = SelectQuery {
1434            distinct: false,
1435            columns: ColumnList::All,
1436            table: "test".into(),
1437            table_alias: None,
1438            subquery: None,
1439            joins: vec![],
1440            where_clause: None,
1441            group_by: None,
1442            having: None,
1443            order_by: None,
1444            limit: Some(2),
1445            ctes: vec![],
1446        };
1447        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1448        assert_eq!(rows.len(), 2);
1449    }
1450
1451    #[test]
1452    fn test_like() {
1453        let q = SelectQuery {
1454            distinct: false,
1455            columns: ColumnList::All,
1456            table: "test".into(),
1457            table_alias: None,
1458            subquery: None,
1459            joins: vec![],
1460            where_clause: Some(WhereClause::Comparison(Comparison {
1461                column: "title".into(),
1462                op: CmpOp::Like,
1463                value: Some(SqlValue::String("%lph%".into())),
1464                left_expr: Some(Expr::Column("title".into())),
1465                right_expr: None,
1466            })),
1467            group_by: None,
1468            having: None,
1469            order_by: None,
1470            limit: None,
1471            ctes: vec![],
1472        };
1473        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1474        assert_eq!(rows.len(), 1);
1475        assert_eq!(rows[0]["title"], Value::String("Alpha".into()));
1476    }
1477
1478    #[test]
1479    fn test_is_null() {
1480        let mut rows = make_rows();
1481        rows[1].insert("optional".into(), Value::Null);
1482
1483        let q = SelectQuery {
1484            distinct: false,
1485            columns: ColumnList::All,
1486            table: "test".into(),
1487            table_alias: None,
1488            subquery: None,
1489            joins: vec![],
1490            where_clause: Some(WhereClause::Comparison(Comparison {
1491                column: "optional".into(),
1492                op: CmpOp::IsNull,
1493                value: None,
1494                left_expr: Some(Expr::Column("optional".into())),
1495                right_expr: None,
1496            })),
1497            group_by: None,
1498            having: None,
1499            order_by: None,
1500            limit: None,
1501            ctes: vec![],
1502        };
1503        let (result, _) = execute_inner(&q, &rows, None).unwrap();
1504        // All rows where optional is NULL or missing
1505        assert_eq!(result.len(), 3);
1506    }
1507
1508    #[test]
1509    fn test_select_distinct_dedupes_and_projects() {
1510        // #61: DISTINCT returned every row with the column nulled.
1511        let rows = vec![
1512            Row::from([("path".into(), Value::String("1.md".into())), ("strategy".into(), Value::String("a".into()))]),
1513            Row::from([("path".into(), Value::String("2.md".into())), ("strategy".into(), Value::String("b".into()))]),
1514            Row::from([("path".into(), Value::String("3.md".into())), ("strategy".into(), Value::String("a".into()))]),
1515        ];
1516        let q = match parse_query("SELECT DISTINCT strategy FROM backtests").unwrap() {
1517            Statement::Select(s) => s,
1518            _ => panic!("expected SELECT"),
1519        };
1520        let (result, cols) = execute_inner(&q, &rows, None).unwrap();
1521        assert_eq!(cols, vec!["strategy"]);
1522        assert_eq!(result.len(), 2);
1523        let values: Vec<_> = result.iter().map(|r| r.get("strategy").unwrap().clone()).collect();
1524        assert_eq!(values, vec![Value::String("a".into()), Value::String("b".into())]);
1525    }
1526
1527    #[test]
1528    fn test_select_distinct_applies_before_limit() {
1529        // SQL semantics: dedupe first, then LIMIT.
1530        let rows = vec![
1531            Row::from([("path".into(), Value::String("1.md".into())), ("s".into(), Value::String("a".into()))]),
1532            Row::from([("path".into(), Value::String("2.md".into())), ("s".into(), Value::String("a".into()))]),
1533            Row::from([("path".into(), Value::String("3.md".into())), ("s".into(), Value::String("b".into()))]),
1534            Row::from([("path".into(), Value::String("4.md".into())), ("s".into(), Value::String("c".into()))]),
1535        ];
1536        let q = match parse_query("SELECT DISTINCT s FROM t ORDER BY s LIMIT 2").unwrap() {
1537            Statement::Select(s) => s,
1538            _ => panic!("expected SELECT"),
1539        };
1540        let (result, _) = execute_inner(&q, &rows, None).unwrap();
1541        // Naive limit-then-dedupe would give ["a"] only.
1542        assert_eq!(result.len(), 2);
1543        assert_eq!(result[0].get("s"), Some(&Value::String("a".into())));
1544        assert_eq!(result[1].get("s"), Some(&Value::String("b".into())));
1545    }
1546
1547    #[test]
1548    fn test_select_distinct_star() {
1549        let rows = vec![
1550            Row::from([("path".into(), Value::String("1.md".into())), ("s".into(), Value::String("a".into()))]),
1551            Row::from([("path".into(), Value::String("1.md".into())), ("s".into(), Value::String("a".into()))]),
1552            Row::from([("path".into(), Value::String("2.md".into())), ("s".into(), Value::String("b".into()))]),
1553        ];
1554        let q = match parse_query("SELECT DISTINCT * FROM t").unwrap() {
1555            Statement::Select(s) => s,
1556            _ => panic!("expected SELECT"),
1557        };
1558        let (result, _) = execute_inner(&q, &rows, None).unwrap();
1559        assert_eq!(result.len(), 2);
1560    }
1561
1562    #[test]
1563    fn test_where_boolean_literal_filters_rows() {
1564        // #60 end-to-end: `flag = true` must match Bool rows, not evaluate
1565        // against a nonexistent column named "true" and return nothing.
1566        let mut rows = make_rows();
1567        rows[0].insert("flag".into(), Value::Bool(true));
1568        rows[1].insert("flag".into(), Value::Bool(false));
1569        rows[2].insert("flag".into(), Value::Bool(true));
1570
1571        for (sql, expected) in [
1572            ("SELECT path FROM test WHERE flag = true", 2),
1573            ("SELECT path FROM test WHERE flag = FALSE", 1),
1574            ("SELECT path FROM test WHERE flag != true", 1),
1575        ] {
1576            let q = match parse_query(sql).unwrap() {
1577                Statement::Select(s) => s,
1578                _ => panic!("expected SELECT"),
1579            };
1580            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1581            assert_eq!(result.len(), expected, "query: {sql}");
1582        }
1583    }
1584
1585    // ── Expression evaluation tests ─────────────────────────��─────
1586
1587    #[test]
1588    fn test_evaluate_expr_literal() {
1589        let row = Row::new();
1590        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Int(42)), &row), Value::Int(42));
1591        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Float(3.14)), &row), Value::Float(3.14));
1592        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Null), &row), Value::Null);
1593    }
1594
1595    #[test]
1596    fn test_evaluate_expr_column() {
1597        let row = Row::from([("x".into(), Value::Int(10))]);
1598        assert_eq!(evaluate_expr(&Expr::Column("x".into()), &row), Value::Int(10));
1599        assert_eq!(evaluate_expr(&Expr::Column("missing".into()), &row), Value::Null);
1600    }
1601
1602    #[test]
1603    fn test_evaluate_expr_int_arithmetic() {
1604        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(3))]);
1605        let add = Expr::BinaryOp {
1606            left: Box::new(Expr::Column("a".into())),
1607            op: ArithOp::Add,
1608            right: Box::new(Expr::Column("b".into())),
1609        };
1610        assert_eq!(evaluate_expr(&add, &row), Value::Int(13));
1611
1612        let sub = Expr::BinaryOp {
1613            left: Box::new(Expr::Column("a".into())),
1614            op: ArithOp::Sub,
1615            right: Box::new(Expr::Column("b".into())),
1616        };
1617        assert_eq!(evaluate_expr(&sub, &row), Value::Int(7));
1618
1619        let mul = Expr::BinaryOp {
1620            left: Box::new(Expr::Column("a".into())),
1621            op: ArithOp::Mul,
1622            right: Box::new(Expr::Column("b".into())),
1623        };
1624        assert_eq!(evaluate_expr(&mul, &row), Value::Int(30));
1625
1626        let div = Expr::BinaryOp {
1627            left: Box::new(Expr::Column("a".into())),
1628            op: ArithOp::Div,
1629            right: Box::new(Expr::Column("b".into())),
1630        };
1631        assert_eq!(evaluate_expr(&div, &row), Value::Int(3)); // integer division
1632
1633        let modulo = Expr::BinaryOp {
1634            left: Box::new(Expr::Column("a".into())),
1635            op: ArithOp::Mod,
1636            right: Box::new(Expr::Column("b".into())),
1637        };
1638        assert_eq!(evaluate_expr(&modulo, &row), Value::Int(1));
1639    }
1640
1641    #[test]
1642    fn test_evaluate_expr_float_coercion() {
1643        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Float(3.0))]);
1644        let add = Expr::BinaryOp {
1645            left: Box::new(Expr::Column("a".into())),
1646            op: ArithOp::Add,
1647            right: Box::new(Expr::Column("b".into())),
1648        };
1649        assert_eq!(evaluate_expr(&add, &row), Value::Float(13.0));
1650    }
1651
1652    #[test]
1653    fn test_evaluate_expr_null_propagation() {
1654        let row = Row::from([("a".into(), Value::Int(10))]);
1655        let add = Expr::BinaryOp {
1656            left: Box::new(Expr::Column("a".into())),
1657            op: ArithOp::Add,
1658            right: Box::new(Expr::Column("missing".into())),
1659        };
1660        assert_eq!(evaluate_expr(&add, &row), Value::Null);
1661    }
1662
1663    #[test]
1664    fn test_evaluate_expr_div_by_zero() {
1665        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(0))]);
1666        let div = Expr::BinaryOp {
1667            left: Box::new(Expr::Column("a".into())),
1668            op: ArithOp::Div,
1669            right: Box::new(Expr::Column("b".into())),
1670        };
1671        assert_eq!(evaluate_expr(&div, &row), Value::Null);
1672    }
1673
1674    #[test]
1675    fn test_evaluate_expr_unary_minus() {
1676        let row = Row::from([("x".into(), Value::Int(5))]);
1677        let neg = Expr::UnaryMinus(Box::new(Expr::Column("x".into())));
1678        assert_eq!(evaluate_expr(&neg, &row), Value::Int(-5));
1679    }
1680
1681    #[test]
1682    fn test_select_with_expression() {
1683        // Integration test: SELECT count * 2 AS doubled FROM test
1684        let stmt = crate::query_parser::parse_query(
1685            "SELECT count * 2 AS doubled FROM test"
1686        ).unwrap();
1687        if let crate::query_parser::Statement::Select(q) = stmt {
1688            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1689            assert_eq!(cols, vec!["doubled"]);
1690            assert_eq!(rows.len(), 3);
1691            // Rows are: count=10, count=5, count=20
1692            let values: Vec<Value> = rows.iter().map(|r| r["doubled"].clone()).collect();
1693            assert!(values.contains(&Value::Int(20)));
1694            assert!(values.contains(&Value::Int(10)));
1695            assert!(values.contains(&Value::Int(40)));
1696        } else {
1697            panic!("Expected Select");
1698        }
1699    }
1700
1701    #[test]
1702    fn test_where_with_expression() {
1703        // SELECT * FROM test WHERE count * 2 > 15
1704        let stmt = crate::query_parser::parse_query(
1705            "SELECT * FROM test WHERE count * 2 > 15"
1706        ).unwrap();
1707        if let crate::query_parser::Statement::Select(q) = stmt {
1708            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1709            // count=10 → 20 > 15 ✓, count=5 → 10 > 15 ✗, count=20 → 40 > 15 ✓
1710            assert_eq!(rows.len(), 2);
1711        } else {
1712            panic!("Expected Select");
1713        }
1714    }
1715
1716    #[test]
1717    fn test_order_by_expression() {
1718        // SELECT * FROM test ORDER BY count * -1 ASC (effectively DESC by count)
1719        let stmt = crate::query_parser::parse_query(
1720            "SELECT title, count FROM test ORDER BY count * -1 ASC"
1721        ).unwrap();
1722        if let crate::query_parser::Statement::Select(q) = stmt {
1723            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1724            // count: 20 → -20, 10 → -10, 5 → -5, ASC means -20, -10, -5
1725            assert_eq!(rows[0]["count"], Value::Int(20));
1726            assert_eq!(rows[1]["count"], Value::Int(10));
1727            assert_eq!(rows[2]["count"], Value::Int(5));
1728        } else {
1729            panic!("Expected Select");
1730        }
1731    }
1732
1733    // ── CASE WHEN evaluation tests ────────────────────────────────
1734
1735    #[test]
1736    fn test_case_when_eval_basic() {
1737        let row = Row::from([("status".into(), Value::String("ACTIVE".into()))]);
1738        let expr = Expr::Case {
1739            whens: vec![(
1740                WhereClause::Comparison(Comparison {
1741                    column: "status".into(),
1742                    op: CmpOp::Eq,
1743                    value: Some(SqlValue::String("ACTIVE".into())),
1744                    left_expr: Some(Expr::Column("status".into())),
1745                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1746                }),
1747                Box::new(Expr::Literal(SqlValue::Int(1))),
1748            )],
1749            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1750        };
1751        assert_eq!(evaluate_expr(&expr, &row), Value::Int(1));
1752    }
1753
1754    #[test]
1755    fn test_case_when_eval_else() {
1756        let row = Row::from([("status".into(), Value::String("KILLED".into()))]);
1757        let expr = Expr::Case {
1758            whens: vec![(
1759                WhereClause::Comparison(Comparison {
1760                    column: "status".into(),
1761                    op: CmpOp::Eq,
1762                    value: Some(SqlValue::String("ACTIVE".into())),
1763                    left_expr: Some(Expr::Column("status".into())),
1764                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1765                }),
1766                Box::new(Expr::Literal(SqlValue::Int(1))),
1767            )],
1768            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1769        };
1770        assert_eq!(evaluate_expr(&expr, &row), Value::Int(0));
1771    }
1772
1773    #[test]
1774    fn test_case_when_eval_no_else_null() {
1775        let row = Row::from([("x".into(), Value::Int(99))]);
1776        let expr = Expr::Case {
1777            whens: vec![(
1778                WhereClause::Comparison(Comparison {
1779                    column: "x".into(),
1780                    op: CmpOp::Eq,
1781                    value: Some(SqlValue::Int(1)),
1782                    left_expr: Some(Expr::Column("x".into())),
1783                    right_expr: Some(Expr::Literal(SqlValue::Int(1))),
1784                }),
1785                Box::new(Expr::Literal(SqlValue::String("one".into()))),
1786            )],
1787            else_expr: None,
1788        };
1789        assert_eq!(evaluate_expr(&expr, &row), Value::Null);
1790    }
1791
1792    #[test]
1793    fn test_case_when_in_aggregate_query() {
1794        // SUM(CASE WHEN count > 5 THEN count ELSE 0 END)
1795        // Rows: count=10, count=5, count=20 → should sum 10 + 0 + 20 = 30
1796        let stmt = crate::query_parser::parse_query(
1797            "SELECT SUM(CASE WHEN count > 5 THEN count ELSE 0 END) AS total FROM test"
1798        ).unwrap();
1799        if let crate::query_parser::Statement::Select(q) = stmt {
1800            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1801            assert_eq!(cols, vec!["total"]);
1802            assert_eq!(rows.len(), 1);
1803            assert_eq!(rows[0]["total"], Value::Float(30.0));
1804        } else {
1805            panic!("Expected Select");
1806        }
1807    }
1808
1809    #[test]
1810    fn test_case_when_with_unary_minus_in_aggregate() {
1811        // SUM(CASE WHEN title = 'Alpha' THEN count ELSE -count END)
1812        // Alpha: 10, Beta: -5, Gamma: -20 → 10 - 5 - 20 = -15
1813        let stmt = crate::query_parser::parse_query(
1814            "SELECT SUM(CASE WHEN title = 'Alpha' THEN count ELSE -count END) AS net FROM test"
1815        ).unwrap();
1816        if let crate::query_parser::Statement::Select(q) = stmt {
1817            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1818            assert_eq!(rows.len(), 1);
1819            assert_eq!(rows[0]["net"], Value::Float(-15.0));
1820        } else {
1821            panic!("Expected Select");
1822        }
1823    }
1824
1825    #[test]
1826    fn test_dateadd_with_dict_in_group_by() {
1827        // Simulate a joined row with a dict field, then GROUP BY + DateAdd expr
1828        use indexmap::IndexMap;
1829        let mut params = IndexMap::new();
1830        params.insert("exit_days".to_string(), Value::Int(21));
1831
1832        let rows = vec![
1833            Row::from([
1834                ("o.token".into(), Value::String("BTC".into())),
1835                ("o.event_date".into(), Value::Date(
1836                    chrono::NaiveDate::from_ymd_opt(2026, 1, 1).unwrap()
1837                )),
1838                ("o.size".into(), Value::Int(100)),
1839                ("s.params".into(), Value::Dict(params.clone())),
1840            ]),
1841            Row::from([
1842                ("o.token".into(), Value::String("BTC".into())),
1843                ("o.event_date".into(), Value::Date(
1844                    chrono::NaiveDate::from_ymd_opt(2026, 1, 1).unwrap()
1845                )),
1846                ("o.size".into(), Value::Int(50)),
1847                ("s.params".into(), Value::Dict(params.clone())),
1848            ]),
1849        ];
1850
1851        let q = SelectQuery {
1852            distinct: false,
1853            columns: ColumnList::Named(vec![
1854                SelectExpr::Column("o.token".into()),
1855                SelectExpr::Column("o.event_date".into()),
1856                SelectExpr::Expr {
1857                    expr: Expr::DateAdd {
1858                        date: Box::new(Expr::Column("o.event_date".into())),
1859                        days: Box::new(Expr::Column("s.params.exit_days".into())),
1860                    },
1861                    alias: Some("exit_date".into()),
1862                },
1863                SelectExpr::Aggregate {
1864                    func: AggFunc::Sum,
1865                    arg: "o.size".into(),
1866                    arg_expr: Some(Expr::Column("o.size".into())),
1867                    alias: Some("total".into()),
1868                },
1869            ]),
1870            table: "orders".into(),
1871            table_alias: None,
1872            subquery: None,
1873            joins: vec![],
1874            where_clause: None,
1875            group_by: Some(vec!["o.token".into(), "o.event_date".into()]),
1876            having: None,
1877            order_by: None,
1878            limit: None,
1879            ctes: vec![],
1880        };
1881
1882        let (rows, cols) = execute_inner(&q, &rows, None).unwrap();
1883        assert_eq!(rows.len(), 1);
1884        assert!(cols.contains(&"exit_date".to_string()));
1885        assert_eq!(rows[0]["total"], Value::Float(150.0));
1886        // The key test: exit_date should be 2026-01-22, not Null
1887        assert_eq!(
1888            rows[0]["exit_date"],
1889            Value::Date(chrono::NaiveDate::from_ymd_opt(2026, 1, 22).unwrap())
1890        );
1891    }
1892
1893    #[test]
1894    fn test_aggregate_arithmetic() {
1895        // SUM(count) for all rows = 10 + 5 + 20 = 35
1896        // COUNT(*) = 3
1897        // SUM produces Float, COUNT produces Int → mixed → Float division
1898        let stmt = crate::query_parser::parse_query(
1899            "SELECT SUM(count) / COUNT(*) AS avg_count FROM test"
1900        ).unwrap();
1901        if let crate::query_parser::Statement::Select(q) = stmt {
1902            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1903            assert_eq!(cols, vec!["avg_count"]);
1904            assert_eq!(rows.len(), 1);
1905            match &rows[0]["avg_count"] {
1906                Value::Float(f) => assert!((f - 11.666666666666666).abs() < 0.001),
1907                other => panic!("Expected Float, got {:?}", other),
1908            }
1909        } else {
1910            panic!("Expected Select");
1911        }
1912    }
1913
1914    #[test]
1915    fn test_aggregate_subtraction_with_group_by() {
1916        let rows = vec![
1917            {
1918                let mut r = Row::new();
1919                r.insert("token".into(), Value::String("BTC".into()));
1920                r.insert("side".into(), Value::String("BUY".into()));
1921                r.insert("size".into(), Value::Float(100.0));
1922                r
1923            },
1924            {
1925                let mut r = Row::new();
1926                r.insert("token".into(), Value::String("BTC".into()));
1927                r.insert("side".into(), Value::String("SELL".into()));
1928                r.insert("size".into(), Value::Float(60.0));
1929                r
1930            },
1931        ];
1932        let stmt = crate::query_parser::parse_query(
1933            "SELECT token, SUM(CASE WHEN side = 'BUY' THEN size ELSE 0 END) - SUM(CASE WHEN side = 'SELL' THEN size ELSE 0 END) AS net FROM test GROUP BY token"
1934        ).unwrap();
1935        if let crate::query_parser::Statement::Select(q) = stmt {
1936            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1937            assert_eq!(result.len(), 1);
1938            assert_eq!(result[0]["net"], Value::Float(40.0));
1939        } else {
1940            panic!("Expected Select");
1941        }
1942    }
1943
1944    // ── Issue #42: Aggregate subtraction without GROUP BY ──
1945
1946    #[test]
1947    fn test_aggregate_subtraction_no_group() {
1948        // SUM(count) = 10 + 5 + 20 = 35, COUNT(*) = 3, diff = 35 - 3 = 32
1949        let stmt = crate::query_parser::parse_query(
1950            "SELECT SUM(count) - COUNT(*) as diff FROM test"
1951        ).unwrap();
1952        if let crate::query_parser::Statement::Select(q) = stmt {
1953            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1954            assert_eq!(cols, vec!["diff"]);
1955            assert_eq!(rows.len(), 1);
1956            assert_eq!(rows[0]["diff"], Value::Float(32.0));
1957        } else {
1958            panic!("Expected Select");
1959        }
1960    }
1961
1962    // ── Issue #42: Aggregate division with GROUP BY ──
1963
1964    #[test]
1965    fn test_aggregate_division_with_group_by() {
1966        let rows = vec![
1967            {
1968                let mut r = Row::new();
1969                r.insert("category".into(), Value::String("A".into()));
1970                r.insert("count".into(), Value::Int(10));
1971                r
1972            },
1973            {
1974                let mut r = Row::new();
1975                r.insert("category".into(), Value::String("A".into()));
1976                r.insert("count".into(), Value::Int(20));
1977                r
1978            },
1979            {
1980                let mut r = Row::new();
1981                r.insert("category".into(), Value::String("B".into()));
1982                r.insert("count".into(), Value::Int(6));
1983                r
1984            },
1985        ];
1986        // Group A: SUM(count)=30, COUNT(*)=2, ratio=15.0
1987        // Group B: SUM(count)=6, COUNT(*)=1, ratio=6.0
1988        let stmt = crate::query_parser::parse_query(
1989            "SELECT category, SUM(count) / COUNT(*) as ratio FROM test GROUP BY category"
1990        ).unwrap();
1991        if let crate::query_parser::Statement::Select(q) = stmt {
1992            let (result, cols) = execute_inner(&q, &rows, None).unwrap();
1993            assert!(cols.contains(&"ratio".to_string()));
1994            assert_eq!(result.len(), 2);
1995            // Find group A and B by category value
1996            let group_a = result.iter().find(|r| r["category"] == Value::String("A".into())).unwrap();
1997            let group_b = result.iter().find(|r| r["category"] == Value::String("B".into())).unwrap();
1998            match &group_a["ratio"] {
1999                Value::Float(f) => assert!((f - 15.0).abs() < 0.001),
2000                other => panic!("Expected Float for group A ratio, got {:?}", other),
2001            }
2002            match &group_b["ratio"] {
2003                Value::Float(f) => assert!((f - 6.0).abs() < 0.001),
2004                other => panic!("Expected Float for group B ratio, got {:?}", other),
2005            }
2006        } else {
2007            panic!("Expected Select");
2008        }
2009    }
2010
2011    // ── Window function tests ────────────────────────────────────
2012
2013    #[test]
2014    fn test_window_row_number() {
2015        let stmt = crate::query_parser::parse_query(
2016            "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test"
2017        ).unwrap();
2018        if let crate::query_parser::Statement::Select(q) = stmt {
2019            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
2020            assert_eq!(cols, vec!["title", "rn"]);
2021            assert_eq!(rows.len(), 3);
2022            let by_title: HashMap<String, i64> = rows.iter()
2023                .map(|r| (r["title"].to_display_string(), match &r["rn"] { Value::Int(n) => *n, _ => panic!("Expected Int") }))
2024                .collect();
2025            assert_eq!(by_title["Gamma"], 1); // count=20
2026            assert_eq!(by_title["Alpha"], 2); // count=10
2027            assert_eq!(by_title["Beta"], 3);  // count=5
2028        } else {
2029            panic!("Expected Select");
2030        }
2031    }
2032
2033    #[test]
2034    fn test_window_rank_with_ties() {
2035        let mut rows = make_rows();
2036        rows[0].insert("count".into(), Value::Int(10));
2037        rows[1].insert("count".into(), Value::Int(10));
2038        rows[2].insert("count".into(), Value::Int(5));
2039
2040        let stmt = crate::query_parser::parse_query(
2041            "SELECT title, RANK() OVER (ORDER BY count DESC) AS rnk FROM test"
2042        ).unwrap();
2043        if let crate::query_parser::Statement::Select(q) = stmt {
2044            let (result, _) = execute_inner(&q, &rows, None).unwrap();
2045            let ranks: Vec<i64> = result.iter()
2046                .map(|r| match &r["rnk"] { Value::Int(n) => *n, _ => panic!("Expected Int") })
2047                .collect();
2048            assert!(ranks.contains(&1)); // two tied at rank 1
2049            assert!(ranks.iter().filter(|&&r| r == 1).count() == 2);
2050            assert!(ranks.contains(&3)); // rank 3 (gap after tie)
2051        } else {
2052            panic!("Expected Select");
2053        }
2054    }
2055
2056    #[test]
2057    fn test_window_dense_rank() {
2058        let mut rows = make_rows();
2059        rows[0].insert("count".into(), Value::Int(10));
2060        rows[1].insert("count".into(), Value::Int(10));
2061        rows[2].insert("count".into(), Value::Int(5));
2062
2063        let stmt = crate::query_parser::parse_query(
2064            "SELECT title, DENSE_RANK() OVER (ORDER BY count DESC) AS dr FROM test"
2065        ).unwrap();
2066        if let crate::query_parser::Statement::Select(q) = stmt {
2067            let (result, _) = execute_inner(&q, &rows, None).unwrap();
2068            let ranks: Vec<i64> = result.iter()
2069                .map(|r| match &r["dr"] { Value::Int(n) => *n, _ => panic!("Expected Int") })
2070                .collect();
2071            assert!(ranks.iter().filter(|&&r| r == 1).count() == 2);
2072            assert!(ranks.contains(&2)); // dense rank: no gap
2073            assert!(!ranks.contains(&3));
2074        } else {
2075            panic!("Expected Select");
2076        }
2077    }
2078
2079    #[test]
2080    fn test_window_lag() {
2081        let stmt = crate::query_parser::parse_query(
2082            "SELECT title, LAG(count, 1) OVER (ORDER BY count ASC) AS prev FROM test"
2083        ).unwrap();
2084        if let crate::query_parser::Statement::Select(q) = stmt {
2085            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
2086            // Sorted ASC: Beta(5), Alpha(10), Gamma(20)
2087            // LAG: NULL, 5, 10
2088            let first = rows.iter().find(|r| r["title"] == Value::String("Beta".into())).unwrap();
2089            assert_eq!(first["prev"], Value::Null);
2090            let second = rows.iter().find(|r| r["title"] == Value::String("Alpha".into())).unwrap();
2091            assert_eq!(second["prev"], Value::Int(5));
2092            let third = rows.iter().find(|r| r["title"] == Value::String("Gamma".into())).unwrap();
2093            assert_eq!(third["prev"], Value::Int(10));
2094        } else {
2095            panic!("Expected Select");
2096        }
2097    }
2098
2099    #[test]
2100    fn test_window_lead() {
2101        let stmt = crate::query_parser::parse_query(
2102            "SELECT title, LEAD(count, 1) OVER (ORDER BY count ASC) AS next FROM test"
2103        ).unwrap();
2104        if let crate::query_parser::Statement::Select(q) = stmt {
2105            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
2106            let first = rows.iter().find(|r| r["title"] == Value::String("Beta".into())).unwrap();
2107            assert_eq!(first["next"], Value::Int(10));
2108            let last = rows.iter().find(|r| r["title"] == Value::String("Gamma".into())).unwrap();
2109            assert_eq!(last["next"], Value::Null);
2110        } else {
2111            panic!("Expected Select");
2112        }
2113    }
2114
2115    #[test]
2116    fn test_window_sum_partition() {
2117        let rows = vec![
2118            Row::from([
2119                ("cat".into(), Value::String("A".into())),
2120                ("val".into(), Value::Int(10)),
2121            ]),
2122            Row::from([
2123                ("cat".into(), Value::String("A".into())),
2124                ("val".into(), Value::Int(20)),
2125            ]),
2126            Row::from([
2127                ("cat".into(), Value::String("B".into())),
2128                ("val".into(), Value::Int(5)),
2129            ]),
2130        ];
2131        let stmt = crate::query_parser::parse_query(
2132            "SELECT cat, val, SUM(val) OVER (PARTITION BY cat) AS cat_total FROM test"
2133        ).unwrap();
2134        if let crate::query_parser::Statement::Select(q) = stmt {
2135            let (result, cols) = execute_inner(&q, &rows, None).unwrap();
2136            assert_eq!(cols, vec!["cat", "val", "cat_total"]);
2137            assert_eq!(result.len(), 3);
2138            let a_rows: Vec<_> = result.iter().filter(|r| r["cat"] == Value::String("A".into())).collect();
2139            assert_eq!(a_rows.len(), 2);
2140            for r in &a_rows {
2141                assert_eq!(r["cat_total"], Value::Float(30.0));
2142            }
2143            let b_row = result.iter().find(|r| r["cat"] == Value::String("B".into())).unwrap();
2144            assert_eq!(b_row["cat_total"], Value::Float(5.0));
2145        } else {
2146            panic!("Expected Select");
2147        }
2148    }
2149
2150    #[test]
2151    fn test_window_with_where_order_limit() {
2152        let stmt = crate::query_parser::parse_query(
2153            "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test WHERE count > 4 ORDER BY rn LIMIT 2"
2154        ).unwrap();
2155        if let crate::query_parser::Statement::Select(q) = stmt {
2156            let (result, _) = execute_inner(&q, &make_rows(), None).unwrap();
2157            assert_eq!(result.len(), 2);
2158            assert_eq!(result[0]["rn"], Value::Int(1));
2159            assert_eq!(result[1]["rn"], Value::Int(2));
2160        } else {
2161            panic!("Expected Select");
2162        }
2163    }
2164}