Skip to main content

mdql_core/
query_engine.rs

1//! Execute parsed queries over in-memory rows.
2
3use std::cmp::Ordering;
4use std::collections::HashMap;
5
6use regex::Regex;
7
8use crate::errors::MdqlError;
9use crate::model::{Row, Value};
10use crate::query_parser::*;
11use crate::schema::Schema;
12
13pub fn execute_query(
14    query: &SelectQuery,
15    rows: &[Row],
16    _schema: &Schema,
17) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
18    if let Some(ref sub) = query.subquery {
19        let (sub_rows, _sub_cols) = execute_inner(sub, rows, None)?;
20        return execute_inner(query, &sub_rows, None);
21    }
22    execute_inner(query, rows, None)
23}
24
25#[allow(dead_code)]
26pub(crate) fn execute_query_indexed(
27    query: &SelectQuery,
28    rows: &[Row],
29    schema: &Schema,
30    index: Option<&crate::index::TableIndex>,
31    searcher: Option<&crate::search::TableSearcher>,
32) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
33    // Pre-compute FTS results for any LIKE clauses on section columns
34    let fts_results = if let (Some(ref wc), Some(searcher)) = (&query.where_clause, searcher) {
35        collect_fts_results(wc, schema, searcher)
36    } else {
37        HashMap::new()
38    };
39
40    execute_with_fts(query, rows, index, &fts_results)
41}
42
43#[allow(dead_code)]
44fn collect_fts_results(
45    clause: &WhereClause,
46    schema: &Schema,
47    searcher: &crate::search::TableSearcher,
48) -> HashMap<(String, String), std::collections::HashSet<String>> {
49    let mut results = HashMap::new();
50    collect_fts_results_inner(clause, schema, searcher, &mut results);
51    results
52}
53
54#[allow(dead_code)]
55fn collect_fts_results_inner(
56    clause: &WhereClause,
57    schema: &Schema,
58    searcher: &crate::search::TableSearcher,
59    results: &mut HashMap<(String, String), std::collections::HashSet<String>>,
60) {
61    match clause {
62        WhereClause::Comparison(cmp) => {
63            if (cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike) && schema.sections.contains_key(&cmp.column) {
64                if let Some(SqlValue::String(pattern)) = &cmp.value {
65                    // Strip SQL wildcards for Tantivy query
66                    let search_term = pattern.replace('%', " ").replace('_', " ").trim().to_string();
67                    if !search_term.is_empty() {
68                        if let Ok(paths) = searcher.search(&search_term, Some(&cmp.column)) {
69                            let key = (cmp.column.clone(), pattern.clone());
70                            results.insert(key, paths.into_iter().collect());
71                        }
72                    }
73                }
74            }
75        }
76        WhereClause::BoolOp(bop) => {
77            collect_fts_results_inner(&bop.left, schema, searcher, results);
78            collect_fts_results_inner(&bop.right, schema, searcher, results);
79        }
80    }
81}
82
83type FtsResults = HashMap<(String, String), std::collections::HashSet<String>>;
84
85fn execute_with_fts(
86    query: &SelectQuery,
87    rows: &[Row],
88    index: Option<&crate::index::TableIndex>,
89    fts: &FtsResults,
90) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
91    // Determine available columns
92    let mut all_columns: Vec<String> = Vec::new();
93    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
94    for r in rows {
95        for k in r.keys() {
96            if seen.insert(k.clone()) {
97                all_columns.push(k.clone());
98            }
99        }
100    }
101
102    // Check if query has aggregates
103    let has_aggregates = match &query.columns {
104        ColumnList::Named(exprs) => exprs.iter().any(|e| e.is_aggregate()),
105        _ => false,
106    };
107
108    // Output column names
109    let columns: Vec<String> = match &query.columns {
110        ColumnList::All => all_columns,
111        ColumnList::Named(exprs) => exprs.iter().map(|e| e.output_name()).collect(),
112    };
113
114    // Reject duplicate output names. A result row is a key->value map, so two
115    // columns with the same output name cannot both be represented — the dict
116    // would collapse them and the header/row lengths would silently disagree.
117    // Require the caller to disambiguate with AS.
118    if let ColumnList::Named(_) = &query.columns {
119        let mut seen = std::collections::HashSet::new();
120        for c in &columns {
121            if !seen.insert(c.as_str()) {
122                return Err(MdqlError::QueryExecution(format!(
123                    "duplicate output column '{}' — give each projection a unique name with AS",
124                    c
125                )));
126            }
127        }
128    }
129
130    // Filter — try index first, fall back to full scan
131    let filtered: Vec<Row> = if let Some(ref wc) = query.where_clause {
132        let candidate_paths = index.and_then(|idx| try_index_filter(wc, idx));
133        if let Some(paths) = candidate_paths {
134            rows.iter()
135                .filter(|r| {
136                    r.get("path")
137                        .and_then(|v| v.as_str())
138                        .map_or(false, |p| paths.contains(p))
139                })
140                .filter(|r| evaluate_with_fts(wc, r, fts))
141                .cloned()
142                .collect()
143        } else {
144            rows.iter()
145                .filter(|r| evaluate_with_fts(wc, r, fts))
146                .cloned()
147                .collect()
148        }
149    } else {
150        rows.to_vec()
151    };
152
153    // Aggregate if needed
154    let mut result = if has_aggregates || query.group_by.is_some() {
155        let exprs = match &query.columns {
156            ColumnList::Named(exprs) => exprs.clone(),
157            _ => return Err(MdqlError::QueryExecution(
158                "SELECT * with GROUP BY is not supported".into(),
159            )),
160        };
161        let group_keys = query.group_by.as_deref().unwrap_or(&[]);
162        aggregate_rows(&filtered, &exprs, group_keys)?
163    } else {
164        filtered
165    };
166
167    // HAVING filter — apply after aggregation
168    if let Some(ref having) = query.having {
169        result.retain(|row| evaluate(having, row));
170    }
171
172    // Window functions — compute after aggregation/HAVING, before ORDER BY
173    let has_windows = match &query.columns {
174        ColumnList::Named(exprs) => exprs.iter().any(|e| match e {
175            SelectExpr::Expr { expr, .. } => expr.contains_window(),
176            _ => false,
177        }),
178        _ => false,
179    };
180    if has_windows {
181        if let ColumnList::Named(ref exprs) = query.columns {
182            compute_windows(&mut result, exprs)?;
183        }
184    }
185
186    // Sort — resolve ORDER BY aliases against SELECT list
187    if let Some(ref order_by) = query.order_by {
188        let resolved = resolve_order_aliases(order_by, &query.columns);
189        sort_rows(&mut result, &resolved);
190    }
191
192    // Limit
193    if let Some(limit) = query.limit {
194        result.truncate(limit as usize);
195    }
196
197    // Project — evaluate expressions and strip to requested columns
198    if !matches!(query.columns, ColumnList::All) {
199        let named_exprs = match &query.columns {
200            ColumnList::Named(exprs) => exprs,
201            _ => unreachable!(),
202        };
203
204        // Compute expression columns first, then retain only requested columns.
205        // Skip if aggregation already computed them (re-evaluating would lose
206        // columns that only existed in pre-aggregation rows, e.g. dict fields).
207        let has_expr_cols = named_exprs.iter().any(|e| matches!(e, SelectExpr::Expr { .. }));
208        let already_aggregated = has_aggregates || query.group_by.is_some();
209        if has_expr_cols && !already_aggregated {
210            for row in &mut result {
211                for expr in named_exprs {
212                    if let SelectExpr::Expr { expr: e, alias } = expr {
213                        if e.contains_window() { continue; }
214                        let name = alias.clone().unwrap_or_else(|| e.display_name());
215                        let val = evaluate_expr(e, row);
216                        row.insert(name, val);
217                    }
218                }
219            }
220        }
221
222        let col_set: std::collections::HashSet<&str> =
223            columns.iter().map(|s| s.as_str()).collect();
224        for row in &mut result {
225            row.retain(|k, _| col_set.contains(k.as_str()));
226        }
227    }
228
229    // Null-fill so every result row carries every header column as a key.
230    // A requested column may be absent from a row because it does not exist
231    // on the table at all, or because it is an optional field/section missing
232    // on that row (and SELECT * unions keys across rows). Inserting Null keeps
233    // the column header aligned with each row dict, so consumers that zip
234    // `columns` with row values stay in sync.
235    for row in &mut result {
236        for col in &columns {
237            if !row.contains_key(col) {
238                row.insert(col.clone(), Value::Null);
239            }
240        }
241    }
242
243    Ok((result, columns))
244}
245
246fn aggregate_rows(
247    rows: &[Row],
248    exprs: &[SelectExpr],
249    group_keys: &[String],
250) -> crate::errors::Result<Vec<Row>> {
251    // Group rows by group_keys
252    let mut groups: Vec<(Vec<Value>, Vec<&Row>)> = Vec::new();
253    let mut key_index: HashMap<Vec<String>, usize> = HashMap::new();
254
255    if group_keys.is_empty() {
256        // No GROUP BY — all rows are one group
257        let all_refs: Vec<&Row> = rows.iter().collect();
258        groups.push((vec![], all_refs));
259    } else {
260        for row in rows {
261            let key: Vec<String> = group_keys
262                .iter()
263                .map(|k| {
264                    row.get(k)
265                        .map(|v| v.to_display_string())
266                        .unwrap_or_default()
267                })
268                .collect();
269            let key_vals: Vec<Value> = group_keys
270                .iter()
271                .map(|k| row.get(k).cloned().unwrap_or(Value::Null))
272                .collect();
273            if let Some(&idx) = key_index.get(&key) {
274                groups[idx].1.push(row);
275            } else {
276                let idx = groups.len();
277                key_index.insert(key, idx);
278                groups.push((key_vals, vec![row]));
279            }
280        }
281    }
282
283    // Compute aggregates per group
284    let mut result = Vec::new();
285    for (key_vals, group_rows) in &groups {
286        let mut out = Row::new();
287
288        // Fill in group key values
289        for (i, k) in group_keys.iter().enumerate() {
290            out.insert(k.clone(), key_vals[i].clone());
291        }
292
293        // Compute each expression
294        for expr in exprs {
295            match expr {
296                SelectExpr::Column(name) => {
297                    // Already filled if it's a group key; otherwise take first row's value
298                    if !out.contains_key(name) {
299                        if let Some(first) = group_rows.first() {
300                            out.insert(
301                                name.clone(),
302                                first.get(name).cloned().unwrap_or(Value::Null),
303                            );
304                        }
305                    }
306                }
307                SelectExpr::Aggregate { func, arg, arg_expr, alias } => {
308                    let out_name = alias
309                        .clone()
310                        .unwrap_or_else(|| expr.output_name());
311                    let val = compute_aggregate(func, arg, arg_expr.as_ref(), group_rows);
312                    out.insert(out_name, val);
313                }
314                SelectExpr::Expr { expr: e, alias } => {
315                    let out_name = alias.clone().unwrap_or_else(|| e.display_name());
316                    if e.contains_aggregate() {
317                        let val = evaluate_agg_expr(e, group_rows);
318                        out.insert(out_name, val);
319                    } else if let Some(first) = group_rows.first() {
320                        let val = evaluate_expr(e, first);
321                        out.insert(out_name, val);
322                    }
323                }
324            }
325        }
326
327        result.push(out);
328    }
329
330    Ok(result)
331}
332
333/// Resolve a per-row value for an aggregate argument.
334/// If `arg_expr` is set, evaluate it; otherwise look up `arg` as a column name.
335fn resolve_agg_value<'a>(arg: &str, arg_expr: Option<&Expr>, row: &'a Row) -> Value {
336    if let Some(expr) = arg_expr {
337        evaluate_expr(expr, row)
338    } else {
339        row.get(arg).cloned().unwrap_or(Value::Null)
340    }
341}
342
343fn compute_aggregate(func: &AggFunc, arg: &str, arg_expr: Option<&Expr>, rows: &[&Row]) -> Value {
344    match func {
345        AggFunc::Count => {
346            if arg == "*" && arg_expr.is_none() {
347                Value::Int(rows.len() as i64)
348            } else {
349                let count = rows
350                    .iter()
351                    .filter(|r| {
352                        let v = resolve_agg_value(arg, arg_expr, r);
353                        !v.is_null()
354                    })
355                    .count();
356                Value::Int(count as i64)
357            }
358        }
359        AggFunc::Sum => {
360            let mut total = 0.0f64;
361            let mut has_any = false;
362            for r in rows {
363                let v = resolve_agg_value(arg, arg_expr, r);
364                match v {
365                    Value::Int(n) => { total += n as f64; has_any = true; }
366                    Value::Float(f) => { total += f; has_any = true; }
367                    _ => {}
368                }
369            }
370            if has_any { Value::Float(total) } else { Value::Null }
371        }
372        AggFunc::Avg => {
373            let mut total = 0.0f64;
374            let mut count = 0usize;
375            for r in rows {
376                let v = resolve_agg_value(arg, arg_expr, r);
377                match v {
378                    Value::Int(n) => { total += n as f64; count += 1; }
379                    Value::Float(f) => { total += f; count += 1; }
380                    _ => {}
381                }
382            }
383            if count > 0 { Value::Float(total / count as f64) } else { Value::Null }
384        }
385        AggFunc::Min => {
386            let mut min_val: Option<Value> = None;
387            for r in rows {
388                let v = resolve_agg_value(arg, arg_expr, r);
389                if v.is_null() { continue; }
390                min_val = Some(match min_val {
391                    None => v,
392                    Some(ref current) => {
393                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Less) {
394                            v
395                        } else {
396                            current.clone()
397                        }
398                    }
399                });
400            }
401            min_val.unwrap_or(Value::Null)
402        }
403        AggFunc::Max => {
404            let mut max_val: Option<Value> = None;
405            for r in rows {
406                let v = resolve_agg_value(arg, arg_expr, r);
407                if v.is_null() { continue; }
408                max_val = Some(match max_val {
409                    None => v,
410                    Some(ref current) => {
411                        if v.partial_cmp(current) == Some(std::cmp::Ordering::Greater) {
412                            v
413                        } else {
414                            current.clone()
415                        }
416                    }
417                });
418            }
419            max_val.unwrap_or(Value::Null)
420        }
421    }
422}
423
424fn compute_windows(rows: &mut Vec<Row>, select_exprs: &[SelectExpr]) -> crate::errors::Result<()> {
425    for se in select_exprs {
426        if let SelectExpr::Expr { expr, alias } = se {
427            if let Expr::Window { func, args, over } = expr {
428                let col_name = alias.clone().unwrap_or_else(|| expr.display_name());
429                compute_single_window(rows, func, args, over, &col_name)?;
430            }
431        }
432    }
433    Ok(())
434}
435
436fn compute_single_window(
437    rows: &mut Vec<Row>,
438    func: &WindowFunc,
439    args: &[Expr],
440    over: &WindowSpec,
441    col_name: &str,
442) -> crate::errors::Result<()> {
443    let mut partitions: Vec<Vec<usize>> = Vec::new();
444    let mut partition_map: HashMap<Vec<String>, usize> = HashMap::new();
445
446    for (i, row) in rows.iter().enumerate() {
447        let key: Vec<String> = over.partition_by.iter()
448            .map(|col| row.get(col).map(|v| v.to_display_string()).unwrap_or_default())
449            .collect();
450        if let Some(&idx) = partition_map.get(&key) {
451            partitions[idx].push(i);
452        } else {
453            let idx = partitions.len();
454            partition_map.insert(key, idx);
455            partitions.push(vec![i]);
456        }
457    }
458
459    for partition in &mut partitions {
460        if !over.order_by.is_empty() {
461            partition.sort_by(|&a, &b| {
462                for spec in &over.order_by {
463                    let (va, vb) = if let Some(ref expr) = spec.expr {
464                        (evaluate_expr(expr, &rows[a]), evaluate_expr(expr, &rows[b]))
465                    } else {
466                        (
467                            rows[a].get(&spec.column).cloned().unwrap_or(Value::Null),
468                            rows[b].get(&spec.column).cloned().unwrap_or(Value::Null),
469                        )
470                    };
471                    let ordering = match (&va, &vb) {
472                        (Value::Null, Value::Null) => Ordering::Equal,
473                        (Value::Null, _) => Ordering::Greater,
474                        (_, Value::Null) => Ordering::Less,
475                        (a_val, b_val) => compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal),
476                    };
477                    let ordering = if spec.descending { ordering.reverse() } else { ordering };
478                    if ordering != Ordering::Equal {
479                        return ordering;
480                    }
481                }
482                Ordering::Equal
483            });
484        }
485    }
486
487    let mut values: Vec<(usize, Value)> = Vec::new();
488
489    for partition in &partitions {
490        match func {
491            WindowFunc::RowNumber => {
492                for (i, &row_idx) in partition.iter().enumerate() {
493                    values.push((row_idx, Value::Int((i + 1) as i64)));
494                }
495            }
496            WindowFunc::Rank => {
497                let mut rank = 1usize;
498                for (i, &row_idx) in partition.iter().enumerate() {
499                    if i > 0 {
500                        let prev_idx = partition[i - 1];
501                        let same = over.order_by.iter().all(|spec| {
502                            let va = if let Some(ref expr) = spec.expr {
503                                evaluate_expr(expr, &rows[prev_idx])
504                            } else {
505                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
506                            };
507                            let vb = if let Some(ref expr) = spec.expr {
508                                evaluate_expr(expr, &rows[row_idx])
509                            } else {
510                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
511                            };
512                            va == vb
513                        });
514                        if !same {
515                            rank = i + 1;
516                        }
517                    }
518                    values.push((row_idx, Value::Int(rank as i64)));
519                }
520            }
521            WindowFunc::DenseRank => {
522                let mut rank = 1usize;
523                for (i, &row_idx) in partition.iter().enumerate() {
524                    if i > 0 {
525                        let prev_idx = partition[i - 1];
526                        let same = over.order_by.iter().all(|spec| {
527                            let va = if let Some(ref expr) = spec.expr {
528                                evaluate_expr(expr, &rows[prev_idx])
529                            } else {
530                                rows[prev_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
531                            };
532                            let vb = if let Some(ref expr) = spec.expr {
533                                evaluate_expr(expr, &rows[row_idx])
534                            } else {
535                                rows[row_idx].get(&spec.column).cloned().unwrap_or(Value::Null)
536                            };
537                            va == vb
538                        });
539                        if !same {
540                            rank += 1;
541                        }
542                    }
543                    values.push((row_idx, Value::Int(rank as i64)));
544                }
545            }
546            WindowFunc::Lag => {
547                let offset = if args.len() > 1 {
548                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
549                } else {
550                    1
551                };
552                for (i, &row_idx) in partition.iter().enumerate() {
553                    let val = if i >= offset && !args.is_empty() {
554                        evaluate_expr(&args[0], &rows[partition[i - offset]])
555                    } else {
556                        Value::Null
557                    };
558                    values.push((row_idx, val));
559                }
560            }
561            WindowFunc::Lead => {
562                let offset = if args.len() > 1 {
563                    if let Expr::Literal(SqlValue::Int(n)) = &args[1] { *n as usize } else { 1 }
564                } else {
565                    1
566                };
567                for (i, &row_idx) in partition.iter().enumerate() {
568                    let val = if i + offset < partition.len() && !args.is_empty() {
569                        evaluate_expr(&args[0], &rows[partition[i + offset]])
570                    } else {
571                        Value::Null
572                    };
573                    values.push((row_idx, val));
574                }
575            }
576            WindowFunc::Agg(agg_func) => {
577                let partition_rows: Vec<&Row> = partition.iter().map(|&i| &rows[i]).collect();
578                let (arg_name, arg_expr_opt) = if args.is_empty() {
579                    ("*".to_string(), None)
580                } else {
581                    (args[0].display_name(), Some(&args[0]))
582                };
583                let agg_val = compute_aggregate(agg_func, &arg_name, arg_expr_opt, &partition_rows);
584                for &row_idx in partition {
585                    values.push((row_idx, agg_val.clone()));
586                }
587            }
588        }
589    }
590
591    for (row_idx, val) in values {
592        rows[row_idx].insert(col_name.to_string(), val);
593    }
594
595    Ok(())
596}
597
598fn evaluate_with_fts(clause: &WhereClause, row: &Row, fts: &FtsResults) -> bool {
599    match clause {
600        WhereClause::BoolOp(bop) => {
601            let left = evaluate_with_fts(&bop.left, row, fts);
602            match bop.op {
603                BoolOpKind::And => left && evaluate_with_fts(&bop.right, row, fts),
604                BoolOpKind::Or => left || evaluate_with_fts(&bop.right, row, fts),
605            }
606        }
607        WhereClause::Comparison(cmp) => {
608            // Check if we have FTS results for this comparison
609            if cmp.op == CmpOp::Like || cmp.op == CmpOp::NotLike {
610                if let Some(SqlValue::String(pattern)) = &cmp.value {
611                    let key = (cmp.column.clone(), pattern.clone());
612                    if let Some(matching_paths) = fts.get(&key) {
613                        let row_path = row.get("path").and_then(|v| v.as_str()).unwrap_or("");
614                        let matched = matching_paths.contains(row_path);
615                        return if cmp.op == CmpOp::Like { matched } else { !matched };
616                    }
617                }
618            }
619            evaluate_comparison(cmp, row)
620        }
621    }
622}
623
624pub use crate::query_join::execute_join_query;
625
626pub(crate) fn execute_inner(
627    query: &SelectQuery,
628    rows: &[Row],
629    index: Option<&crate::index::TableIndex>,
630) -> crate::errors::Result<(Vec<Row>, Vec<String>)> {
631    let empty_fts = HashMap::new();
632    execute_with_fts(query, rows, index, &empty_fts)
633}
634
635pub fn evaluate(clause: &WhereClause, row: &Row) -> bool {
636    match clause {
637        WhereClause::BoolOp(bop) => {
638            let left = evaluate(&bop.left, row);
639            match bop.op {
640                BoolOpKind::And => left && evaluate(&bop.right, row),
641                BoolOpKind::Or => left || evaluate(&bop.right, row),
642            }
643        }
644        WhereClause::Comparison(cmp) => evaluate_comparison(cmp, row),
645    }
646}
647
648/// Evaluate an Expr against a row, returning a Value.
649pub(crate) fn evaluate_expr(expr: &Expr, row: &Row) -> Value {
650    match expr {
651        Expr::Literal(SqlValue::Int(n)) => Value::Int(*n),
652        Expr::Literal(SqlValue::Float(f)) => Value::Float(*f),
653        Expr::Literal(SqlValue::String(s)) => Value::String(s.clone()),
654        Expr::Literal(SqlValue::Bool(b)) => Value::Bool(*b),
655        Expr::Literal(SqlValue::Null) => Value::Null,
656        Expr::Literal(SqlValue::List(_)) => Value::Null,
657        Expr::Column(name) => {
658            if let Some(val) = row.get(name) {
659                return val.clone();
660            }
661            // Try all possible dot splits for dict access (e.g. "s.params.key")
662            for (i, _) in name.match_indices('.') {
663                let dict_col = &name[..i];
664                let dict_key = &name[i + 1..];
665                if let Some(Value::Dict(map)) = row.get(dict_col) {
666                    return map.get(dict_key).cloned().unwrap_or(Value::Null);
667                }
668            }
669            Value::Null
670        }
671        Expr::UnaryMinus(inner) => {
672            match evaluate_expr(inner, row) {
673                Value::Int(n) => Value::Int(-n),
674                Value::Float(f) => Value::Float(-f),
675                Value::Null => Value::Null,
676                _ => Value::Null, // non-numeric → NULL
677            }
678        }
679        Expr::BinaryOp { left, op, right } => {
680            let lv = evaluate_expr(left, row);
681            let rv = evaluate_expr(right, row);
682
683            // NULL propagation: any NULL operand → NULL
684            if lv.is_null() || rv.is_null() {
685                return Value::Null;
686            }
687
688            // Extract numeric values with int→float coercion
689            match (&lv, &rv) {
690                (Value::Int(a), Value::Int(b)) => {
691                    match op {
692                        ArithOp::Add => Value::Int(a.wrapping_add(*b)),
693                        ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
694                        ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
695                        ArithOp::Div => {
696                            if *b == 0 { Value::Null } else { Value::Int(a / b) }
697                        }
698                        ArithOp::Mod => {
699                            if *b == 0 { Value::Null } else { Value::Int(a % b) }
700                        }
701                    }
702                }
703                _ => {
704                    // Coerce to float
705                    let a = match &lv {
706                        Value::Int(n) => *n as f64,
707                        Value::Float(f) => *f,
708                        _ => return Value::Null,
709                    };
710                    let b = match &rv {
711                        Value::Int(n) => *n as f64,
712                        Value::Float(f) => *f,
713                        _ => return Value::Null,
714                    };
715                    match op {
716                        ArithOp::Add => Value::Float(a + b),
717                        ArithOp::Sub => Value::Float(a - b),
718                        ArithOp::Mul => Value::Float(a * b),
719                        ArithOp::Div => {
720                            if b == 0.0 { Value::Null } else { Value::Float(a / b) }
721                        }
722                        ArithOp::Mod => {
723                            if b == 0.0 { Value::Null } else { Value::Float(a % b) }
724                        }
725                    }
726                }
727            }
728        }
729        Expr::Case { whens, else_expr } => {
730            for (condition, result) in whens {
731                if evaluate(condition, row) {
732                    return evaluate_expr(result, row);
733                }
734            }
735            match else_expr {
736                Some(e) => evaluate_expr(e, row),
737                None => Value::Null,
738            }
739        }
740        Expr::CurrentDate => {
741            Value::Date(chrono::Local::now().naive_local().date())
742        }
743        Expr::CurrentTimestamp => {
744            Value::DateTime(chrono::Local::now().naive_local())
745        }
746        Expr::DateAdd { date, days } => {
747            let date_val = evaluate_expr(date, row);
748            let days_val = evaluate_expr(days, row);
749            let n = match &days_val {
750                Value::Int(n) => *n,
751                Value::Float(f) => *f as i64,
752                _ => return Value::Null,
753            };
754            let duration = chrono::Duration::days(n);
755            match date_val {
756                Value::Date(d) => {
757                    match d.checked_add_signed(duration) {
758                        Some(result) => Value::Date(result),
759                        None => Value::Null,
760                    }
761                }
762                Value::DateTime(dt) => {
763                    match dt.checked_add_signed(duration) {
764                        Some(result) => Value::DateTime(result),
765                        None => Value::Null,
766                    }
767                }
768                _ => Value::Null,
769            }
770        }
771        Expr::DateDiff { left, right } => {
772            let lv = evaluate_expr(left, row);
773            let rv = evaluate_expr(right, row);
774            let left_date = match &lv {
775                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
776                Value::DateTime(dt) => *dt,
777                _ => return Value::Null,
778            };
779            let right_date = match &rv {
780                Value::Date(d) => d.and_hms_opt(0, 0, 0).unwrap(),
781                Value::DateTime(dt) => *dt,
782                _ => return Value::Null,
783            };
784            Value::Int((left_date - right_date).num_days())
785        }
786        Expr::Aggregate { func, arg, .. } => {
787            // Post-aggregation: look up the pre-computed column name
788            let func_name = match func {
789                AggFunc::Count => "COUNT",
790                AggFunc::Sum => "SUM",
791                AggFunc::Avg => "AVG",
792                AggFunc::Min => "MIN",
793                AggFunc::Max => "MAX",
794            };
795            let col = format!("{}({})", func_name, arg);
796            row.get(&col).cloned().unwrap_or(Value::Null)
797        }
798        Expr::Subquery(_) => Value::Null,
799        Expr::Window { .. } => {
800            let display = expr.display_name();
801            row.get(&display).cloned().unwrap_or(Value::Null)
802        }
803    }
804}
805
806fn evaluate_agg_expr(expr: &Expr, group_rows: &[&Row]) -> Value {
807    match expr {
808        Expr::Aggregate { func, arg, arg_expr } => {
809            compute_aggregate(func, arg, arg_expr.as_deref(), group_rows)
810        }
811        Expr::BinaryOp { left, op, right } => {
812            let lv = evaluate_agg_expr(left, group_rows);
813            let rv = evaluate_agg_expr(right, group_rows);
814            apply_arith_op(op, &lv, &rv)
815        }
816        Expr::UnaryMinus(inner) => {
817            match evaluate_agg_expr(inner, group_rows) {
818                Value::Int(n) => Value::Int(-n),
819                Value::Float(f) => Value::Float(-f),
820                _ => Value::Null,
821            }
822        }
823        other => {
824            if let Some(first) = group_rows.first() {
825                evaluate_expr(other, first)
826            } else {
827                Value::Null
828            }
829        }
830    }
831}
832
833fn apply_arith_op(op: &ArithOp, lv: &Value, rv: &Value) -> Value {
834    if lv.is_null() || rv.is_null() {
835        return Value::Null;
836    }
837    match (lv, rv) {
838        (Value::Int(a), Value::Int(b)) => match op {
839            ArithOp::Add => Value::Int(a.wrapping_add(*b)),
840            ArithOp::Sub => Value::Int(a.wrapping_sub(*b)),
841            ArithOp::Mul => Value::Int(a.wrapping_mul(*b)),
842            ArithOp::Div => if *b == 0 { Value::Null } else { Value::Int(a / b) },
843            ArithOp::Mod => if *b == 0 { Value::Null } else { Value::Int(a % b) },
844        },
845        _ => {
846            let a = match lv {
847                Value::Int(n) => *n as f64,
848                Value::Float(f) => *f,
849                _ => return Value::Null,
850            };
851            let b = match rv {
852                Value::Int(n) => *n as f64,
853                Value::Float(f) => *f,
854                _ => return Value::Null,
855            };
856            match op {
857                ArithOp::Add => Value::Float(a + b),
858                ArithOp::Sub => Value::Float(a - b),
859                ArithOp::Mul => Value::Float(a * b),
860                ArithOp::Div => if b == 0.0 { Value::Null } else { Value::Float(a / b) },
861                ArithOp::Mod => if b == 0.0 { Value::Null } else { Value::Float(a % b) },
862            }
863        }
864    }
865}
866
867fn evaluate_comparison(cmp: &Comparison, row: &Row) -> bool {
868    // If we have expression-based comparison (new path), use it for standard ops
869    if let (Some(left_expr), Some(right_expr)) = (&cmp.left_expr, &cmp.right_expr) {
870        if matches!(cmp.op, CmpOp::Eq | CmpOp::Ne | CmpOp::Lt | CmpOp::Gt | CmpOp::Le | CmpOp::Ge) {
871            let left_val = evaluate_expr(left_expr, row);
872            let right_val = evaluate_expr(right_expr, row);
873
874            // NULL comparison: always false (except IS NULL handled below)
875            if left_val.is_null() || right_val.is_null() {
876                return false;
877            }
878
879            // Coerce for comparison: if types differ, try int→float
880            let ord = compare_model_values(&left_val, &right_val);
881
882            return match cmp.op {
883                CmpOp::Eq => ord == Some(Ordering::Equal),
884                CmpOp::Ne => ord != Some(Ordering::Equal),
885                CmpOp::Lt => ord == Some(Ordering::Less),
886                CmpOp::Gt => ord == Some(Ordering::Greater),
887                CmpOp::Le => matches!(ord, Some(Ordering::Less | Ordering::Equal)),
888                CmpOp::Ge => matches!(ord, Some(Ordering::Greater | Ordering::Equal)),
889                _ => false,
890            };
891        }
892    }
893
894    // Fall back to legacy column-based comparison for IS NULL, IN, LIKE, etc.
895    let actual = row.get(&cmp.column);
896
897    if cmp.op == CmpOp::IsNull {
898        return actual.map_or(true, |v| v.is_null());
899    }
900    if cmp.op == CmpOp::IsNotNull {
901        return actual.map_or(false, |v| !v.is_null());
902    }
903
904    let actual = match actual {
905        Some(v) if !v.is_null() => v,
906        _ => return false,
907    };
908
909    let expected = match &cmp.value {
910        Some(v) => v,
911        None => return false,
912    };
913
914    match cmp.op {
915        CmpOp::Eq => eq_match(actual, expected),
916        CmpOp::Ne => !eq_match(actual, expected),
917        CmpOp::Lt => compare_values(actual, expected) == Some(Ordering::Less),
918        CmpOp::Gt => compare_values(actual, expected) == Some(Ordering::Greater),
919        CmpOp::Le => matches!(compare_values(actual, expected), Some(Ordering::Less | Ordering::Equal)),
920        CmpOp::Ge => matches!(compare_values(actual, expected), Some(Ordering::Greater | Ordering::Equal)),
921        CmpOp::Like => like_match(actual, expected),
922        CmpOp::NotLike => !like_match(actual, expected),
923        CmpOp::In => {
924            if let SqlValue::List(items) = expected {
925                items.iter().any(|v| eq_match(actual, v))
926            } else {
927                eq_match(actual, expected)
928            }
929        }
930        CmpOp::IsNull | CmpOp::IsNotNull => unreachable!(),
931    }
932}
933
934/// Compare two model::Value instances, with int↔float coercion.
935fn compare_model_values(a: &Value, b: &Value) -> Option<Ordering> {
936    match (a, b) {
937        (Value::Int(x), Value::Float(y)) => (*x as f64).partial_cmp(y),
938        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&(*y as f64)),
939        _ => a.partial_cmp(b),
940    }
941}
942
943fn coerce_sql_to_value(sql_val: &SqlValue, target: &Value) -> Value {
944    match sql_val {
945        SqlValue::Null => Value::Null,
946        SqlValue::String(s) => {
947            match target {
948                Value::Int(_) => s.parse::<i64>().map(Value::Int).unwrap_or(Value::String(s.clone())),
949                Value::Float(_) => s.parse::<f64>().map(Value::Float).unwrap_or(Value::String(s.clone())),
950                Value::Date(_) => {
951                    chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
952                        .map(Value::Date)
953                        .unwrap_or(Value::String(s.clone()))
954                }
955                Value::DateTime(_) => {
956                    chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
957                        .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f"))
958                        .map(Value::DateTime)
959                        .unwrap_or(Value::String(s.clone()))
960                }
961                _ => Value::String(s.clone()),
962            }
963        }
964        SqlValue::Int(n) => {
965            match target {
966                Value::Float(_) => Value::Float(*n as f64),
967                _ => Value::Int(*n),
968            }
969        }
970        SqlValue::Float(f) => Value::Float(*f),
971        SqlValue::Bool(b) => Value::Bool(*b),
972        SqlValue::List(_) => Value::Null, // Lists handled separately
973    }
974}
975
976fn eq_match(actual: &Value, expected: &SqlValue) -> bool {
977    // Special handling for lists (e.g., categories)
978    if let Value::List(items) = actual {
979        if let SqlValue::String(s) = expected {
980            return items.contains(s);
981        }
982    }
983
984    let coerced = coerce_sql_to_value(expected, actual);
985    actual == &coerced
986}
987
988fn like_match(actual: &Value, pattern: &SqlValue) -> bool {
989    let pattern_str = match pattern {
990        SqlValue::String(s) => s,
991        _ => return false,
992    };
993
994    // Convert SQL LIKE to regex
995    let mut regex_str = String::from("(?is)^");
996    for ch in pattern_str.chars() {
997        match ch {
998            '%' => regex_str.push_str(".*"),
999            '_' => regex_str.push('.'),
1000            c => {
1001                if regex::escape(&c.to_string()) != c.to_string() {
1002                    regex_str.push_str(&regex::escape(&c.to_string()));
1003                } else {
1004                    regex_str.push(c);
1005                }
1006            }
1007        }
1008    }
1009    regex_str.push('$');
1010
1011    let re = match Regex::new(&regex_str) {
1012        Ok(r) => r,
1013        Err(_) => return false,
1014    };
1015
1016    match actual {
1017        Value::List(items) => items.iter().any(|item| re.is_match(item)),
1018        _ => re.is_match(&actual.to_display_string()),
1019    }
1020}
1021
1022fn compare_values(actual: &Value, expected: &SqlValue) -> Option<Ordering> {
1023    let coerced = coerce_sql_to_value(expected, actual);
1024    actual.partial_cmp(&coerced)
1025}
1026
1027/// Convert a SqlValue to a Value for index lookups (without a target type for coercion).
1028fn sql_value_to_index_value(sv: &SqlValue) -> Value {
1029    match sv {
1030        SqlValue::String(s) => {
1031            // Try datetime first (more specific)
1032            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
1033                return Value::DateTime(dt);
1034            }
1035            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") {
1036                return Value::DateTime(dt);
1037            }
1038            // Try date
1039            if let Ok(d) = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d") {
1040                return Value::Date(d);
1041            }
1042            Value::String(s.clone())
1043        }
1044        SqlValue::Int(n) => Value::Int(*n),
1045        SqlValue::Float(f) => Value::Float(*f),
1046        SqlValue::Bool(b) => Value::Bool(*b),
1047        SqlValue::Null => Value::Null,
1048        SqlValue::List(_) => Value::Null,
1049    }
1050}
1051
1052/// Try to use B-tree indexes to narrow the candidate row set.
1053/// Returns Some(paths) if the entire WHERE clause could be resolved via index,
1054/// or None if a full scan is needed.
1055fn try_index_filter(
1056    clause: &WhereClause,
1057    index: &crate::index::TableIndex,
1058) -> Option<std::collections::HashSet<String>> {
1059    match clause {
1060        WhereClause::Comparison(cmp) => {
1061            if !index.has_index(&cmp.column) {
1062                return None;
1063            }
1064            match cmp.op {
1065                CmpOp::Eq => {
1066                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1067                    let paths = index.lookup_eq(&cmp.column, &val);
1068                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1069                }
1070                CmpOp::Lt => {
1071                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1072                    // exclusive upper bound: use range with max < val
1073                    // lookup_range is inclusive, so we get all <= val then remove exact matches
1074                    let range_paths = index.lookup_range(&cmp.column, None, Some(&val));
1075                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1076                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1077                }
1078                CmpOp::Gt => {
1079                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1080                    let range_paths = index.lookup_range(&cmp.column, Some(&val), None);
1081                    let eq_paths: std::collections::HashSet<&str> = index.lookup_eq(&cmp.column, &val).into_iter().collect();
1082                    Some(range_paths.into_iter().filter(|p| !eq_paths.contains(p)).map(|s| s.to_string()).collect())
1083                }
1084                CmpOp::Le => {
1085                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1086                    let paths = index.lookup_range(&cmp.column, None, Some(&val));
1087                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1088                }
1089                CmpOp::Ge => {
1090                    let val = sql_value_to_index_value(cmp.value.as_ref()?);
1091                    let paths = index.lookup_range(&cmp.column, Some(&val), None);
1092                    Some(paths.into_iter().map(|s| s.to_string()).collect())
1093                }
1094                CmpOp::In => {
1095                    if let Some(SqlValue::List(items)) = &cmp.value {
1096                        let vals: Vec<Value> = items.iter().map(sql_value_to_index_value).collect();
1097                        let paths = index.lookup_in(&cmp.column, &vals);
1098                        Some(paths.into_iter().map(|s| s.to_string()).collect())
1099                    } else {
1100                        None
1101                    }
1102                }
1103                _ => None, // LIKE, IS NULL, etc. can't use index
1104            }
1105        }
1106        WhereClause::BoolOp(bop) => {
1107            let left = try_index_filter(&bop.left, index);
1108            let right = try_index_filter(&bop.right, index);
1109            match bop.op {
1110                BoolOpKind::And => {
1111                    match (left, right) {
1112                        (Some(l), Some(r)) => Some(l.intersection(&r).cloned().collect()),
1113                        (Some(l), None) => Some(l), // narrow with left, scan-verify right
1114                        (None, Some(r)) => Some(r),
1115                        (None, None) => None,
1116                    }
1117                }
1118                BoolOpKind::Or => {
1119                    match (left, right) {
1120                        (Some(l), Some(r)) => Some(l.union(&r).cloned().collect()),
1121                        _ => None, // Can't use index if either side needs full scan
1122                    }
1123                }
1124            }
1125        }
1126    }
1127}
1128
1129/// If an ORDER BY column matches a SELECT alias, replace its expr with the
1130/// aliased expression so sorting uses the computed value.
1131fn resolve_order_aliases(specs: &[OrderSpec], columns: &ColumnList) -> Vec<OrderSpec> {
1132    let named = match columns {
1133        ColumnList::Named(exprs) => exprs,
1134        _ => return specs.to_vec(),
1135    };
1136
1137    // Build alias → expr map (skip window exprs — their values are already in rows)
1138    let alias_map: HashMap<String, &Expr> = named
1139        .iter()
1140        .filter_map(|se| match se {
1141            SelectExpr::Expr { expr, alias: Some(a) } if !expr.contains_window() => {
1142                Some((a.clone(), expr))
1143            }
1144            _ => None,
1145        })
1146        .collect();
1147
1148    specs
1149        .iter()
1150        .map(|spec| {
1151            // If the ORDER BY column name matches a SELECT alias, use that expression
1152            if let Some(expr) = alias_map.get(&spec.column) {
1153                OrderSpec {
1154                    column: spec.column.clone(),
1155                    expr: Some((*expr).clone()),
1156                    descending: spec.descending,
1157                }
1158            } else {
1159                spec.clone()
1160            }
1161        })
1162        .collect()
1163}
1164
1165fn sort_rows(rows: &mut Vec<Row>, specs: &[OrderSpec]) {
1166    rows.sort_by(|a, b| {
1167        for spec in specs {
1168            let (va, vb) = if let Some(ref expr) = spec.expr {
1169                (evaluate_expr(expr, a), evaluate_expr(expr, b))
1170            } else {
1171                (
1172                    a.get(&spec.column).cloned().unwrap_or(Value::Null),
1173                    b.get(&spec.column).cloned().unwrap_or(Value::Null),
1174                )
1175            };
1176
1177            // NULLs sort last
1178            let ordering = match (&va, &vb) {
1179                (Value::Null, Value::Null) => Ordering::Equal,
1180                (Value::Null, _) => Ordering::Greater,
1181                (_, Value::Null) => Ordering::Less,
1182                (a_val, b_val) => {
1183                    compare_model_values(a_val, b_val).unwrap_or(Ordering::Equal)
1184                }
1185            };
1186
1187            let ordering = if spec.descending {
1188                ordering.reverse()
1189            } else {
1190                ordering
1191            };
1192
1193            if ordering != Ordering::Equal {
1194                return ordering;
1195            }
1196        }
1197        Ordering::Equal
1198    });
1199}
1200
1201/// Convert a SqlValue to our model Value (for use in insert/update).
1202pub(crate) fn sql_value_to_value(sql_val: &SqlValue) -> Value {
1203    match sql_val {
1204        SqlValue::Null => Value::Null,
1205        SqlValue::String(s) => Value::String(s.clone()),
1206        SqlValue::Int(n) => Value::Int(*n),
1207        SqlValue::Float(f) => Value::Float(*f),
1208        SqlValue::Bool(b) => Value::Bool(*b),
1209        SqlValue::List(items) => {
1210            let strings: Vec<String> = items
1211                .iter()
1212                .filter_map(|v| match v {
1213                    SqlValue::String(s) => Some(s.clone()),
1214                    _ => None,
1215                })
1216                .collect();
1217            Value::List(strings)
1218        }
1219    }
1220}
1221
1222#[cfg(test)]
1223mod tests {
1224    use super::*;
1225
1226    fn make_rows() -> Vec<Row> {
1227        vec![
1228            Row::from([
1229                ("path".into(), Value::String("a.md".into())),
1230                ("title".into(), Value::String("Alpha".into())),
1231                ("count".into(), Value::Int(10)),
1232            ]),
1233            Row::from([
1234                ("path".into(), Value::String("b.md".into())),
1235                ("title".into(), Value::String("Beta".into())),
1236                ("count".into(), Value::Int(5)),
1237            ]),
1238            Row::from([
1239                ("path".into(), Value::String("c.md".into())),
1240                ("title".into(), Value::String("Gamma".into())),
1241                ("count".into(), Value::Int(20)),
1242            ]),
1243        ]
1244    }
1245
1246    #[test]
1247    fn test_select_all() {
1248        let q = SelectQuery {
1249            columns: ColumnList::All,
1250            table: "test".into(),
1251            table_alias: None,
1252            subquery: None,
1253            joins: vec![],
1254            where_clause: None,
1255            group_by: None,
1256            having: None,
1257            order_by: None,
1258            limit: None,
1259            ctes: vec![],
1260        };
1261        let (rows, _cols) = execute_inner(&q, &make_rows(), None).unwrap();
1262        assert_eq!(rows.len(), 3);
1263    }
1264
1265    #[test]
1266    fn test_select_nonexistent_column_null_filled() {
1267        // SELECT naming a column absent from the table must keep header and
1268        // rows aligned: the unknown column appears in every row as Null.
1269        let q = parse_query("SELECT title, missing_col, count FROM test").unwrap();
1270        let q = match q {
1271            Statement::Select(s) => s,
1272            _ => panic!("expected SELECT"),
1273        };
1274        let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1275        assert_eq!(cols, vec!["title", "missing_col", "count"]);
1276        assert_eq!(rows.len(), 3);
1277        for row in &rows {
1278            assert_eq!(row.len(), cols.len(), "row keys must match header length");
1279            for c in &cols {
1280                assert!(row.contains_key(c), "row missing header column {c}");
1281            }
1282            assert_eq!(row.get("missing_col"), Some(&Value::Null));
1283        }
1284    }
1285
1286    #[test]
1287    fn test_select_duplicate_output_column_errors() {
1288        // A result row is keyed by output name; two columns with the same name
1289        // cannot both be represented, so the query must be rejected.
1290        let q = parse_query("SELECT title, title FROM test").unwrap();
1291        let q = match q {
1292            Statement::Select(s) => s,
1293            _ => panic!("expected SELECT"),
1294        };
1295        let err = execute_inner(&q, &make_rows(), None);
1296        assert!(err.is_err());
1297        let msg = err.unwrap_err().to_string();
1298        assert!(msg.contains("duplicate output column"), "got: {msg}");
1299    }
1300
1301    #[test]
1302    fn test_select_all_sparse_rows_aligned() {
1303        // SELECT * unions keys across rows; a row missing an optional field
1304        // must still carry every header column (as Null) so header/rows align.
1305        let rows = vec![
1306            Row::from([
1307                ("path".into(), Value::String("a.md".into())),
1308                ("title".into(), Value::String("Alpha".into())),
1309                ("kill_reason".into(), Value::String("no edge".into())),
1310            ]),
1311            Row::from([
1312                ("path".into(), Value::String("b.md".into())),
1313                ("title".into(), Value::String("Beta".into())),
1314            ]),
1315        ];
1316        let q = parse_query("SELECT * FROM test").unwrap();
1317        let q = match q {
1318            Statement::Select(s) => s,
1319            _ => panic!("expected SELECT"),
1320        };
1321        let (result, cols) = execute_inner(&q, &rows, None).unwrap();
1322        assert!(cols.contains(&"kill_reason".to_string()));
1323        for row in &result {
1324            assert_eq!(row.len(), cols.len(), "row keys must match header length");
1325            for c in &cols {
1326                assert!(row.contains_key(c), "row missing header column {c}");
1327            }
1328        }
1329        // The row that lacked kill_reason now carries it as Null.
1330        let beta = result.iter().find(|r| r.get("path") == Some(&Value::String("b.md".into()))).unwrap();
1331        assert_eq!(beta.get("kill_reason"), Some(&Value::Null));
1332    }
1333
1334    #[test]
1335    fn test_where_gt() {
1336        let q = SelectQuery {
1337            columns: ColumnList::All,
1338            table: "test".into(),
1339            table_alias: None,
1340            subquery: None,
1341            joins: vec![],
1342            where_clause: Some(WhereClause::Comparison(Comparison {
1343                column: "count".into(),
1344                op: CmpOp::Gt,
1345                value: Some(SqlValue::Int(5)),
1346                left_expr: Some(Expr::Column("count".into())),
1347                right_expr: Some(Expr::Literal(SqlValue::Int(5))),
1348            })),
1349            group_by: None,
1350            having: None,
1351            order_by: None,
1352            limit: None,
1353            ctes: vec![],
1354        };
1355        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1356        assert_eq!(rows.len(), 2);
1357    }
1358
1359    #[test]
1360    fn test_order_by_desc() {
1361        let q = SelectQuery {
1362            columns: ColumnList::All,
1363            table: "test".into(),
1364            table_alias: None,
1365            subquery: None,
1366            joins: vec![],
1367            where_clause: None,
1368            group_by: None,
1369            having: None,
1370            order_by: Some(vec![OrderSpec {
1371                column: "count".into(),
1372                expr: Some(Expr::Column("count".into())),
1373                descending: true,
1374            }]),
1375            limit: None,
1376            ctes: vec![],
1377        };
1378        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1379        assert_eq!(rows[0]["count"], Value::Int(20));
1380        assert_eq!(rows[2]["count"], Value::Int(5));
1381    }
1382
1383    #[test]
1384    fn test_limit() {
1385        let q = SelectQuery {
1386            columns: ColumnList::All,
1387            table: "test".into(),
1388            table_alias: None,
1389            subquery: None,
1390            joins: vec![],
1391            where_clause: None,
1392            group_by: None,
1393            having: None,
1394            order_by: None,
1395            limit: Some(2),
1396            ctes: vec![],
1397        };
1398        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1399        assert_eq!(rows.len(), 2);
1400    }
1401
1402    #[test]
1403    fn test_like() {
1404        let q = SelectQuery {
1405            columns: ColumnList::All,
1406            table: "test".into(),
1407            table_alias: None,
1408            subquery: None,
1409            joins: vec![],
1410            where_clause: Some(WhereClause::Comparison(Comparison {
1411                column: "title".into(),
1412                op: CmpOp::Like,
1413                value: Some(SqlValue::String("%lph%".into())),
1414                left_expr: Some(Expr::Column("title".into())),
1415                right_expr: None,
1416            })),
1417            group_by: None,
1418            having: None,
1419            order_by: None,
1420            limit: None,
1421            ctes: vec![],
1422        };
1423        let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1424        assert_eq!(rows.len(), 1);
1425        assert_eq!(rows[0]["title"], Value::String("Alpha".into()));
1426    }
1427
1428    #[test]
1429    fn test_is_null() {
1430        let mut rows = make_rows();
1431        rows[1].insert("optional".into(), Value::Null);
1432
1433        let q = SelectQuery {
1434            columns: ColumnList::All,
1435            table: "test".into(),
1436            table_alias: None,
1437            subquery: None,
1438            joins: vec![],
1439            where_clause: Some(WhereClause::Comparison(Comparison {
1440                column: "optional".into(),
1441                op: CmpOp::IsNull,
1442                value: None,
1443                left_expr: Some(Expr::Column("optional".into())),
1444                right_expr: None,
1445            })),
1446            group_by: None,
1447            having: None,
1448            order_by: None,
1449            limit: None,
1450            ctes: vec![],
1451        };
1452        let (result, _) = execute_inner(&q, &rows, None).unwrap();
1453        // All rows where optional is NULL or missing
1454        assert_eq!(result.len(), 3);
1455    }
1456
1457    #[test]
1458    fn test_where_boolean_literal_filters_rows() {
1459        // #60 end-to-end: `flag = true` must match Bool rows, not evaluate
1460        // against a nonexistent column named "true" and return nothing.
1461        let mut rows = make_rows();
1462        rows[0].insert("flag".into(), Value::Bool(true));
1463        rows[1].insert("flag".into(), Value::Bool(false));
1464        rows[2].insert("flag".into(), Value::Bool(true));
1465
1466        for (sql, expected) in [
1467            ("SELECT path FROM test WHERE flag = true", 2),
1468            ("SELECT path FROM test WHERE flag = FALSE", 1),
1469            ("SELECT path FROM test WHERE flag != true", 1),
1470        ] {
1471            let q = match parse_query(sql).unwrap() {
1472                Statement::Select(s) => s,
1473                _ => panic!("expected SELECT"),
1474            };
1475            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1476            assert_eq!(result.len(), expected, "query: {sql}");
1477        }
1478    }
1479
    // ── Expression evaluation tests ───────────────────────────────
1481
1482    #[test]
1483    fn test_evaluate_expr_literal() {
1484        let row = Row::new();
1485        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Int(42)), &row), Value::Int(42));
1486        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Float(3.14)), &row), Value::Float(3.14));
1487        assert_eq!(evaluate_expr(&Expr::Literal(SqlValue::Null), &row), Value::Null);
1488    }
1489
1490    #[test]
1491    fn test_evaluate_expr_column() {
1492        let row = Row::from([("x".into(), Value::Int(10))]);
1493        assert_eq!(evaluate_expr(&Expr::Column("x".into()), &row), Value::Int(10));
1494        assert_eq!(evaluate_expr(&Expr::Column("missing".into()), &row), Value::Null);
1495    }
1496
1497    #[test]
1498    fn test_evaluate_expr_int_arithmetic() {
1499        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(3))]);
1500        let add = Expr::BinaryOp {
1501            left: Box::new(Expr::Column("a".into())),
1502            op: ArithOp::Add,
1503            right: Box::new(Expr::Column("b".into())),
1504        };
1505        assert_eq!(evaluate_expr(&add, &row), Value::Int(13));
1506
1507        let sub = Expr::BinaryOp {
1508            left: Box::new(Expr::Column("a".into())),
1509            op: ArithOp::Sub,
1510            right: Box::new(Expr::Column("b".into())),
1511        };
1512        assert_eq!(evaluate_expr(&sub, &row), Value::Int(7));
1513
1514        let mul = Expr::BinaryOp {
1515            left: Box::new(Expr::Column("a".into())),
1516            op: ArithOp::Mul,
1517            right: Box::new(Expr::Column("b".into())),
1518        };
1519        assert_eq!(evaluate_expr(&mul, &row), Value::Int(30));
1520
1521        let div = Expr::BinaryOp {
1522            left: Box::new(Expr::Column("a".into())),
1523            op: ArithOp::Div,
1524            right: Box::new(Expr::Column("b".into())),
1525        };
1526        assert_eq!(evaluate_expr(&div, &row), Value::Int(3)); // integer division
1527
1528        let modulo = Expr::BinaryOp {
1529            left: Box::new(Expr::Column("a".into())),
1530            op: ArithOp::Mod,
1531            right: Box::new(Expr::Column("b".into())),
1532        };
1533        assert_eq!(evaluate_expr(&modulo, &row), Value::Int(1));
1534    }
1535
1536    #[test]
1537    fn test_evaluate_expr_float_coercion() {
1538        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Float(3.0))]);
1539        let add = Expr::BinaryOp {
1540            left: Box::new(Expr::Column("a".into())),
1541            op: ArithOp::Add,
1542            right: Box::new(Expr::Column("b".into())),
1543        };
1544        assert_eq!(evaluate_expr(&add, &row), Value::Float(13.0));
1545    }
1546
1547    #[test]
1548    fn test_evaluate_expr_null_propagation() {
1549        let row = Row::from([("a".into(), Value::Int(10))]);
1550        let add = Expr::BinaryOp {
1551            left: Box::new(Expr::Column("a".into())),
1552            op: ArithOp::Add,
1553            right: Box::new(Expr::Column("missing".into())),
1554        };
1555        assert_eq!(evaluate_expr(&add, &row), Value::Null);
1556    }
1557
1558    #[test]
1559    fn test_evaluate_expr_div_by_zero() {
1560        let row = Row::from([("a".into(), Value::Int(10)), ("b".into(), Value::Int(0))]);
1561        let div = Expr::BinaryOp {
1562            left: Box::new(Expr::Column("a".into())),
1563            op: ArithOp::Div,
1564            right: Box::new(Expr::Column("b".into())),
1565        };
1566        assert_eq!(evaluate_expr(&div, &row), Value::Null);
1567    }
1568
1569    #[test]
1570    fn test_evaluate_expr_unary_minus() {
1571        let row = Row::from([("x".into(), Value::Int(5))]);
1572        let neg = Expr::UnaryMinus(Box::new(Expr::Column("x".into())));
1573        assert_eq!(evaluate_expr(&neg, &row), Value::Int(-5));
1574    }
1575
1576    #[test]
1577    fn test_select_with_expression() {
1578        // Integration test: SELECT count * 2 AS doubled FROM test
1579        let stmt = crate::query_parser::parse_query(
1580            "SELECT count * 2 AS doubled FROM test"
1581        ).unwrap();
1582        if let crate::query_parser::Statement::Select(q) = stmt {
1583            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1584            assert_eq!(cols, vec!["doubled"]);
1585            assert_eq!(rows.len(), 3);
1586            // Rows are: count=10, count=5, count=20
1587            let values: Vec<Value> = rows.iter().map(|r| r["doubled"].clone()).collect();
1588            assert!(values.contains(&Value::Int(20)));
1589            assert!(values.contains(&Value::Int(10)));
1590            assert!(values.contains(&Value::Int(40)));
1591        } else {
1592            panic!("Expected Select");
1593        }
1594    }
1595
1596    #[test]
1597    fn test_where_with_expression() {
1598        // SELECT * FROM test WHERE count * 2 > 15
1599        let stmt = crate::query_parser::parse_query(
1600            "SELECT * FROM test WHERE count * 2 > 15"
1601        ).unwrap();
1602        if let crate::query_parser::Statement::Select(q) = stmt {
1603            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1604            // count=10 → 20 > 15 ✓, count=5 → 10 > 15 ✗, count=20 → 40 > 15 ✓
1605            assert_eq!(rows.len(), 2);
1606        } else {
1607            panic!("Expected Select");
1608        }
1609    }
1610
1611    #[test]
1612    fn test_order_by_expression() {
1613        // SELECT * FROM test ORDER BY count * -1 ASC (effectively DESC by count)
1614        let stmt = crate::query_parser::parse_query(
1615            "SELECT title, count FROM test ORDER BY count * -1 ASC"
1616        ).unwrap();
1617        if let crate::query_parser::Statement::Select(q) = stmt {
1618            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1619            // count: 20 → -20, 10 → -10, 5 → -5, ASC means -20, -10, -5
1620            assert_eq!(rows[0]["count"], Value::Int(20));
1621            assert_eq!(rows[1]["count"], Value::Int(10));
1622            assert_eq!(rows[2]["count"], Value::Int(5));
1623        } else {
1624            panic!("Expected Select");
1625        }
1626    }
1627
1628    // ── CASE WHEN evaluation tests ────────────────────────────────
1629
1630    #[test]
1631    fn test_case_when_eval_basic() {
1632        let row = Row::from([("status".into(), Value::String("ACTIVE".into()))]);
1633        let expr = Expr::Case {
1634            whens: vec![(
1635                WhereClause::Comparison(Comparison {
1636                    column: "status".into(),
1637                    op: CmpOp::Eq,
1638                    value: Some(SqlValue::String("ACTIVE".into())),
1639                    left_expr: Some(Expr::Column("status".into())),
1640                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1641                }),
1642                Box::new(Expr::Literal(SqlValue::Int(1))),
1643            )],
1644            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1645        };
1646        assert_eq!(evaluate_expr(&expr, &row), Value::Int(1));
1647    }
1648
1649    #[test]
1650    fn test_case_when_eval_else() {
1651        let row = Row::from([("status".into(), Value::String("KILLED".into()))]);
1652        let expr = Expr::Case {
1653            whens: vec![(
1654                WhereClause::Comparison(Comparison {
1655                    column: "status".into(),
1656                    op: CmpOp::Eq,
1657                    value: Some(SqlValue::String("ACTIVE".into())),
1658                    left_expr: Some(Expr::Column("status".into())),
1659                    right_expr: Some(Expr::Literal(SqlValue::String("ACTIVE".into()))),
1660                }),
1661                Box::new(Expr::Literal(SqlValue::Int(1))),
1662            )],
1663            else_expr: Some(Box::new(Expr::Literal(SqlValue::Int(0)))),
1664        };
1665        assert_eq!(evaluate_expr(&expr, &row), Value::Int(0));
1666    }
1667
1668    #[test]
1669    fn test_case_when_eval_no_else_null() {
1670        let row = Row::from([("x".into(), Value::Int(99))]);
1671        let expr = Expr::Case {
1672            whens: vec![(
1673                WhereClause::Comparison(Comparison {
1674                    column: "x".into(),
1675                    op: CmpOp::Eq,
1676                    value: Some(SqlValue::Int(1)),
1677                    left_expr: Some(Expr::Column("x".into())),
1678                    right_expr: Some(Expr::Literal(SqlValue::Int(1))),
1679                }),
1680                Box::new(Expr::Literal(SqlValue::String("one".into()))),
1681            )],
1682            else_expr: None,
1683        };
1684        assert_eq!(evaluate_expr(&expr, &row), Value::Null);
1685    }
1686
1687    #[test]
1688    fn test_case_when_in_aggregate_query() {
1689        // SUM(CASE WHEN count > 5 THEN count ELSE 0 END)
1690        // Rows: count=10, count=5, count=20 → should sum 10 + 0 + 20 = 30
1691        let stmt = crate::query_parser::parse_query(
1692            "SELECT SUM(CASE WHEN count > 5 THEN count ELSE 0 END) AS total FROM test"
1693        ).unwrap();
1694        if let crate::query_parser::Statement::Select(q) = stmt {
1695            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1696            assert_eq!(cols, vec!["total"]);
1697            assert_eq!(rows.len(), 1);
1698            assert_eq!(rows[0]["total"], Value::Float(30.0));
1699        } else {
1700            panic!("Expected Select");
1701        }
1702    }
1703
1704    #[test]
1705    fn test_case_when_with_unary_minus_in_aggregate() {
1706        // SUM(CASE WHEN title = 'Alpha' THEN count ELSE -count END)
1707        // Alpha: 10, Beta: -5, Gamma: -20 → 10 - 5 - 20 = -15
1708        let stmt = crate::query_parser::parse_query(
1709            "SELECT SUM(CASE WHEN title = 'Alpha' THEN count ELSE -count END) AS net FROM test"
1710        ).unwrap();
1711        if let crate::query_parser::Statement::Select(q) = stmt {
1712            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1713            assert_eq!(rows.len(), 1);
1714            assert_eq!(rows[0]["net"], Value::Float(-15.0));
1715        } else {
1716            panic!("Expected Select");
1717        }
1718    }
1719
1720    #[test]
1721    fn test_dateadd_with_dict_in_group_by() {
1722        // Simulate a joined row with a dict field, then GROUP BY + DateAdd expr
1723        use indexmap::IndexMap;
1724        let mut params = IndexMap::new();
1725        params.insert("exit_days".to_string(), Value::Int(21));
1726
1727        let rows = vec![
1728            Row::from([
1729                ("o.token".into(), Value::String("BTC".into())),
1730                ("o.event_date".into(), Value::Date(
1731                    chrono::NaiveDate::from_ymd_opt(2026, 1, 1).unwrap()
1732                )),
1733                ("o.size".into(), Value::Int(100)),
1734                ("s.params".into(), Value::Dict(params.clone())),
1735            ]),
1736            Row::from([
1737                ("o.token".into(), Value::String("BTC".into())),
1738                ("o.event_date".into(), Value::Date(
1739                    chrono::NaiveDate::from_ymd_opt(2026, 1, 1).unwrap()
1740                )),
1741                ("o.size".into(), Value::Int(50)),
1742                ("s.params".into(), Value::Dict(params.clone())),
1743            ]),
1744        ];
1745
1746        let q = SelectQuery {
1747            columns: ColumnList::Named(vec![
1748                SelectExpr::Column("o.token".into()),
1749                SelectExpr::Column("o.event_date".into()),
1750                SelectExpr::Expr {
1751                    expr: Expr::DateAdd {
1752                        date: Box::new(Expr::Column("o.event_date".into())),
1753                        days: Box::new(Expr::Column("s.params.exit_days".into())),
1754                    },
1755                    alias: Some("exit_date".into()),
1756                },
1757                SelectExpr::Aggregate {
1758                    func: AggFunc::Sum,
1759                    arg: "o.size".into(),
1760                    arg_expr: Some(Expr::Column("o.size".into())),
1761                    alias: Some("total".into()),
1762                },
1763            ]),
1764            table: "orders".into(),
1765            table_alias: None,
1766            subquery: None,
1767            joins: vec![],
1768            where_clause: None,
1769            group_by: Some(vec!["o.token".into(), "o.event_date".into()]),
1770            having: None,
1771            order_by: None,
1772            limit: None,
1773            ctes: vec![],
1774        };
1775
1776        let (rows, cols) = execute_inner(&q, &rows, None).unwrap();
1777        assert_eq!(rows.len(), 1);
1778        assert!(cols.contains(&"exit_date".to_string()));
1779        assert_eq!(rows[0]["total"], Value::Float(150.0));
1780        // The key test: exit_date should be 2026-01-22, not Null
1781        assert_eq!(
1782            rows[0]["exit_date"],
1783            Value::Date(chrono::NaiveDate::from_ymd_opt(2026, 1, 22).unwrap())
1784        );
1785    }
1786
1787    #[test]
1788    fn test_aggregate_arithmetic() {
1789        // SUM(count) for all rows = 10 + 5 + 20 = 35
1790        // COUNT(*) = 3
1791        // SUM produces Float, COUNT produces Int → mixed → Float division
1792        let stmt = crate::query_parser::parse_query(
1793            "SELECT SUM(count) / COUNT(*) AS avg_count FROM test"
1794        ).unwrap();
1795        if let crate::query_parser::Statement::Select(q) = stmt {
1796            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1797            assert_eq!(cols, vec!["avg_count"]);
1798            assert_eq!(rows.len(), 1);
1799            match &rows[0]["avg_count"] {
1800                Value::Float(f) => assert!((f - 11.666666666666666).abs() < 0.001),
1801                other => panic!("Expected Float, got {:?}", other),
1802            }
1803        } else {
1804            panic!("Expected Select");
1805        }
1806    }
1807
1808    #[test]
1809    fn test_aggregate_subtraction_with_group_by() {
1810        let rows = vec![
1811            {
1812                let mut r = Row::new();
1813                r.insert("token".into(), Value::String("BTC".into()));
1814                r.insert("side".into(), Value::String("BUY".into()));
1815                r.insert("size".into(), Value::Float(100.0));
1816                r
1817            },
1818            {
1819                let mut r = Row::new();
1820                r.insert("token".into(), Value::String("BTC".into()));
1821                r.insert("side".into(), Value::String("SELL".into()));
1822                r.insert("size".into(), Value::Float(60.0));
1823                r
1824            },
1825        ];
1826        let stmt = crate::query_parser::parse_query(
1827            "SELECT token, SUM(CASE WHEN side = 'BUY' THEN size ELSE 0 END) - SUM(CASE WHEN side = 'SELL' THEN size ELSE 0 END) AS net FROM test GROUP BY token"
1828        ).unwrap();
1829        if let crate::query_parser::Statement::Select(q) = stmt {
1830            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1831            assert_eq!(result.len(), 1);
1832            assert_eq!(result[0]["net"], Value::Float(40.0));
1833        } else {
1834            panic!("Expected Select");
1835        }
1836    }
1837
1838    // ── Issue #42: Aggregate subtraction without GROUP BY ──
1839
1840    #[test]
1841    fn test_aggregate_subtraction_no_group() {
1842        // SUM(count) = 10 + 5 + 20 = 35, COUNT(*) = 3, diff = 35 - 3 = 32
1843        let stmt = crate::query_parser::parse_query(
1844            "SELECT SUM(count) - COUNT(*) as diff FROM test"
1845        ).unwrap();
1846        if let crate::query_parser::Statement::Select(q) = stmt {
1847            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1848            assert_eq!(cols, vec!["diff"]);
1849            assert_eq!(rows.len(), 1);
1850            assert_eq!(rows[0]["diff"], Value::Float(32.0));
1851        } else {
1852            panic!("Expected Select");
1853        }
1854    }
1855
1856    // ── Issue #42: Aggregate division with GROUP BY ──
1857
1858    #[test]
1859    fn test_aggregate_division_with_group_by() {
1860        let rows = vec![
1861            {
1862                let mut r = Row::new();
1863                r.insert("category".into(), Value::String("A".into()));
1864                r.insert("count".into(), Value::Int(10));
1865                r
1866            },
1867            {
1868                let mut r = Row::new();
1869                r.insert("category".into(), Value::String("A".into()));
1870                r.insert("count".into(), Value::Int(20));
1871                r
1872            },
1873            {
1874                let mut r = Row::new();
1875                r.insert("category".into(), Value::String("B".into()));
1876                r.insert("count".into(), Value::Int(6));
1877                r
1878            },
1879        ];
1880        // Group A: SUM(count)=30, COUNT(*)=2, ratio=15.0
1881        // Group B: SUM(count)=6, COUNT(*)=1, ratio=6.0
1882        let stmt = crate::query_parser::parse_query(
1883            "SELECT category, SUM(count) / COUNT(*) as ratio FROM test GROUP BY category"
1884        ).unwrap();
1885        if let crate::query_parser::Statement::Select(q) = stmt {
1886            let (result, cols) = execute_inner(&q, &rows, None).unwrap();
1887            assert!(cols.contains(&"ratio".to_string()));
1888            assert_eq!(result.len(), 2);
1889            // Find group A and B by category value
1890            let group_a = result.iter().find(|r| r["category"] == Value::String("A".into())).unwrap();
1891            let group_b = result.iter().find(|r| r["category"] == Value::String("B".into())).unwrap();
1892            match &group_a["ratio"] {
1893                Value::Float(f) => assert!((f - 15.0).abs() < 0.001),
1894                other => panic!("Expected Float for group A ratio, got {:?}", other),
1895            }
1896            match &group_b["ratio"] {
1897                Value::Float(f) => assert!((f - 6.0).abs() < 0.001),
1898                other => panic!("Expected Float for group B ratio, got {:?}", other),
1899            }
1900        } else {
1901            panic!("Expected Select");
1902        }
1903    }
1904
1905    // ── Window function tests ────────────────────────────────────
1906
1907    #[test]
1908    fn test_window_row_number() {
1909        let stmt = crate::query_parser::parse_query(
1910            "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test"
1911        ).unwrap();
1912        if let crate::query_parser::Statement::Select(q) = stmt {
1913            let (rows, cols) = execute_inner(&q, &make_rows(), None).unwrap();
1914            assert_eq!(cols, vec!["title", "rn"]);
1915            assert_eq!(rows.len(), 3);
1916            let by_title: HashMap<String, i64> = rows.iter()
1917                .map(|r| (r["title"].to_display_string(), match &r["rn"] { Value::Int(n) => *n, _ => panic!("Expected Int") }))
1918                .collect();
1919            assert_eq!(by_title["Gamma"], 1); // count=20
1920            assert_eq!(by_title["Alpha"], 2); // count=10
1921            assert_eq!(by_title["Beta"], 3);  // count=5
1922        } else {
1923            panic!("Expected Select");
1924        }
1925    }
1926
1927    #[test]
1928    fn test_window_rank_with_ties() {
1929        let mut rows = make_rows();
1930        rows[0].insert("count".into(), Value::Int(10));
1931        rows[1].insert("count".into(), Value::Int(10));
1932        rows[2].insert("count".into(), Value::Int(5));
1933
1934        let stmt = crate::query_parser::parse_query(
1935            "SELECT title, RANK() OVER (ORDER BY count DESC) AS rnk FROM test"
1936        ).unwrap();
1937        if let crate::query_parser::Statement::Select(q) = stmt {
1938            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1939            let ranks: Vec<i64> = result.iter()
1940                .map(|r| match &r["rnk"] { Value::Int(n) => *n, _ => panic!("Expected Int") })
1941                .collect();
1942            assert!(ranks.contains(&1)); // two tied at rank 1
1943            assert!(ranks.iter().filter(|&&r| r == 1).count() == 2);
1944            assert!(ranks.contains(&3)); // rank 3 (gap after tie)
1945        } else {
1946            panic!("Expected Select");
1947        }
1948    }
1949
1950    #[test]
1951    fn test_window_dense_rank() {
1952        let mut rows = make_rows();
1953        rows[0].insert("count".into(), Value::Int(10));
1954        rows[1].insert("count".into(), Value::Int(10));
1955        rows[2].insert("count".into(), Value::Int(5));
1956
1957        let stmt = crate::query_parser::parse_query(
1958            "SELECT title, DENSE_RANK() OVER (ORDER BY count DESC) AS dr FROM test"
1959        ).unwrap();
1960        if let crate::query_parser::Statement::Select(q) = stmt {
1961            let (result, _) = execute_inner(&q, &rows, None).unwrap();
1962            let ranks: Vec<i64> = result.iter()
1963                .map(|r| match &r["dr"] { Value::Int(n) => *n, _ => panic!("Expected Int") })
1964                .collect();
1965            assert!(ranks.iter().filter(|&&r| r == 1).count() == 2);
1966            assert!(ranks.contains(&2)); // dense rank: no gap
1967            assert!(!ranks.contains(&3));
1968        } else {
1969            panic!("Expected Select");
1970        }
1971    }
1972
1973    #[test]
1974    fn test_window_lag() {
1975        let stmt = crate::query_parser::parse_query(
1976            "SELECT title, LAG(count, 1) OVER (ORDER BY count ASC) AS prev FROM test"
1977        ).unwrap();
1978        if let crate::query_parser::Statement::Select(q) = stmt {
1979            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
1980            // Sorted ASC: Beta(5), Alpha(10), Gamma(20)
1981            // LAG: NULL, 5, 10
1982            let first = rows.iter().find(|r| r["title"] == Value::String("Beta".into())).unwrap();
1983            assert_eq!(first["prev"], Value::Null);
1984            let second = rows.iter().find(|r| r["title"] == Value::String("Alpha".into())).unwrap();
1985            assert_eq!(second["prev"], Value::Int(5));
1986            let third = rows.iter().find(|r| r["title"] == Value::String("Gamma".into())).unwrap();
1987            assert_eq!(third["prev"], Value::Int(10));
1988        } else {
1989            panic!("Expected Select");
1990        }
1991    }
1992
1993    #[test]
1994    fn test_window_lead() {
1995        let stmt = crate::query_parser::parse_query(
1996            "SELECT title, LEAD(count, 1) OVER (ORDER BY count ASC) AS next FROM test"
1997        ).unwrap();
1998        if let crate::query_parser::Statement::Select(q) = stmt {
1999            let (rows, _) = execute_inner(&q, &make_rows(), None).unwrap();
2000            let first = rows.iter().find(|r| r["title"] == Value::String("Beta".into())).unwrap();
2001            assert_eq!(first["next"], Value::Int(10));
2002            let last = rows.iter().find(|r| r["title"] == Value::String("Gamma".into())).unwrap();
2003            assert_eq!(last["next"], Value::Null);
2004        } else {
2005            panic!("Expected Select");
2006        }
2007    }
2008
2009    #[test]
2010    fn test_window_sum_partition() {
2011        let rows = vec![
2012            Row::from([
2013                ("cat".into(), Value::String("A".into())),
2014                ("val".into(), Value::Int(10)),
2015            ]),
2016            Row::from([
2017                ("cat".into(), Value::String("A".into())),
2018                ("val".into(), Value::Int(20)),
2019            ]),
2020            Row::from([
2021                ("cat".into(), Value::String("B".into())),
2022                ("val".into(), Value::Int(5)),
2023            ]),
2024        ];
2025        let stmt = crate::query_parser::parse_query(
2026            "SELECT cat, val, SUM(val) OVER (PARTITION BY cat) AS cat_total FROM test"
2027        ).unwrap();
2028        if let crate::query_parser::Statement::Select(q) = stmt {
2029            let (result, cols) = execute_inner(&q, &rows, None).unwrap();
2030            assert_eq!(cols, vec!["cat", "val", "cat_total"]);
2031            assert_eq!(result.len(), 3);
2032            let a_rows: Vec<_> = result.iter().filter(|r| r["cat"] == Value::String("A".into())).collect();
2033            assert_eq!(a_rows.len(), 2);
2034            for r in &a_rows {
2035                assert_eq!(r["cat_total"], Value::Float(30.0));
2036            }
2037            let b_row = result.iter().find(|r| r["cat"] == Value::String("B".into())).unwrap();
2038            assert_eq!(b_row["cat_total"], Value::Float(5.0));
2039        } else {
2040            panic!("Expected Select");
2041        }
2042    }
2043
2044    #[test]
2045    fn test_window_with_where_order_limit() {
2046        let stmt = crate::query_parser::parse_query(
2047            "SELECT title, ROW_NUMBER() OVER (ORDER BY count DESC) AS rn FROM test WHERE count > 4 ORDER BY rn LIMIT 2"
2048        ).unwrap();
2049        if let crate::query_parser::Statement::Select(q) = stmt {
2050            let (result, _) = execute_inner(&q, &make_rows(), None).unwrap();
2051            assert_eq!(result.len(), 2);
2052            assert_eq!(result[0]["rn"], Value::Int(1));
2053            assert_eq!(result[1]["rn"], Value::Int(2));
2054        } else {
2055            panic!("Expected Select");
2056        }
2057    }
2058}