Skip to main content

sqlrite/sql/
executor.rs

1//! Query executors — evaluate parsed SQL statements against the in-memory
2//! storage and produce formatted output.
3
4use std::cmp::Ordering;
5
6use prettytable::{Cell as PrintCell, Row as PrintRow, Table as PrintTable};
7use sqlparser::ast::{
8    AssignmentTarget, BinaryOperator, CreateIndex, Delete, Expr, FromTable, Statement, TableFactor,
9    TableWithJoins, UnaryOperator, Update,
10};
11
12use crate::error::{Result, SQLRiteError};
13use crate::sql::db::database::Database;
14use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
15use crate::sql::db::table::{DataType, Table, Value};
16use crate::sql::parser::select::{OrderByClause, Projection, SelectQuery};
17
18/// Executes a parsed `SelectQuery` against the database and returns a
19/// human-readable rendering of the result set (prettytable). Also returns
20/// the number of rows produced, for the top-level status message.
21/// Structured result of a SELECT: column names in projection order,
22/// and each matching row as a `Vec<Value>` aligned with the columns.
23/// Phase 5a introduced this so the public `Connection` / `Statement`
24/// API has typed rows to yield; the existing `execute_select` that
25/// returns pre-rendered text is now a thin wrapper on top.
26pub struct SelectResult {
27    pub columns: Vec<String>,
28    pub rows: Vec<Vec<Value>>,
29}
30
31/// Executes a SELECT and returns structured rows. The typed rows are
32/// what the new public API streams to callers; the REPL / Tauri app
33/// pre-render into a prettytable via `execute_select`.
34pub fn execute_select_rows(query: SelectQuery, db: &Database) -> Result<SelectResult> {
35    let table = db
36        .get_table(query.table_name.clone())
37        .map_err(|_| SQLRiteError::Internal(format!("Table '{}' not found", query.table_name)))?;
38
39    // Resolve projection to a concrete ordered column list.
40    let projected_cols: Vec<String> = match &query.projection {
41        Projection::All => table.column_names(),
42        Projection::Columns(cols) => {
43            for c in cols {
44                if !table.contains_column(c.to_string()) {
45                    return Err(SQLRiteError::Internal(format!(
46                        "Column '{c}' does not exist on table '{}'",
47                        query.table_name
48                    )));
49                }
50            }
51            cols.clone()
52        }
53    };
54
55    // Collect matching rowids. If the WHERE is the shape `col = literal`
56    // and `col` has a secondary index, probe the index for an O(log N)
57    // seek; otherwise fall back to the full table scan.
58    let matching = match select_rowids(table, query.selection.as_ref())? {
59        RowidSource::IndexProbe(rowids) => rowids,
60        RowidSource::FullScan => {
61            let mut out = Vec::new();
62            for rowid in table.rowids() {
63                if let Some(expr) = &query.selection {
64                    if !eval_predicate(expr, table, rowid)? {
65                        continue;
66                    }
67                }
68                out.push(rowid);
69            }
70            out
71        }
72    };
73    let mut matching = matching;
74
75    // Sort before applying LIMIT, matching SQL semantics.
76    if let Some(order) = &query.order_by {
77        sort_rowids(&mut matching, table, order)?;
78    }
79
80    if let Some(n) = query.limit {
81        matching.truncate(n);
82    }
83
84    // Build typed rows. Missing cells surface as `Value::Null` — that
85    // maps a column-not-present-for-this-rowid case onto the public
86    // `Row::get` → `Option<T>` surface cleanly.
87    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(matching.len());
88    for rowid in &matching {
89        let row: Vec<Value> = projected_cols
90            .iter()
91            .map(|col| table.get_value(col, *rowid).unwrap_or(Value::Null))
92            .collect();
93        rows.push(row);
94    }
95
96    Ok(SelectResult {
97        columns: projected_cols,
98        rows,
99    })
100}
101
102/// Executes a SELECT and returns `(rendered_table, row_count)`. The
103/// REPL and Tauri app use this to keep the table-printing behaviour
104/// the engine has always shipped. Structured callers use
105/// `execute_select_rows` instead.
106pub fn execute_select(query: SelectQuery, db: &Database) -> Result<(String, usize)> {
107    let result = execute_select_rows(query, db)?;
108    let row_count = result.rows.len();
109
110    let mut print_table = PrintTable::new();
111    let header_cells: Vec<PrintCell> = result.columns.iter().map(|c| PrintCell::new(c)).collect();
112    print_table.add_row(PrintRow::new(header_cells));
113
114    for row in &result.rows {
115        let cells: Vec<PrintCell> = row
116            .iter()
117            .map(|v| PrintCell::new(&v.to_display_string()))
118            .collect();
119        print_table.add_row(PrintRow::new(cells));
120    }
121
122    Ok((print_table.to_string(), row_count))
123}
124
125/// Executes a DELETE statement. Returns the number of rows removed.
126pub fn execute_delete(stmt: &Statement, db: &mut Database) -> Result<usize> {
127    let Statement::Delete(Delete {
128        from, selection, ..
129    }) = stmt
130    else {
131        return Err(SQLRiteError::Internal(
132            "execute_delete called on a non-DELETE statement".to_string(),
133        ));
134    };
135
136    let tables = match from {
137        FromTable::WithFromKeyword(t) | FromTable::WithoutKeyword(t) => t,
138    };
139    let table_name = extract_single_table_name(tables)?;
140
141    // Compute matching rowids with an immutable borrow, then mutate.
142    let matching: Vec<i64> = {
143        let table = db
144            .get_table(table_name.clone())
145            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
146        match select_rowids(table, selection.as_ref())? {
147            RowidSource::IndexProbe(rowids) => rowids,
148            RowidSource::FullScan => {
149                let mut out = Vec::new();
150                for rowid in table.rowids() {
151                    if let Some(expr) = selection {
152                        if !eval_predicate(expr, table, rowid)? {
153                            continue;
154                        }
155                    }
156                    out.push(rowid);
157                }
158                out
159            }
160        }
161    };
162
163    let table = db.get_table_mut(table_name)?;
164    for rowid in &matching {
165        table.delete_row(*rowid);
166    }
167    Ok(matching.len())
168}
169
170/// Executes an UPDATE statement. Returns the number of rows updated.
171pub fn execute_update(stmt: &Statement, db: &mut Database) -> Result<usize> {
172    let Statement::Update(Update {
173        table,
174        assignments,
175        from,
176        selection,
177        ..
178    }) = stmt
179    else {
180        return Err(SQLRiteError::Internal(
181            "execute_update called on a non-UPDATE statement".to_string(),
182        ));
183    };
184
185    if from.is_some() {
186        return Err(SQLRiteError::NotImplemented(
187            "UPDATE ... FROM is not supported yet".to_string(),
188        ));
189    }
190
191    let table_name = extract_table_name(table)?;
192
193    // Resolve assignment targets to plain column names and verify they exist.
194    let mut parsed_assignments: Vec<(String, Expr)> = Vec::with_capacity(assignments.len());
195    {
196        let tbl = db
197            .get_table(table_name.clone())
198            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
199        for a in assignments {
200            let col = match &a.target {
201                AssignmentTarget::ColumnName(name) => name
202                    .0
203                    .last()
204                    .map(|p| p.to_string())
205                    .ok_or_else(|| SQLRiteError::Internal("empty column name".to_string()))?,
206                AssignmentTarget::Tuple(_) => {
207                    return Err(SQLRiteError::NotImplemented(
208                        "tuple assignment targets are not supported".to_string(),
209                    ));
210                }
211            };
212            if !tbl.contains_column(col.clone()) {
213                return Err(SQLRiteError::Internal(format!(
214                    "UPDATE references unknown column '{col}'"
215                )));
216            }
217            parsed_assignments.push((col, a.value.clone()));
218        }
219    }
220
221    // Gather matching rowids + the new values to write for each assignment, under
222    // an immutable borrow. Uses the index-probe fast path when the WHERE is
223    // `col = literal` on an indexed column.
224    let work: Vec<(i64, Vec<(String, Value)>)> = {
225        let tbl = db.get_table(table_name.clone())?;
226        let matched_rowids: Vec<i64> = match select_rowids(tbl, selection.as_ref())? {
227            RowidSource::IndexProbe(rowids) => rowids,
228            RowidSource::FullScan => {
229                let mut out = Vec::new();
230                for rowid in tbl.rowids() {
231                    if let Some(expr) = selection {
232                        if !eval_predicate(expr, tbl, rowid)? {
233                            continue;
234                        }
235                    }
236                    out.push(rowid);
237                }
238                out
239            }
240        };
241        let mut rows_to_update = Vec::new();
242        for rowid in matched_rowids {
243            let mut values = Vec::with_capacity(parsed_assignments.len());
244            for (col, expr) in &parsed_assignments {
245                // UPDATE's RHS is evaluated in the context of the row being updated,
246                // so column references on the right resolve to the current row's values.
247                let v = eval_expr(expr, tbl, rowid)?;
248                values.push((col.clone(), v));
249            }
250            rows_to_update.push((rowid, values));
251        }
252        rows_to_update
253    };
254
255    let tbl = db.get_table_mut(table_name)?;
256    for (rowid, values) in &work {
257        for (col, v) in values {
258            tbl.set_value(col, *rowid, v.clone())?;
259        }
260    }
261    Ok(work.len())
262}
263
264/// Handles `CREATE INDEX [UNIQUE] <name> ON <table> (<column>)`. Single-
265/// column indexes only; multi-column / composite indexes are future work.
266/// Returns the (possibly synthesized) index name for the status message.
267pub fn execute_create_index(stmt: &Statement, db: &mut Database) -> Result<String> {
268    let Statement::CreateIndex(CreateIndex {
269        name,
270        table_name,
271        columns,
272        unique,
273        if_not_exists,
274        predicate,
275        ..
276    }) = stmt
277    else {
278        return Err(SQLRiteError::Internal(
279            "execute_create_index called on a non-CREATE-INDEX statement".to_string(),
280        ));
281    };
282
283    if predicate.is_some() {
284        return Err(SQLRiteError::NotImplemented(
285            "partial indexes (CREATE INDEX ... WHERE) are not supported yet".to_string(),
286        ));
287    }
288
289    if columns.len() != 1 {
290        return Err(SQLRiteError::NotImplemented(format!(
291            "multi-column indexes are not supported yet ({} columns given)",
292            columns.len()
293        )));
294    }
295
296    let index_name = name.as_ref().map(|n| n.to_string()).ok_or_else(|| {
297        SQLRiteError::NotImplemented(
298            "anonymous CREATE INDEX (no name) is not supported — give it a name".to_string(),
299        )
300    })?;
301
302    let table_name_str = table_name.to_string();
303    let column_name = match &columns[0].column.expr {
304        Expr::Identifier(ident) => ident.value.clone(),
305        Expr::CompoundIdentifier(parts) => parts
306            .last()
307            .map(|p| p.value.clone())
308            .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?,
309        other => {
310            return Err(SQLRiteError::NotImplemented(format!(
311                "CREATE INDEX only supports simple column references, got {other:?}"
312            )));
313        }
314    };
315
316    // Validate: table exists, column exists, type is indexable, name is unique.
317    let (datatype, existing_rowids_and_values): (DataType, Vec<(i64, Value)>) = {
318        let table = db.get_table(table_name_str.clone()).map_err(|_| {
319            SQLRiteError::General(format!(
320                "CREATE INDEX references unknown table '{table_name_str}'"
321            ))
322        })?;
323        if !table.contains_column(column_name.clone()) {
324            return Err(SQLRiteError::General(format!(
325                "CREATE INDEX references unknown column '{column_name}' on table '{table_name_str}'"
326            )));
327        }
328        let col = table
329            .columns
330            .iter()
331            .find(|c| c.column_name == column_name)
332            .expect("we just verified the column exists");
333        if table.index_by_name(&index_name).is_some() {
334            if *if_not_exists {
335                return Ok(index_name);
336            }
337            return Err(SQLRiteError::General(format!(
338                "index '{index_name}' already exists"
339            )));
340        }
341        let datatype = clone_datatype(&col.datatype);
342
343        // Snapshot (rowid, value) pairs so we can populate the index after
344        // it's attached. Doing this under the immutable borrow of the table
345        // means the mutable attach below can proceed without aliasing.
346        let mut pairs = Vec::new();
347        for rowid in table.rowids() {
348            if let Some(v) = table.get_value(&column_name, rowid) {
349                pairs.push((rowid, v));
350            }
351        }
352        (datatype, pairs)
353    };
354
355    // Build the index.
356    let mut idx = SecondaryIndex::new(
357        index_name.clone(),
358        table_name_str.clone(),
359        column_name.clone(),
360        &datatype,
361        *unique,
362        IndexOrigin::Explicit,
363    )?;
364
365    // Populate from the existing rows. UNIQUE violations here mean the
366    // existing data already breaks the new index's constraint — a common
367    // source of user confusion, so be explicit.
368    for (rowid, v) in &existing_rowids_and_values {
369        if *unique && idx.would_violate_unique(v) {
370            return Err(SQLRiteError::General(format!(
371                "cannot create UNIQUE index '{index_name}': column '{column_name}' \
372                 already contains the duplicate value {}",
373                v.to_display_string()
374            )));
375        }
376        idx.insert(v, *rowid)?;
377    }
378
379    // Attach to the table.
380    let table_mut = db.get_table_mut(table_name_str)?;
381    table_mut.secondary_indexes.push(idx);
382    Ok(index_name)
383}
384
385/// Cheap clone helper — `DataType` intentionally doesn't derive `Clone`
386/// because the enum has no ergonomic reason to be cloneable elsewhere.
387fn clone_datatype(dt: &DataType) -> DataType {
388    match dt {
389        DataType::Integer => DataType::Integer,
390        DataType::Text => DataType::Text,
391        DataType::Real => DataType::Real,
392        DataType::Bool => DataType::Bool,
393        DataType::Vector(dim) => DataType::Vector(*dim),
394        DataType::None => DataType::None,
395        DataType::Invalid => DataType::Invalid,
396    }
397}
398
399fn extract_single_table_name(tables: &[TableWithJoins]) -> Result<String> {
400    if tables.len() != 1 {
401        return Err(SQLRiteError::NotImplemented(
402            "multi-table DELETE is not supported yet".to_string(),
403        ));
404    }
405    extract_table_name(&tables[0])
406}
407
408fn extract_table_name(twj: &TableWithJoins) -> Result<String> {
409    if !twj.joins.is_empty() {
410        return Err(SQLRiteError::NotImplemented(
411            "JOIN is not supported yet".to_string(),
412        ));
413    }
414    match &twj.relation {
415        TableFactor::Table { name, .. } => Ok(name.to_string()),
416        _ => Err(SQLRiteError::NotImplemented(
417            "only plain table references are supported".to_string(),
418        )),
419    }
420}
421
422/// Tells the executor how to produce its candidate rowid list.
423enum RowidSource {
424    /// The WHERE was simple enough to probe a secondary index directly.
425    /// The `Vec` already contains exactly the rows the index matched;
426    /// no further WHERE evaluation is needed (the probe is precise).
427    IndexProbe(Vec<i64>),
428    /// No applicable index; caller falls back to walking `table.rowids()`
429    /// and evaluating the WHERE on each row.
430    FullScan,
431}
432
433/// Try to satisfy `WHERE` with an index probe. Currently supports the
434/// simplest shape: a single `col = literal` (or `literal = col`) where
435/// `col` is on a secondary index. AND/OR/range predicates fall back to
436/// full scan — those can be layered on later without changing the caller.
437fn select_rowids(table: &Table, selection: Option<&Expr>) -> Result<RowidSource> {
438    let Some(expr) = selection else {
439        return Ok(RowidSource::FullScan);
440    };
441    let Some((col, literal)) = try_extract_equality(expr) else {
442        return Ok(RowidSource::FullScan);
443    };
444    let Some(idx) = table.index_for_column(&col) else {
445        return Ok(RowidSource::FullScan);
446    };
447
448    // Convert the literal into a runtime Value. If the literal type doesn't
449    // match the column's index we still need correct semantics — evaluate
450    // the WHERE against every row. Fall back to full scan.
451    let literal_value = match convert_literal(&literal) {
452        Ok(v) => v,
453        Err(_) => return Ok(RowidSource::FullScan),
454    };
455
456    // Index lookup returns the full list of rowids matching this equality
457    // predicate. For unique indexes that's at most one; for non-unique it
458    // can be many.
459    let mut rowids = idx.lookup(&literal_value);
460    rowids.sort_unstable();
461    Ok(RowidSource::IndexProbe(rowids))
462}
463
464/// Recognizes `expr` as a simple equality on a column reference against a
465/// literal. Returns `(column_name, literal_value)` if the shape matches;
466/// `None` otherwise. Accepts both `col = literal` and `literal = col`.
467fn try_extract_equality(expr: &Expr) -> Option<(String, sqlparser::ast::Value)> {
468    // Peel off Nested parens so `WHERE (x = 1)` is recognized too.
469    let peeled = match expr {
470        Expr::Nested(inner) => inner.as_ref(),
471        other => other,
472    };
473    let Expr::BinaryOp { left, op, right } = peeled else {
474        return None;
475    };
476    if !matches!(op, BinaryOperator::Eq) {
477        return None;
478    }
479    let col_from = |e: &Expr| -> Option<String> {
480        match e {
481            Expr::Identifier(ident) => Some(ident.value.clone()),
482            Expr::CompoundIdentifier(parts) => parts.last().map(|p| p.value.clone()),
483            _ => None,
484        }
485    };
486    let literal_from = |e: &Expr| -> Option<sqlparser::ast::Value> {
487        if let Expr::Value(v) = e {
488            Some(v.value.clone())
489        } else {
490            None
491        }
492    };
493    if let (Some(c), Some(l)) = (col_from(left), literal_from(right)) {
494        return Some((c, l));
495    }
496    if let (Some(l), Some(c)) = (literal_from(left), col_from(right)) {
497        return Some((c, l));
498    }
499    None
500}
501
502fn sort_rowids(rowids: &mut [i64], table: &Table, order: &OrderByClause) -> Result<()> {
503    if !table.contains_column(order.column.clone()) {
504        return Err(SQLRiteError::Internal(format!(
505            "ORDER BY references unknown column '{}'",
506            order.column
507        )));
508    }
509    rowids.sort_by(|a, b| {
510        let va = table.get_value(&order.column, *a);
511        let vb = table.get_value(&order.column, *b);
512        let ord = compare_values(va.as_ref(), vb.as_ref());
513        if order.ascending { ord } else { ord.reverse() }
514    });
515    Ok(())
516}
517
518fn compare_values(a: Option<&Value>, b: Option<&Value>) -> Ordering {
519    match (a, b) {
520        (None, None) => Ordering::Equal,
521        (None, _) => Ordering::Less,
522        (_, None) => Ordering::Greater,
523        (Some(a), Some(b)) => match (a, b) {
524            (Value::Null, Value::Null) => Ordering::Equal,
525            (Value::Null, _) => Ordering::Less,
526            (_, Value::Null) => Ordering::Greater,
527            (Value::Integer(x), Value::Integer(y)) => x.cmp(y),
528            (Value::Real(x), Value::Real(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
529            (Value::Integer(x), Value::Real(y)) => {
530                (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal)
531            }
532            (Value::Real(x), Value::Integer(y)) => {
533                x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal)
534            }
535            (Value::Text(x), Value::Text(y)) => x.cmp(y),
536            (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
537            // Cross-type fallback: stringify and compare; keeps ORDER BY total.
538            (x, y) => x.to_display_string().cmp(&y.to_display_string()),
539        },
540    }
541}
542
543/// Returns `true` if the row at `rowid` matches the predicate expression.
544pub fn eval_predicate(expr: &Expr, table: &Table, rowid: i64) -> Result<bool> {
545    let v = eval_expr(expr, table, rowid)?;
546    match v {
547        Value::Bool(b) => Ok(b),
548        Value::Null => Ok(false), // SQL NULL in a WHERE is treated as false
549        Value::Integer(i) => Ok(i != 0),
550        other => Err(SQLRiteError::Internal(format!(
551            "WHERE clause must evaluate to boolean, got {}",
552            other.to_display_string()
553        ))),
554    }
555}
556
557fn eval_expr(expr: &Expr, table: &Table, rowid: i64) -> Result<Value> {
558    match expr {
559        Expr::Nested(inner) => eval_expr(inner, table, rowid),
560
561        Expr::Identifier(ident) => Ok(table.get_value(&ident.value, rowid).unwrap_or(Value::Null)),
562
563        Expr::CompoundIdentifier(parts) => {
564            // Accept `table.col` — we only have one table in scope, so ignore the qualifier.
565            let col = parts
566                .last()
567                .map(|i| i.value.as_str())
568                .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?;
569            Ok(table.get_value(col, rowid).unwrap_or(Value::Null))
570        }
571
572        Expr::Value(v) => convert_literal(&v.value),
573
574        Expr::UnaryOp { op, expr } => {
575            let inner = eval_expr(expr, table, rowid)?;
576            match op {
577                UnaryOperator::Not => match inner {
578                    Value::Bool(b) => Ok(Value::Bool(!b)),
579                    Value::Null => Ok(Value::Null),
580                    other => Err(SQLRiteError::Internal(format!(
581                        "NOT applied to non-boolean value: {}",
582                        other.to_display_string()
583                    ))),
584                },
585                UnaryOperator::Minus => match inner {
586                    Value::Integer(i) => Ok(Value::Integer(-i)),
587                    Value::Real(f) => Ok(Value::Real(-f)),
588                    Value::Null => Ok(Value::Null),
589                    other => Err(SQLRiteError::Internal(format!(
590                        "unary minus on non-numeric value: {}",
591                        other.to_display_string()
592                    ))),
593                },
594                UnaryOperator::Plus => Ok(inner),
595                other => Err(SQLRiteError::NotImplemented(format!(
596                    "unary operator {other:?} is not supported"
597                ))),
598            }
599        }
600
601        Expr::BinaryOp { left, op, right } => match op {
602            BinaryOperator::And => {
603                let l = eval_expr(left, table, rowid)?;
604                let r = eval_expr(right, table, rowid)?;
605                Ok(Value::Bool(as_bool(&l)? && as_bool(&r)?))
606            }
607            BinaryOperator::Or => {
608                let l = eval_expr(left, table, rowid)?;
609                let r = eval_expr(right, table, rowid)?;
610                Ok(Value::Bool(as_bool(&l)? || as_bool(&r)?))
611            }
612            cmp @ (BinaryOperator::Eq
613            | BinaryOperator::NotEq
614            | BinaryOperator::Lt
615            | BinaryOperator::LtEq
616            | BinaryOperator::Gt
617            | BinaryOperator::GtEq) => {
618                let l = eval_expr(left, table, rowid)?;
619                let r = eval_expr(right, table, rowid)?;
620                // Any comparison involving NULL is unknown → false in a WHERE.
621                if matches!(l, Value::Null) || matches!(r, Value::Null) {
622                    return Ok(Value::Bool(false));
623                }
624                let ord = compare_values(Some(&l), Some(&r));
625                let result = match cmp {
626                    BinaryOperator::Eq => ord == Ordering::Equal,
627                    BinaryOperator::NotEq => ord != Ordering::Equal,
628                    BinaryOperator::Lt => ord == Ordering::Less,
629                    BinaryOperator::LtEq => ord != Ordering::Greater,
630                    BinaryOperator::Gt => ord == Ordering::Greater,
631                    BinaryOperator::GtEq => ord != Ordering::Less,
632                    _ => unreachable!(),
633                };
634                Ok(Value::Bool(result))
635            }
636            arith @ (BinaryOperator::Plus
637            | BinaryOperator::Minus
638            | BinaryOperator::Multiply
639            | BinaryOperator::Divide
640            | BinaryOperator::Modulo) => {
641                let l = eval_expr(left, table, rowid)?;
642                let r = eval_expr(right, table, rowid)?;
643                eval_arith(arith, &l, &r)
644            }
645            BinaryOperator::StringConcat => {
646                let l = eval_expr(left, table, rowid)?;
647                let r = eval_expr(right, table, rowid)?;
648                if matches!(l, Value::Null) || matches!(r, Value::Null) {
649                    return Ok(Value::Null);
650                }
651                Ok(Value::Text(format!(
652                    "{}{}",
653                    l.to_display_string(),
654                    r.to_display_string()
655                )))
656            }
657            other => Err(SQLRiteError::NotImplemented(format!(
658                "binary operator {other:?} is not supported yet"
659            ))),
660        },
661
662        other => Err(SQLRiteError::NotImplemented(format!(
663            "unsupported expression in WHERE/projection: {other:?}"
664        ))),
665    }
666}
667
668/// Evaluates an integer/real arithmetic op. NULL on either side propagates.
669/// Mixed Integer/Real promotes to Real. Divide/Modulo by zero → error.
670fn eval_arith(op: &BinaryOperator, l: &Value, r: &Value) -> Result<Value> {
671    if matches!(l, Value::Null) || matches!(r, Value::Null) {
672        return Ok(Value::Null);
673    }
674    match (l, r) {
675        (Value::Integer(a), Value::Integer(b)) => match op {
676            BinaryOperator::Plus => Ok(Value::Integer(a.wrapping_add(*b))),
677            BinaryOperator::Minus => Ok(Value::Integer(a.wrapping_sub(*b))),
678            BinaryOperator::Multiply => Ok(Value::Integer(a.wrapping_mul(*b))),
679            BinaryOperator::Divide => {
680                if *b == 0 {
681                    Err(SQLRiteError::General("division by zero".to_string()))
682                } else {
683                    Ok(Value::Integer(a / b))
684                }
685            }
686            BinaryOperator::Modulo => {
687                if *b == 0 {
688                    Err(SQLRiteError::General("modulo by zero".to_string()))
689                } else {
690                    Ok(Value::Integer(a % b))
691                }
692            }
693            _ => unreachable!(),
694        },
695        // Anything involving a Real promotes both sides to f64.
696        (a, b) => {
697            let af = as_number(a)?;
698            let bf = as_number(b)?;
699            match op {
700                BinaryOperator::Plus => Ok(Value::Real(af + bf)),
701                BinaryOperator::Minus => Ok(Value::Real(af - bf)),
702                BinaryOperator::Multiply => Ok(Value::Real(af * bf)),
703                BinaryOperator::Divide => {
704                    if bf == 0.0 {
705                        Err(SQLRiteError::General("division by zero".to_string()))
706                    } else {
707                        Ok(Value::Real(af / bf))
708                    }
709                }
710                BinaryOperator::Modulo => {
711                    if bf == 0.0 {
712                        Err(SQLRiteError::General("modulo by zero".to_string()))
713                    } else {
714                        Ok(Value::Real(af % bf))
715                    }
716                }
717                _ => unreachable!(),
718            }
719        }
720    }
721}
722
723fn as_number(v: &Value) -> Result<f64> {
724    match v {
725        Value::Integer(i) => Ok(*i as f64),
726        Value::Real(f) => Ok(*f),
727        Value::Bool(b) => Ok(if *b { 1.0 } else { 0.0 }),
728        other => Err(SQLRiteError::General(format!(
729            "arithmetic on non-numeric value '{}'",
730            other.to_display_string()
731        ))),
732    }
733}
734
735fn as_bool(v: &Value) -> Result<bool> {
736    match v {
737        Value::Bool(b) => Ok(*b),
738        Value::Null => Ok(false),
739        Value::Integer(i) => Ok(*i != 0),
740        other => Err(SQLRiteError::Internal(format!(
741            "expected boolean, got {}",
742            other.to_display_string()
743        ))),
744    }
745}
746
747fn convert_literal(v: &sqlparser::ast::Value) -> Result<Value> {
748    use sqlparser::ast::Value as AstValue;
749    match v {
750        AstValue::Number(n, _) => {
751            if let Ok(i) = n.parse::<i64>() {
752                Ok(Value::Integer(i))
753            } else if let Ok(f) = n.parse::<f64>() {
754                Ok(Value::Real(f))
755            } else {
756                Err(SQLRiteError::Internal(format!(
757                    "could not parse numeric literal '{n}'"
758                )))
759            }
760        }
761        AstValue::SingleQuotedString(s) => Ok(Value::Text(s.clone())),
762        AstValue::Boolean(b) => Ok(Value::Bool(*b)),
763        AstValue::Null => Ok(Value::Null),
764        other => Err(SQLRiteError::NotImplemented(format!(
765            "unsupported literal value: {other:?}"
766        ))),
767    }
768}