Skip to main content

citadeldb_sql/
eval.rs

1//! Expression evaluator with SQL three-valued logic.
2
3use crate::error::{Result, SqlError};
4use crate::parser::{BinOp, Expr, UnaryOp};
5use crate::types::{ColumnDef, DataType, Value};
6
7/// Evaluate an expression against a row.
8///
9/// `columns` maps column names to their positions.
10/// `row` is the full row of values (all columns).
11pub fn eval_expr(expr: &Expr, columns: &[ColumnDef], row: &[Value]) -> Result<Value> {
12    match expr {
13        Expr::Literal(v) => Ok(v.clone()),
14
15        Expr::Column(name) => {
16            let lower = name.to_ascii_lowercase();
17            let matches: Vec<usize> = columns.iter().enumerate()
18                .filter(|(_, c)| {
19                    let cn = c.name.to_ascii_lowercase();
20                    cn == lower || cn.ends_with(&format!(".{lower}"))
21                })
22                .map(|(i, _)| i)
23                .collect();
24            match matches.len() {
25                0 => Err(SqlError::ColumnNotFound(name.clone())),
26                1 => Ok(row[matches[0]].clone()),
27                _ => Err(SqlError::AmbiguousColumn(name.clone())),
28            }
29        }
30
31        Expr::QualifiedColumn { table, column } => {
32            let qualified = format!("{}.{}", table.to_ascii_lowercase(), column.to_ascii_lowercase());
33            let idx = columns.iter()
34                .position(|c| c.name.to_ascii_lowercase() == qualified)
35                .or_else(|| {
36                    let lower_col = column.to_ascii_lowercase();
37                    let matches: Vec<usize> = columns.iter().enumerate()
38                        .filter(|(_, c)| c.name.to_ascii_lowercase() == lower_col)
39                        .map(|(i, _)| i)
40                        .collect();
41                    if matches.len() == 1 { Some(matches[0]) } else { None }
42                })
43                .ok_or_else(|| SqlError::ColumnNotFound(format!("{table}.{column}")))?;
44            Ok(row[idx].clone())
45        }
46
47        Expr::BinaryOp { left, op, right } => {
48            let lval = eval_expr(left, columns, row)?;
49            let rval = eval_expr(right, columns, row)?;
50            eval_binary_op(&lval, *op, &rval)
51        }
52
53        Expr::UnaryOp { op, expr } => {
54            let val = eval_expr(expr, columns, row)?;
55            eval_unary_op(*op, &val)
56        }
57
58        Expr::IsNull(e) => {
59            let val = eval_expr(e, columns, row)?;
60            Ok(Value::Boolean(val.is_null()))
61        }
62
63        Expr::IsNotNull(e) => {
64            let val = eval_expr(e, columns, row)?;
65            Ok(Value::Boolean(!val.is_null()))
66        }
67
68        Expr::Function { name, args } => {
69            eval_scalar_function(name, args, columns, row)
70        }
71
72        Expr::CountStar => {
73            Err(SqlError::Unsupported("COUNT(*) in non-aggregate context".into()))
74        }
75
76        Expr::InList { expr: e, list, negated } => {
77            let lhs = eval_expr(e, columns, row)?;
78            eval_in_values(&lhs, list, columns, row, *negated)
79        }
80
81        Expr::InSet { expr: e, values, has_null, negated } => {
82            let lhs = eval_expr(e, columns, row)?;
83            eval_in_set(&lhs, values, *has_null, *negated)
84        }
85
86        Expr::Between { expr: e, low, high, negated } => {
87            let val = eval_expr(e, columns, row)?;
88            let lo = eval_expr(low, columns, row)?;
89            let hi = eval_expr(high, columns, row)?;
90            eval_between(&val, &lo, &hi, *negated)
91        }
92
93        Expr::Like { expr: e, pattern, escape, negated } => {
94            let val = eval_expr(e, columns, row)?;
95            let pat = eval_expr(pattern, columns, row)?;
96            let esc = escape.as_ref()
97                .map(|e| eval_expr(e, columns, row))
98                .transpose()?;
99            eval_like(&val, &pat, esc.as_ref(), *negated)
100        }
101
102        Expr::Case { operand, conditions, else_result } => {
103            eval_case(operand.as_deref(), conditions, else_result.as_deref(), columns, row)
104        }
105
106        Expr::Coalesce(args) => {
107            for arg in args {
108                let val = eval_expr(arg, columns, row)?;
109                if !val.is_null() {
110                    return Ok(val);
111                }
112            }
113            Ok(Value::Null)
114        }
115
116        Expr::Cast { expr: e, data_type } => {
117            let val = eval_expr(e, columns, row)?;
118            eval_cast(&val, *data_type)
119        }
120
121        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => {
122            Err(SqlError::Unsupported(
123                "subquery not materialized (internal error)".into(),
124            ))
125        }
126
127        Expr::Parameter(n) => {
128            Err(SqlError::Parse(format!("unbound parameter ${n}")))
129        }
130    }
131}
132
133fn eval_binary_op(left: &Value, op: BinOp, right: &Value) -> Result<Value> {
134    // SQL three-valued logic for AND/OR
135    match op {
136        BinOp::And => return eval_and(left, right),
137        BinOp::Or => return eval_or(left, right),
138        _ => {}
139    }
140
141    // NULL propagation for all other ops (including || per SQL standard)
142    if left.is_null() || right.is_null() {
143        return Ok(Value::Null);
144    }
145
146    match op {
147        BinOp::Eq => Ok(Value::Boolean(left == right)),
148        BinOp::NotEq => Ok(Value::Boolean(left != right)),
149        BinOp::Lt => Ok(Value::Boolean(left < right)),
150        BinOp::Gt => Ok(Value::Boolean(left > right)),
151        BinOp::LtEq => Ok(Value::Boolean(left <= right)),
152        BinOp::GtEq => Ok(Value::Boolean(left >= right)),
153        BinOp::Add => eval_arithmetic(left, right, i64::checked_add, |a, b| a + b),
154        BinOp::Sub => eval_arithmetic(left, right, i64::checked_sub, |a, b| a - b),
155        BinOp::Mul => eval_arithmetic(left, right, i64::checked_mul, |a, b| a * b),
156        BinOp::Div => {
157            match right {
158                Value::Integer(0) => return Err(SqlError::DivisionByZero),
159                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
160                _ => {}
161            }
162            eval_arithmetic(left, right, i64::checked_div, |a, b| a / b)
163        }
164        BinOp::Mod => {
165            match right {
166                Value::Integer(0) => return Err(SqlError::DivisionByZero),
167                Value::Real(r) if *r == 0.0 => return Err(SqlError::DivisionByZero),
168                _ => {}
169            }
170            eval_arithmetic(left, right, i64::checked_rem, |a, b| a % b)
171        }
172        BinOp::Concat => {
173            let ls = value_to_text(left);
174            let rs = value_to_text(right);
175            Ok(Value::Text(format!("{ls}{rs}")))
176        }
177        BinOp::And | BinOp::Or => unreachable!(),
178    }
179}
180
181/// SQL three-valued AND: NULL AND false = false, NULL AND true = NULL
182fn eval_and(left: &Value, right: &Value) -> Result<Value> {
183    let l = to_bool_or_null(left)?;
184    let r = to_bool_or_null(right)?;
185    match (l, r) {
186        (Some(false), _) | (_, Some(false)) => Ok(Value::Boolean(false)),
187        (Some(true), Some(true)) => Ok(Value::Boolean(true)),
188        _ => Ok(Value::Null),
189    }
190}
191
192/// SQL three-valued OR: NULL OR true = true, NULL OR false = NULL
193fn eval_or(left: &Value, right: &Value) -> Result<Value> {
194    let l = to_bool_or_null(left)?;
195    let r = to_bool_or_null(right)?;
196    match (l, r) {
197        (Some(true), _) | (_, Some(true)) => Ok(Value::Boolean(true)),
198        (Some(false), Some(false)) => Ok(Value::Boolean(false)),
199        _ => Ok(Value::Null),
200    }
201}
202
203fn to_bool_or_null(val: &Value) -> Result<Option<bool>> {
204    match val {
205        Value::Boolean(b) => Ok(Some(*b)),
206        Value::Null => Ok(None),
207        Value::Integer(i) => Ok(Some(*i != 0)),
208        _ => Err(SqlError::TypeMismatch {
209            expected: "BOOLEAN".into(),
210            got: format!("{}", val.data_type()),
211        }),
212    }
213}
214
215fn eval_arithmetic(
216    left: &Value,
217    right: &Value,
218    int_op: fn(i64, i64) -> Option<i64>,
219    real_op: fn(f64, f64) -> f64,
220) -> Result<Value> {
221    match (left, right) {
222        (Value::Integer(a), Value::Integer(b)) => {
223            int_op(*a, *b).map(Value::Integer).ok_or(SqlError::IntegerOverflow)
224        }
225        (Value::Real(a), Value::Real(b)) => Ok(Value::Real(real_op(*a, *b))),
226        (Value::Integer(a), Value::Real(b)) => Ok(Value::Real(real_op(*a as f64, *b))),
227        (Value::Real(a), Value::Integer(b)) => Ok(Value::Real(real_op(*a, *b as f64))),
228        _ => Err(SqlError::TypeMismatch {
229            expected: "numeric".into(),
230            got: format!("{} and {}", left.data_type(), right.data_type()),
231        }),
232    }
233}
234
235fn eval_in_values(
236    lhs: &Value,
237    list: &[Expr],
238    columns: &[ColumnDef],
239    row: &[Value],
240    negated: bool,
241) -> Result<Value> {
242    if list.is_empty() {
243        return Ok(Value::Boolean(negated));
244    }
245    if lhs.is_null() {
246        return Ok(Value::Null);
247    }
248    let mut has_null = false;
249    for item in list {
250        let rhs = eval_expr(item, columns, row)?;
251        if rhs.is_null() {
252            has_null = true;
253        } else if lhs == &rhs {
254            return Ok(Value::Boolean(!negated));
255        }
256    }
257    if has_null {
258        Ok(Value::Null)
259    } else {
260        Ok(Value::Boolean(negated))
261    }
262}
263
264fn eval_in_set(
265    lhs: &Value,
266    values: &std::collections::HashSet<Value>,
267    has_null: bool,
268    negated: bool,
269) -> Result<Value> {
270    if values.is_empty() && !has_null {
271        return Ok(Value::Boolean(negated));
272    }
273    if lhs.is_null() {
274        return Ok(Value::Null);
275    }
276    if values.contains(lhs) {
277        return Ok(Value::Boolean(!negated));
278    }
279    if has_null {
280        Ok(Value::Null)
281    } else {
282        Ok(Value::Boolean(negated))
283    }
284}
285
286fn eval_unary_op(op: UnaryOp, val: &Value) -> Result<Value> {
287    if val.is_null() {
288        return Ok(Value::Null);
289    }
290    match op {
291        UnaryOp::Neg => match val {
292            Value::Integer(i) => {
293                i.checked_neg()
294                    .map(Value::Integer)
295                    .ok_or(SqlError::IntegerOverflow)
296            }
297            Value::Real(r) => Ok(Value::Real(-r)),
298            _ => Err(SqlError::TypeMismatch {
299                expected: "numeric".into(),
300                got: format!("{}", val.data_type()),
301            }),
302        },
303        UnaryOp::Not => match val {
304            Value::Boolean(b) => Ok(Value::Boolean(!b)),
305            Value::Integer(i) => Ok(Value::Boolean(*i == 0)),
306            _ => Err(SqlError::TypeMismatch {
307                expected: "BOOLEAN".into(),
308                got: format!("{}", val.data_type()),
309            }),
310        },
311    }
312}
313
314fn value_to_text(val: &Value) -> String {
315    match val {
316        Value::Text(s) => s.clone(),
317        Value::Integer(i) => i.to_string(),
318        Value::Real(r) => {
319            if r.fract() == 0.0 && r.is_finite() {
320                format!("{r:.1}")
321            } else {
322                format!("{r}")
323            }
324        }
325        Value::Boolean(b) => if *b { "TRUE" } else { "FALSE" }.into(),
326        Value::Null => String::new(),
327        Value::Blob(b) => {
328            let mut s = String::with_capacity(b.len() * 2);
329            for byte in b {
330                s.push_str(&format!("{byte:02X}"));
331            }
332            s
333        }
334    }
335}
336
337fn eval_between(val: &Value, low: &Value, high: &Value, negated: bool) -> Result<Value> {
338    if val.is_null() || low.is_null() || high.is_null() {
339        let ge = if val.is_null() || low.is_null() { None } else { Some(*val >= *low) };
340        let le = if val.is_null() || high.is_null() { None } else { Some(*val <= *high) };
341
342        let result = match (ge, le) {
343            (Some(false), _) | (_, Some(false)) => Some(false),
344            (Some(true), Some(true)) => Some(true),
345            _ => None,
346        };
347
348        return match result {
349            Some(b) => Ok(Value::Boolean(if negated { !b } else { b })),
350            None => Ok(Value::Null),
351        };
352    }
353
354    let in_range = *val >= *low && *val <= *high;
355    Ok(Value::Boolean(if negated { !in_range } else { in_range }))
356}
357
358const MAX_LIKE_PATTERN_LEN: usize = 10_000;
359
360fn eval_like(val: &Value, pattern: &Value, escape: Option<&Value>, negated: bool) -> Result<Value> {
361    if val.is_null() || pattern.is_null() {
362        return Ok(Value::Null);
363    }
364    let text = match val {
365        Value::Text(s) => s.as_str(),
366        _ => return Err(SqlError::TypeMismatch {
367            expected: "TEXT".into(),
368            got: val.data_type().to_string(),
369        }),
370    };
371    let pat = match pattern {
372        Value::Text(s) => s.as_str(),
373        _ => return Err(SqlError::TypeMismatch {
374            expected: "TEXT".into(),
375            got: pattern.data_type().to_string(),
376        }),
377    };
378
379    if pat.len() > MAX_LIKE_PATTERN_LEN {
380        return Err(SqlError::InvalidValue(format!(
381            "LIKE pattern too long ({} chars, max {MAX_LIKE_PATTERN_LEN})", pat.len()
382        )));
383    }
384
385    let esc_char = match escape {
386        Some(Value::Text(s)) => {
387            let mut chars = s.chars();
388            let c = chars.next().ok_or_else(|| {
389                SqlError::InvalidValue("ESCAPE must be a single character".into())
390            })?;
391            if chars.next().is_some() {
392                return Err(SqlError::InvalidValue("ESCAPE must be a single character".into()));
393            }
394            Some(c)
395        }
396        Some(Value::Null) => return Ok(Value::Null),
397        Some(_) => return Err(SqlError::TypeMismatch {
398            expected: "TEXT".into(),
399            got: "non-text".into(),
400        }),
401        None => None,
402    };
403
404    let matched = like_match(text, pat, esc_char);
405    Ok(Value::Boolean(if negated { !matched } else { matched }))
406}
407
408fn like_match(text: &str, pattern: &str, escape: Option<char>) -> bool {
409    let t: Vec<char> = text.chars().collect();
410    let p: Vec<char> = pattern.chars().collect();
411    like_match_impl(&t, &p, 0, 0, escape)
412}
413
/// Iterative LIKE matcher over pre-split characters.
///
/// Matching starts at text index `ti` and pattern index `pi`. In the
/// pattern, `%` matches any run of characters (including none), `_` matches
/// exactly one character, and every other character matches itself, ignoring
/// ASCII case. When `esc` is set, an escape character that is followed by
/// another pattern character makes that next character literal; an escape
/// character at the very end of the pattern is treated as an ordinary
/// character.
///
/// Only the most recent `%` is remembered: on a mismatch the matcher goes
/// back to it and lets it swallow one more text character.
fn like_match_impl(t: &[char], p: &[char], mut ti: usize, mut pi: usize, esc: Option<char>) -> bool {
    // (pattern index of the latest `%`, text index it currently extends to)
    let mut anchor: Option<(usize, usize)> = None;

    while ti < t.len() {
        // How many pattern characters a successful match of `t[ti]` consumes;
        // `None` means the current position does not match.
        let consumed: Option<usize> = match p.get(pi) {
            None => None,
            Some(&pc) if esc == Some(pc) && pi + 1 < p.len() => {
                // Escaped character: compare it literally.
                if p[pi + 1].eq_ignore_ascii_case(&t[ti]) {
                    Some(2)
                } else {
                    None
                }
            }
            Some('%') => {
                // Start by letting `%` match nothing; remember where to resume.
                anchor = Some((pi, ti));
                pi += 1;
                continue;
            }
            Some('_') => Some(1),
            Some(pc) => {
                if pc.eq_ignore_ascii_case(&t[ti]) {
                    Some(1)
                } else {
                    None
                }
            }
        };

        if let Some(step) = consumed {
            pi += step;
            ti += 1;
        } else if let Some((star_pi, star_ti)) = anchor.as_mut() {
            // Backtrack: the last `%` absorbs one more text character.
            *star_ti += 1;
            pi = *star_pi + 1;
            ti = *star_ti;
        } else {
            return false;
        }
    }

    // Text is exhausted; only trailing `%` may remain in the pattern.
    while p.get(pi) == Some(&'%') {
        pi += 1;
    }
    pi == p.len()
}
470
471fn eval_case(
472    operand: Option<&Expr>,
473    conditions: &[(Expr, Expr)],
474    else_result: Option<&Expr>,
475    columns: &[ColumnDef],
476    row: &[Value],
477) -> Result<Value> {
478    if let Some(op_expr) = operand {
479        let op_val = eval_expr(op_expr, columns, row)?;
480        for (cond, result) in conditions {
481            let cond_val = eval_expr(cond, columns, row)?;
482            if !op_val.is_null() && !cond_val.is_null() && op_val == cond_val {
483                return eval_expr(result, columns, row);
484            }
485        }
486    } else {
487        for (cond, result) in conditions {
488            let cond_val = eval_expr(cond, columns, row)?;
489            if is_truthy(&cond_val) {
490                return eval_expr(result, columns, row);
491            }
492        }
493    }
494    match else_result {
495        Some(e) => eval_expr(e, columns, row),
496        None => Ok(Value::Null),
497    }
498}
499
500fn eval_cast(val: &Value, target: DataType) -> Result<Value> {
501    if val.is_null() {
502        return Ok(Value::Null);
503    }
504    match target {
505        DataType::Integer => match val {
506            Value::Integer(_) => Ok(val.clone()),
507            Value::Real(r) => Ok(Value::Integer(*r as i64)),
508            Value::Boolean(b) => Ok(Value::Integer(if *b { 1 } else { 0 })),
509            Value::Text(s) => s.trim().parse::<i64>()
510                .map(Value::Integer)
511                .or_else(|_| s.trim().parse::<f64>().map(|f| Value::Integer(f as i64)))
512                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to INTEGER"))),
513            _ => Err(SqlError::InvalidValue(format!("cannot cast {} to INTEGER", val.data_type()))),
514        },
515        DataType::Real => match val {
516            Value::Real(_) => Ok(val.clone()),
517            Value::Integer(i) => Ok(Value::Real(*i as f64)),
518            Value::Boolean(b) => Ok(Value::Real(if *b { 1.0 } else { 0.0 })),
519            Value::Text(s) => s.trim().parse::<f64>()
520                .map(Value::Real)
521                .map_err(|_| SqlError::InvalidValue(format!("cannot cast '{s}' to REAL"))),
522            _ => Err(SqlError::InvalidValue(format!("cannot cast {} to REAL", val.data_type()))),
523        },
524        DataType::Text => Ok(Value::Text(value_to_text(val))),
525        DataType::Boolean => match val {
526            Value::Boolean(_) => Ok(val.clone()),
527            Value::Integer(i) => Ok(Value::Boolean(*i != 0)),
528            Value::Text(s) => {
529                let lower = s.trim().to_ascii_lowercase();
530                match lower.as_str() {
531                    "true" | "1" | "yes" | "on" => Ok(Value::Boolean(true)),
532                    "false" | "0" | "no" | "off" => Ok(Value::Boolean(false)),
533                    _ => Err(SqlError::InvalidValue(format!("cannot cast '{s}' to BOOLEAN"))),
534                }
535            }
536            _ => Err(SqlError::InvalidValue(format!("cannot cast {} to BOOLEAN", val.data_type()))),
537        },
538        DataType::Blob => match val {
539            Value::Blob(_) => Ok(val.clone()),
540            Value::Text(s) => Ok(Value::Blob(s.as_bytes().to_vec())),
541            _ => Err(SqlError::InvalidValue(format!("cannot cast {} to BLOB", val.data_type()))),
542        },
543        DataType::Null => Ok(Value::Null),
544    }
545}
546
547fn eval_scalar_function(
548    name: &str,
549    args: &[Expr],
550    columns: &[ColumnDef],
551    row: &[Value],
552) -> Result<Value> {
553    let evaluated: Vec<Value> = args.iter()
554        .map(|a| eval_expr(a, columns, row))
555        .collect::<Result<Vec<_>>>()?;
556
557    match name {
558        "LENGTH" => {
559            check_args(name, &evaluated, 1)?;
560            match &evaluated[0] {
561                Value::Null => Ok(Value::Null),
562                Value::Text(s) => Ok(Value::Integer(s.chars().count() as i64)),
563                Value::Blob(b) => Ok(Value::Integer(b.len() as i64)),
564                _ => Ok(Value::Integer(value_to_text(&evaluated[0]).chars().count() as i64)),
565            }
566        }
567        "UPPER" => {
568            check_args(name, &evaluated, 1)?;
569            match &evaluated[0] {
570                Value::Null => Ok(Value::Null),
571                Value::Text(s) => Ok(Value::Text(s.to_ascii_uppercase())),
572                _ => Ok(Value::Text(value_to_text(&evaluated[0]).to_ascii_uppercase())),
573            }
574        }
575        "LOWER" => {
576            check_args(name, &evaluated, 1)?;
577            match &evaluated[0] {
578                Value::Null => Ok(Value::Null),
579                Value::Text(s) => Ok(Value::Text(s.to_ascii_lowercase())),
580                _ => Ok(Value::Text(value_to_text(&evaluated[0]).to_ascii_lowercase())),
581            }
582        }
583        "SUBSTR" | "SUBSTRING" => {
584            if evaluated.len() < 2 || evaluated.len() > 3 {
585                return Err(SqlError::InvalidValue(format!("{name} requires 2 or 3 arguments")));
586            }
587            if evaluated.iter().any(|v| v.is_null()) {
588                return Ok(Value::Null);
589            }
590            let s = value_to_text(&evaluated[0]);
591            let chars: Vec<char> = s.chars().collect();
592            let start = match &evaluated[1] {
593                Value::Integer(i) => *i,
594                _ => return Err(SqlError::TypeMismatch {
595                    expected: "INTEGER".into(),
596                    got: evaluated[1].data_type().to_string(),
597                }),
598            };
599            let len = chars.len() as i64;
600
601            let (begin, count) = if evaluated.len() == 3 {
602                let cnt = match &evaluated[2] {
603                    Value::Integer(i) => *i,
604                    _ => return Err(SqlError::TypeMismatch {
605                        expected: "INTEGER".into(),
606                        got: evaluated[2].data_type().to_string(),
607                    }),
608                };
609                if start >= 1 {
610                    let b = (start - 1).min(len) as usize;
611                    let c = cnt.max(0) as usize;
612                    (b, c)
613                } else if start == 0 {
614                    let c = (cnt - 1).max(0) as usize;
615                    (0usize, c)
616                } else {
617                    let adjusted_cnt = (cnt + start - 1).max(0) as usize;
618                    (0usize, adjusted_cnt)
619                }
620            } else if start >= 1 {
621                let b = (start - 1).min(len) as usize;
622                (b, chars.len() - b)
623            } else if start == 0 {
624                (0usize, chars.len())
625            } else {
626                let b = (len + start).max(0) as usize;
627                (b, chars.len() - b)
628            };
629
630            let result: String = chars.iter().skip(begin).take(count).collect();
631            Ok(Value::Text(result))
632        }
633        "TRIM" | "LTRIM" | "RTRIM" => {
634            if evaluated.is_empty() || evaluated.len() > 2 {
635                return Err(SqlError::InvalidValue(format!("{name} requires 1 or 2 arguments")));
636            }
637            if evaluated[0].is_null() {
638                return Ok(Value::Null);
639            }
640            let s = value_to_text(&evaluated[0]);
641            let trim_chars: Vec<char> = if evaluated.len() == 2 {
642                if evaluated[1].is_null() { return Ok(Value::Null); }
643                value_to_text(&evaluated[1]).chars().collect()
644            } else {
645                vec![' ']
646            };
647            let result = match name {
648                "TRIM" => s.trim_matches(|c: char| trim_chars.contains(&c)).to_string(),
649                "LTRIM" => s.trim_start_matches(|c: char| trim_chars.contains(&c)).to_string(),
650                "RTRIM" => s.trim_end_matches(|c: char| trim_chars.contains(&c)).to_string(),
651                _ => unreachable!(),
652            };
653            Ok(Value::Text(result))
654        }
655        "REPLACE" => {
656            check_args(name, &evaluated, 3)?;
657            if evaluated.iter().any(|v| v.is_null()) {
658                return Ok(Value::Null);
659            }
660            let s = value_to_text(&evaluated[0]);
661            let from = value_to_text(&evaluated[1]);
662            let to = value_to_text(&evaluated[2]);
663            if from.is_empty() {
664                return Ok(Value::Text(s));
665            }
666            Ok(Value::Text(s.replace(&from, &to)))
667        }
668        "INSTR" => {
669            check_args(name, &evaluated, 2)?;
670            if evaluated.iter().any(|v| v.is_null()) {
671                return Ok(Value::Null);
672            }
673            let haystack = value_to_text(&evaluated[0]);
674            let needle = value_to_text(&evaluated[1]);
675            let pos = haystack.find(&needle).map(|i| {
676                haystack[..i].chars().count() as i64 + 1
677            }).unwrap_or(0);
678            Ok(Value::Integer(pos))
679        }
680        "CONCAT" => {
681            if evaluated.is_empty() {
682                return Ok(Value::Text(String::new()));
683            }
684            let mut result = String::new();
685            for v in &evaluated {
686                match v {
687                    Value::Null => {}
688                    _ => result.push_str(&value_to_text(v)),
689                }
690            }
691            Ok(Value::Text(result))
692        }
693        "ABS" => {
694            check_args(name, &evaluated, 1)?;
695            match &evaluated[0] {
696                Value::Null => Ok(Value::Null),
697                Value::Integer(i) => i.checked_abs()
698                    .map(Value::Integer)
699                    .ok_or(SqlError::IntegerOverflow),
700                Value::Real(r) => Ok(Value::Real(r.abs())),
701                _ => Err(SqlError::TypeMismatch {
702                    expected: "numeric".into(),
703                    got: evaluated[0].data_type().to_string(),
704                }),
705            }
706        }
707        "ROUND" => {
708            if evaluated.is_empty() || evaluated.len() > 2 {
709                return Err(SqlError::InvalidValue("ROUND requires 1 or 2 arguments".into()));
710            }
711            if evaluated[0].is_null() {
712                return Ok(Value::Null);
713            }
714            let val = match &evaluated[0] {
715                Value::Integer(i) => *i as f64,
716                Value::Real(r) => *r,
717                _ => return Err(SqlError::TypeMismatch {
718                    expected: "numeric".into(),
719                    got: evaluated[0].data_type().to_string(),
720                }),
721            };
722            let places = if evaluated.len() == 2 {
723                match &evaluated[1] {
724                    Value::Null => return Ok(Value::Null),
725                    Value::Integer(i) => *i,
726                    _ => return Err(SqlError::TypeMismatch {
727                        expected: "INTEGER".into(),
728                        got: evaluated[1].data_type().to_string(),
729                    }),
730                }
731            } else {
732                0
733            };
734            let factor = 10f64.powi(places as i32);
735            let rounded = (val * factor).round() / factor;
736            Ok(Value::Real(rounded))
737        }
738        "CEIL" | "CEILING" => {
739            check_args(name, &evaluated, 1)?;
740            match &evaluated[0] {
741                Value::Null => Ok(Value::Null),
742                Value::Integer(i) => Ok(Value::Integer(*i)),
743                Value::Real(r) => Ok(Value::Integer(r.ceil() as i64)),
744                _ => Err(SqlError::TypeMismatch {
745                    expected: "numeric".into(),
746                    got: evaluated[0].data_type().to_string(),
747                }),
748            }
749        }
750        "FLOOR" => {
751            check_args(name, &evaluated, 1)?;
752            match &evaluated[0] {
753                Value::Null => Ok(Value::Null),
754                Value::Integer(i) => Ok(Value::Integer(*i)),
755                Value::Real(r) => Ok(Value::Integer(r.floor() as i64)),
756                _ => Err(SqlError::TypeMismatch {
757                    expected: "numeric".into(),
758                    got: evaluated[0].data_type().to_string(),
759                }),
760            }
761        }
762        "SIGN" => {
763            check_args(name, &evaluated, 1)?;
764            match &evaluated[0] {
765                Value::Null => Ok(Value::Null),
766                Value::Integer(i) => Ok(Value::Integer(i.signum())),
767                Value::Real(r) => {
768                    if *r > 0.0 { Ok(Value::Integer(1)) }
769                    else if *r < 0.0 { Ok(Value::Integer(-1)) }
770                    else { Ok(Value::Integer(0)) }
771                }
772                _ => Err(SqlError::TypeMismatch {
773                    expected: "numeric".into(),
774                    got: evaluated[0].data_type().to_string(),
775                }),
776            }
777        }
778        "SQRT" => {
779            check_args(name, &evaluated, 1)?;
780            match &evaluated[0] {
781                Value::Null => Ok(Value::Null),
782                Value::Integer(i) => {
783                    if *i < 0 { Ok(Value::Null) }
784                    else { Ok(Value::Real((*i as f64).sqrt())) }
785                }
786                Value::Real(r) => {
787                    if *r < 0.0 { Ok(Value::Null) }
788                    else { Ok(Value::Real(r.sqrt())) }
789                }
790                _ => Err(SqlError::TypeMismatch {
791                    expected: "numeric".into(),
792                    got: evaluated[0].data_type().to_string(),
793                }),
794            }
795        }
796        "RANDOM" => {
797            check_args(name, &evaluated, 0)?;
798            use std::collections::hash_map::DefaultHasher;
799            use std::hash::{Hash, Hasher};
800            use std::time::SystemTime;
801            let mut hasher = DefaultHasher::new();
802            SystemTime::now().hash(&mut hasher);
803            std::thread::current().id().hash(&mut hasher);
804            let mut val = hasher.finish() as i64;
805            if val == i64::MIN { val = i64::MAX; }
806            Ok(Value::Integer(val))
807        }
808        "TYPEOF" => {
809            check_args(name, &evaluated, 1)?;
810            let type_name = match &evaluated[0] {
811                Value::Null => "null",
812                Value::Integer(_) => "integer",
813                Value::Real(_) => "real",
814                Value::Text(_) => "text",
815                Value::Blob(_) => "blob",
816                Value::Boolean(_) => "boolean",
817            };
818            Ok(Value::Text(type_name.into()))
819        }
820        "MIN" => {
821            check_args(name, &evaluated, 2)?;
822            if evaluated[0].is_null() { return Ok(evaluated[1].clone()); }
823            if evaluated[1].is_null() { return Ok(evaluated[0].clone()); }
824            if evaluated[0] <= evaluated[1] {
825                Ok(evaluated[0].clone())
826            } else {
827                Ok(evaluated[1].clone())
828            }
829        }
830        "MAX" => {
831            check_args(name, &evaluated, 2)?;
832            if evaluated[0].is_null() { return Ok(evaluated[1].clone()); }
833            if evaluated[1].is_null() { return Ok(evaluated[0].clone()); }
834            if evaluated[0] >= evaluated[1] {
835                Ok(evaluated[0].clone())
836            } else {
837                Ok(evaluated[1].clone())
838            }
839        }
840        "HEX" => {
841            check_args(name, &evaluated, 1)?;
842            match &evaluated[0] {
843                Value::Null => Ok(Value::Null),
844                Value::Blob(b) => {
845                    let mut s = String::with_capacity(b.len() * 2);
846                    for byte in b { s.push_str(&format!("{byte:02X}")); }
847                    Ok(Value::Text(s))
848                }
849                Value::Text(s) => {
850                    let mut r = String::with_capacity(s.len() * 2);
851                    for byte in s.as_bytes() { r.push_str(&format!("{byte:02X}")); }
852                    Ok(Value::Text(r))
853                }
854                _ => Ok(Value::Text(value_to_text(&evaluated[0]))),
855            }
856        }
857        _ => {
858            Err(SqlError::Unsupported(format!("scalar function: {name}")))
859        }
860    }
861}
862
863fn check_args(name: &str, args: &[Value], expected: usize) -> Result<()> {
864    if args.len() != expected {
865        Err(SqlError::InvalidValue(format!(
866            "{name} requires {expected} argument(s), got {}", args.len()
867        )))
868    } else {
869        Ok(())
870    }
871}
872
873/// Check if an expression result is truthy (for WHERE/HAVING).
874pub fn is_truthy(val: &Value) -> bool {
875    match val {
876        Value::Boolean(b) => *b,
877        Value::Integer(i) => *i != 0,
878        Value::Null => false,
879        _ => true,
880    }
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::DataType;

    // ── Fixtures ────────────────────────────────────────────────────

    /// Schema shared by every test: `id`, `name`, `score`, `active`.
    fn test_columns() -> Vec<ColumnDef> {
        vec![
            ("id", DataType::Integer, false, 0),
            ("name", DataType::Text, true, 1),
            ("score", DataType::Real, true, 2),
            ("active", DataType::Boolean, false, 3),
        ]
        .into_iter()
        .map(|(name, data_type, nullable, position)| ColumnDef {
            name: name.into(),
            data_type,
            nullable,
            position,
        })
        .collect()
    }

    /// Fully populated row matching `test_columns`.
    fn test_row() -> Vec<Value> {
        let id = Value::Integer(1);
        let name = Value::Text("Alice".into());
        let score = Value::Real(95.5);
        let active = Value::Boolean(true);
        vec![id, name, score, active]
    }

    /// Row whose nullable columns (`name`, `score`) are both NULL.
    fn row_with_nulls() -> Vec<Value> {
        vec![Value::Integer(1), Value::Null, Value::Null, Value::Boolean(true)]
    }

    // ── Expression builders ─────────────────────────────────────────

    /// Unqualified column reference.
    fn col(name: &str) -> Expr {
        Expr::Column(name.into())
    }

    /// Literal expression.
    fn lit(value: Value) -> Expr {
        Expr::Literal(value)
    }

    /// Binary operation over two sub-expressions.
    fn binary(left: Expr, op: BinOp, right: Expr) -> Expr {
        Expr::BinaryOp {
            left: Box::new(left),
            op,
            right: Box::new(right),
        }
    }

    /// Unary operation over one sub-expression.
    fn unary(op: UnaryOp, operand: Expr) -> Expr {
        Expr::UnaryOp {
            op,
            expr: Box::new(operand),
        }
    }

    /// Evaluate `expr` against `row` using the shared test schema.
    fn eval(expr: &Expr, row: &[Value]) -> Result<Value> {
        eval_expr(expr, &test_columns(), row)
    }

    // ── Tests ───────────────────────────────────────────────────────

    #[test]
    fn eval_literal() {
        let result = eval(&lit(Value::Integer(42)), &test_row());
        assert_eq!(result.unwrap(), Value::Integer(42));
    }

    #[test]
    fn eval_column_ref() {
        let result = eval(&col("name"), &test_row());
        assert_eq!(result.unwrap(), Value::Text("Alice".into()));
    }

    #[test]
    fn eval_column_case_insensitive() {
        let result = eval(&col("NAME"), &test_row());
        assert_eq!(result.unwrap(), Value::Text("Alice".into()));
    }

    #[test]
    fn eval_arithmetic_int() {
        let sum = binary(col("id"), BinOp::Add, lit(Value::Integer(10)));
        assert_eq!(eval(&sum, &test_row()).unwrap(), Value::Integer(11));
    }

    #[test]
    fn eval_comparison() {
        let cmp = binary(col("score"), BinOp::Gt, lit(Value::Real(90.0)));
        assert_eq!(eval(&cmp, &test_row()).unwrap(), Value::Boolean(true));
    }

    #[test]
    fn eval_null_propagation() {
        // Comparing a NULL column with anything yields NULL.
        let cmp = binary(col("name"), BinOp::Eq, lit(Value::Text("test".into())));
        assert!(eval(&cmp, &row_with_nulls()).unwrap().is_null());
    }

    #[test]
    fn eval_and_three_valued() {
        let row = row_with_nulls();
        let null_and =
            |rhs: bool| eval(&binary(col("name"), BinOp::And, lit(Value::Boolean(rhs))), &row);

        // NULL AND false = false
        assert_eq!(null_and(false).unwrap(), Value::Boolean(false));

        // NULL AND true = NULL
        assert!(null_and(true).unwrap().is_null());
    }

    #[test]
    fn eval_or_three_valued() {
        let row = row_with_nulls();
        let null_or =
            |rhs: bool| eval(&binary(col("name"), BinOp::Or, lit(Value::Boolean(rhs))), &row);

        // NULL OR true = true
        assert_eq!(null_or(true).unwrap(), Value::Boolean(true));

        // NULL OR false = NULL
        assert!(null_or(false).unwrap().is_null());
    }

    #[test]
    fn eval_is_null() {
        let row = row_with_nulls();

        let name_is_null = Expr::IsNull(Box::new(col("name")));
        assert_eq!(eval(&name_is_null, &row).unwrap(), Value::Boolean(true));

        let id_is_not_null = Expr::IsNotNull(Box::new(col("id")));
        assert_eq!(eval(&id_is_not_null, &row).unwrap(), Value::Boolean(true));
    }

    #[test]
    fn eval_not() {
        let negated = unary(UnaryOp::Not, col("active"));
        assert_eq!(eval(&negated, &test_row()).unwrap(), Value::Boolean(false));
    }

    #[test]
    fn eval_neg() {
        let negated = unary(UnaryOp::Neg, col("id"));
        assert_eq!(eval(&negated, &test_row()).unwrap(), Value::Integer(-1));
    }

    #[test]
    fn eval_division_by_zero() {
        let quotient = binary(col("id"), BinOp::Div, lit(Value::Integer(0)));
        let outcome = eval(&quotient, &test_row());
        assert!(matches!(outcome, Err(SqlError::DivisionByZero)));
    }

    #[test]
    fn eval_mixed_numeric() {
        // id (int 1) + score (real 95.5) = real 96.5
        let sum = binary(col("id"), BinOp::Add, col("score"));
        assert_eq!(eval(&sum, &test_row()).unwrap(), Value::Real(96.5));
    }

    #[test]
    fn is_truthy_values() {
        let cases = vec![
            (Value::Boolean(true), true),
            (Value::Boolean(false), false),
            (Value::Null, false),
            (Value::Integer(1), true),
            (Value::Integer(0), false),
        ];
        for (value, expected) in &cases {
            assert_eq!(is_truthy(value), *expected, "is_truthy({value:?})");
        }
    }
}