Skip to main content

sqlrite/sql/
executor.rs

1//! Query executors — evaluate parsed SQL statements against the in-memory
2//! storage and produce formatted output.
3
4use std::cmp::Ordering;
5
6use prettytable::{Cell as PrintCell, Row as PrintRow, Table as PrintTable};
7use sqlparser::ast::{
8    AlterTable, AlterTableOperation, AssignmentTarget, BinaryOperator, CreateIndex, Delete, Expr,
9    FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, IndexType, ObjectName,
10    ObjectNamePart, RenameTableNameKind, Statement, TableFactor, TableWithJoins, UnaryOperator,
11    Update, Value as AstValue,
12};
13
14use crate::error::{Result, SQLRiteError};
15use crate::sql::db::database::Database;
16use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
17use crate::sql::db::table::{
18    DataType, FtsIndexEntry, HnswIndexEntry, Table, Value, parse_vector_literal,
19};
20use crate::sql::fts::{Bm25Params, PostingList};
21use crate::sql::hnsw::{DistanceMetric, HnswIndex};
22use crate::sql::parser::select::{OrderByClause, Projection, SelectQuery};
23
24/// Executes a parsed `SelectQuery` against the database and returns a
25/// human-readable rendering of the result set (prettytable). Also returns
26/// the number of rows produced, for the top-level status message.
27/// Structured result of a SELECT: column names in projection order,
28/// and each matching row as a `Vec<Value>` aligned with the columns.
29/// Phase 5a introduced this so the public `Connection` / `Statement`
30/// API has typed rows to yield; the existing `execute_select` that
31/// returns pre-rendered text is now a thin wrapper on top.
32pub struct SelectResult {
33    pub columns: Vec<String>,
34    pub rows: Vec<Vec<Value>>,
35}
36
37/// Executes a SELECT and returns structured rows. The typed rows are
38/// what the new public API streams to callers; the REPL / Tauri app
39/// pre-render into a prettytable via `execute_select`.
40pub fn execute_select_rows(query: SelectQuery, db: &Database) -> Result<SelectResult> {
41    let table = db
42        .get_table(query.table_name.clone())
43        .map_err(|_| SQLRiteError::Internal(format!("Table '{}' not found", query.table_name)))?;
44
45    // Resolve projection to a concrete ordered column list.
46    let projected_cols: Vec<String> = match &query.projection {
47        Projection::All => table.column_names(),
48        Projection::Columns(cols) => {
49            for c in cols {
50                if !table.contains_column(c.to_string()) {
51                    return Err(SQLRiteError::Internal(format!(
52                        "Column '{c}' does not exist on table '{}'",
53                        query.table_name
54                    )));
55                }
56            }
57            cols.clone()
58        }
59    };
60
61    // Collect matching rowids. If the WHERE is the shape `col = literal`
62    // and `col` has a secondary index, probe the index for an O(log N)
63    // seek; otherwise fall back to the full table scan.
64    let matching = match select_rowids(table, query.selection.as_ref())? {
65        RowidSource::IndexProbe(rowids) => rowids,
66        RowidSource::FullScan => {
67            let mut out = Vec::new();
68            for rowid in table.rowids() {
69                if let Some(expr) = &query.selection {
70                    if !eval_predicate(expr, table, rowid)? {
71                        continue;
72                    }
73                }
74                out.push(rowid);
75            }
76            out
77        }
78    };
79    let mut matching = matching;
80
81    // Phase 7c — bounded-heap top-k optimization.
82    //
83    // The naive "ORDER BY <expr>" path (Phase 7b) sorts every matching
84    // rowid: O(N log N) sort_by + a truncate. For KNN queries
85    //
86    //     SELECT id FROM docs
87    //     ORDER BY vec_distance_l2(embedding, [...])
88    //     LIMIT 10;
89    //
90    // N is the table row count and k is the LIMIT. With a bounded
91    // max-heap of size k we can find the top-k in O(N log k) — same
92    // sort_by-per-row cost on the heap operations, but k is typically
93    // 10-100 while N can be millions.
94    //
95    // Phase 7d.2 — HNSW ANN probe.
96    //
97    // Even better than the bounded heap: if the ORDER BY expression is
98    // exactly `vec_distance_l2(<col>, <bracket-array literal>)` AND
99    // `<col>` has an HNSW index attached, skip the linear scan
100    // entirely and probe the graph in O(log N). Approximate but
101    // typically ≥ 0.95 recall (verified by the recall tests in
102    // src/sql/hnsw.rs).
103    //
104    // We branch in cases:
105    //   1. ORDER BY + LIMIT k matches the HNSW probe pattern  → graph probe.
106    //   2. ORDER BY + LIMIT k matches the FTS probe pattern   → posting probe.
107    //   3. ORDER BY + LIMIT k where k < |matching|            → bounded heap (7c).
108    //   4. ORDER BY without LIMIT, or LIMIT >= |matching|     → full sort.
109    //   5. LIMIT without ORDER BY                              → just truncate.
110    match (&query.order_by, query.limit) {
111        (Some(order), Some(k)) if try_hnsw_probe(table, &order.expr, k).is_some() => {
112            matching = try_hnsw_probe(table, &order.expr, k).unwrap();
113        }
114        (Some(order), Some(k))
115            if try_fts_probe(table, &order.expr, order.ascending, k).is_some() =>
116        {
117            matching = try_fts_probe(table, &order.expr, order.ascending, k).unwrap();
118        }
119        (Some(order), Some(k)) if k < matching.len() => {
120            matching = select_topk(&matching, table, order, k)?;
121        }
122        (Some(order), _) => {
123            sort_rowids(&mut matching, table, order)?;
124            if let Some(k) = query.limit {
125                matching.truncate(k);
126            }
127        }
128        (None, Some(k)) => {
129            matching.truncate(k);
130        }
131        (None, None) => {}
132    }
133
134    // Build typed rows. Missing cells surface as `Value::Null` — that
135    // maps a column-not-present-for-this-rowid case onto the public
136    // `Row::get` → `Option<T>` surface cleanly.
137    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(matching.len());
138    for rowid in &matching {
139        let row: Vec<Value> = projected_cols
140            .iter()
141            .map(|col| table.get_value(col, *rowid).unwrap_or(Value::Null))
142            .collect();
143        rows.push(row);
144    }
145
146    Ok(SelectResult {
147        columns: projected_cols,
148        rows,
149    })
150}
151
152/// Executes a SELECT and returns `(rendered_table, row_count)`. The
153/// REPL and Tauri app use this to keep the table-printing behaviour
154/// the engine has always shipped. Structured callers use
155/// `execute_select_rows` instead.
156pub fn execute_select(query: SelectQuery, db: &Database) -> Result<(String, usize)> {
157    let result = execute_select_rows(query, db)?;
158    let row_count = result.rows.len();
159
160    let mut print_table = PrintTable::new();
161    let header_cells: Vec<PrintCell> = result.columns.iter().map(|c| PrintCell::new(c)).collect();
162    print_table.add_row(PrintRow::new(header_cells));
163
164    for row in &result.rows {
165        let cells: Vec<PrintCell> = row
166            .iter()
167            .map(|v| PrintCell::new(&v.to_display_string()))
168            .collect();
169        print_table.add_row(PrintRow::new(cells));
170    }
171
172    Ok((print_table.to_string(), row_count))
173}
174
175/// Executes a DELETE statement. Returns the number of rows removed.
176pub fn execute_delete(stmt: &Statement, db: &mut Database) -> Result<usize> {
177    let Statement::Delete(Delete {
178        from, selection, ..
179    }) = stmt
180    else {
181        return Err(SQLRiteError::Internal(
182            "execute_delete called on a non-DELETE statement".to_string(),
183        ));
184    };
185
186    let tables = match from {
187        FromTable::WithFromKeyword(t) | FromTable::WithoutKeyword(t) => t,
188    };
189    let table_name = extract_single_table_name(tables)?;
190
191    // Compute matching rowids with an immutable borrow, then mutate.
192    let matching: Vec<i64> = {
193        let table = db
194            .get_table(table_name.clone())
195            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
196        match select_rowids(table, selection.as_ref())? {
197            RowidSource::IndexProbe(rowids) => rowids,
198            RowidSource::FullScan => {
199                let mut out = Vec::new();
200                for rowid in table.rowids() {
201                    if let Some(expr) = selection {
202                        if !eval_predicate(expr, table, rowid)? {
203                            continue;
204                        }
205                    }
206                    out.push(rowid);
207                }
208                out
209            }
210        }
211    };
212
213    let table = db.get_table_mut(table_name)?;
214    for rowid in &matching {
215        table.delete_row(*rowid);
216    }
217    // Phase 7d.3 — any DELETE invalidates every HNSW index on this
218    // table (the deleted node could still appear in other nodes'
219    // neighbor lists, breaking subsequent searches). Mark dirty so
220    // the next save rebuilds from current rows before serializing.
221    //
222    // Phase 8b — same posture for FTS indexes (Q7 — rebuild-on-save
223    // mirrors HNSW). The deleted rowid still appears in posting
224    // lists; leaving it would surface zombie hits in future queries.
225    if !matching.is_empty() {
226        for entry in &mut table.hnsw_indexes {
227            entry.needs_rebuild = true;
228        }
229        for entry in &mut table.fts_indexes {
230            entry.needs_rebuild = true;
231        }
232    }
233    Ok(matching.len())
234}
235
236/// Executes an UPDATE statement. Returns the number of rows updated.
237pub fn execute_update(stmt: &Statement, db: &mut Database) -> Result<usize> {
238    let Statement::Update(Update {
239        table,
240        assignments,
241        from,
242        selection,
243        ..
244    }) = stmt
245    else {
246        return Err(SQLRiteError::Internal(
247            "execute_update called on a non-UPDATE statement".to_string(),
248        ));
249    };
250
251    if from.is_some() {
252        return Err(SQLRiteError::NotImplemented(
253            "UPDATE ... FROM is not supported yet".to_string(),
254        ));
255    }
256
257    let table_name = extract_table_name(table)?;
258
259    // Resolve assignment targets to plain column names and verify they exist.
260    let mut parsed_assignments: Vec<(String, Expr)> = Vec::with_capacity(assignments.len());
261    {
262        let tbl = db
263            .get_table(table_name.clone())
264            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
265        for a in assignments {
266            let col = match &a.target {
267                AssignmentTarget::ColumnName(name) => name
268                    .0
269                    .last()
270                    .map(|p| p.to_string())
271                    .ok_or_else(|| SQLRiteError::Internal("empty column name".to_string()))?,
272                AssignmentTarget::Tuple(_) => {
273                    return Err(SQLRiteError::NotImplemented(
274                        "tuple assignment targets are not supported".to_string(),
275                    ));
276                }
277            };
278            if !tbl.contains_column(col.clone()) {
279                return Err(SQLRiteError::Internal(format!(
280                    "UPDATE references unknown column '{col}'"
281                )));
282            }
283            parsed_assignments.push((col, a.value.clone()));
284        }
285    }
286
287    // Gather matching rowids + the new values to write for each assignment, under
288    // an immutable borrow. Uses the index-probe fast path when the WHERE is
289    // `col = literal` on an indexed column.
290    let work: Vec<(i64, Vec<(String, Value)>)> = {
291        let tbl = db.get_table(table_name.clone())?;
292        let matched_rowids: Vec<i64> = match select_rowids(tbl, selection.as_ref())? {
293            RowidSource::IndexProbe(rowids) => rowids,
294            RowidSource::FullScan => {
295                let mut out = Vec::new();
296                for rowid in tbl.rowids() {
297                    if let Some(expr) = selection {
298                        if !eval_predicate(expr, tbl, rowid)? {
299                            continue;
300                        }
301                    }
302                    out.push(rowid);
303                }
304                out
305            }
306        };
307        let mut rows_to_update = Vec::new();
308        for rowid in matched_rowids {
309            let mut values = Vec::with_capacity(parsed_assignments.len());
310            for (col, expr) in &parsed_assignments {
311                // UPDATE's RHS is evaluated in the context of the row being updated,
312                // so column references on the right resolve to the current row's values.
313                let v = eval_expr(expr, tbl, rowid)?;
314                values.push((col.clone(), v));
315            }
316            rows_to_update.push((rowid, values));
317        }
318        rows_to_update
319    };
320
321    let tbl = db.get_table_mut(table_name)?;
322    for (rowid, values) in &work {
323        for (col, v) in values {
324            tbl.set_value(col, *rowid, v.clone())?;
325        }
326    }
327
328    // Phase 7d.3 — UPDATE may have changed a vector column that an
329    // HNSW index covers. Mark every covering index dirty so save
330    // rebuilds from current rows. (Updates that only touched
331    // non-vector columns also mark dirty, which is over-conservative
332    // but harmless — the rebuild walks rows anyway, and the cost is
333    // only paid on save.)
334    //
335    // Phase 8b — same shape for FTS indexes covering updated TEXT cols.
336    if !work.is_empty() {
337        let updated_columns: std::collections::HashSet<&str> = work
338            .iter()
339            .flat_map(|(_, values)| values.iter().map(|(c, _)| c.as_str()))
340            .collect();
341        for entry in &mut tbl.hnsw_indexes {
342            if updated_columns.contains(entry.column_name.as_str()) {
343                entry.needs_rebuild = true;
344            }
345        }
346        for entry in &mut tbl.fts_indexes {
347            if updated_columns.contains(entry.column_name.as_str()) {
348                entry.needs_rebuild = true;
349            }
350        }
351    }
352    Ok(work.len())
353}
354
355/// Handles `CREATE INDEX [UNIQUE] <name> ON <table> [USING <method>] (<column>)`.
356/// Single-column indexes only.
357///
358/// Two flavours, branching on the optional `USING <method>` clause:
359///   - **No USING, or `USING btree`**: regular B-Tree secondary index
360///     (Phase 3e). Indexable types: Integer, Text.
361///   - **`USING hnsw`**: HNSW ANN index (Phase 7d.2). Indexable types:
362///     Vector(N) only. Distance metric is L2 by default; cosine and
363///     dot variants are deferred to Phase 7d.x.
364///
365/// Returns the (possibly synthesized) index name for the status message.
366pub fn execute_create_index(stmt: &Statement, db: &mut Database) -> Result<String> {
367    let Statement::CreateIndex(CreateIndex {
368        name,
369        table_name,
370        columns,
371        using,
372        unique,
373        if_not_exists,
374        predicate,
375        ..
376    }) = stmt
377    else {
378        return Err(SQLRiteError::Internal(
379            "execute_create_index called on a non-CREATE-INDEX statement".to_string(),
380        ));
381    };
382
383    if predicate.is_some() {
384        return Err(SQLRiteError::NotImplemented(
385            "partial indexes (CREATE INDEX ... WHERE) are not supported yet".to_string(),
386        ));
387    }
388
389    if columns.len() != 1 {
390        return Err(SQLRiteError::NotImplemented(format!(
391            "multi-column indexes are not supported yet ({} columns given)",
392            columns.len()
393        )));
394    }
395
396    let index_name = name.as_ref().map(|n| n.to_string()).ok_or_else(|| {
397        SQLRiteError::NotImplemented(
398            "anonymous CREATE INDEX (no name) is not supported — give it a name".to_string(),
399        )
400    })?;
401
402    // Detect USING <method>. The `using` field on CreateIndex covers the
403    // pre-column form `CREATE INDEX … USING hnsw (col)`. (sqlparser also
404    // accepts a post-column form `… (col) USING hnsw` and parks that in
405    // `index_options`; we don't bother with it — the canonical form is
406    // pre-column and matches PG/pgvector convention.)
407    let method = match using {
408        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw") => {
409            IndexMethod::Hnsw
410        }
411        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts") => {
412            IndexMethod::Fts
413        }
414        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("btree") => {
415            IndexMethod::Btree
416        }
417        Some(other) => {
418            return Err(SQLRiteError::NotImplemented(format!(
419                "CREATE INDEX … USING {other:?} is not supported \
420                 (try `hnsw`, `fts`, or no USING clause)"
421            )));
422        }
423        None => IndexMethod::Btree,
424    };
425
426    let table_name_str = table_name.to_string();
427    let column_name = match &columns[0].column.expr {
428        Expr::Identifier(ident) => ident.value.clone(),
429        Expr::CompoundIdentifier(parts) => parts
430            .last()
431            .map(|p| p.value.clone())
432            .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?,
433        other => {
434            return Err(SQLRiteError::NotImplemented(format!(
435                "CREATE INDEX only supports simple column references, got {other:?}"
436            )));
437        }
438    };
439
440    // Validate: table exists, column exists, type matches the index method,
441    // name is unique across both index kinds. Snapshot (rowid, value) pairs
442    // up front under the immutable borrow so the mutable attach later
443    // doesn't fight over `self`.
444    let (datatype, existing_rowids_and_values): (DataType, Vec<(i64, Value)>) = {
445        let table = db.get_table(table_name_str.clone()).map_err(|_| {
446            SQLRiteError::General(format!(
447                "CREATE INDEX references unknown table '{table_name_str}'"
448            ))
449        })?;
450        if !table.contains_column(column_name.clone()) {
451            return Err(SQLRiteError::General(format!(
452                "CREATE INDEX references unknown column '{column_name}' on table '{table_name_str}'"
453            )));
454        }
455        let col = table
456            .columns
457            .iter()
458            .find(|c| c.column_name == column_name)
459            .expect("we just verified the column exists");
460
461        // Name uniqueness check spans ALL index kinds — btree, hnsw, and
462        // fts share one namespace per table.
463        if table.index_by_name(&index_name).is_some()
464            || table.hnsw_indexes.iter().any(|i| i.name == index_name)
465            || table.fts_indexes.iter().any(|i| i.name == index_name)
466        {
467            if *if_not_exists {
468                return Ok(index_name);
469            }
470            return Err(SQLRiteError::General(format!(
471                "index '{index_name}' already exists"
472            )));
473        }
474        let datatype = clone_datatype(&col.datatype);
475
476        let mut pairs = Vec::new();
477        for rowid in table.rowids() {
478            if let Some(v) = table.get_value(&column_name, rowid) {
479                pairs.push((rowid, v));
480            }
481        }
482        (datatype, pairs)
483    };
484
485    match method {
486        IndexMethod::Btree => create_btree_index(
487            db,
488            &table_name_str,
489            &index_name,
490            &column_name,
491            &datatype,
492            *unique,
493            &existing_rowids_and_values,
494        ),
495        IndexMethod::Hnsw => create_hnsw_index(
496            db,
497            &table_name_str,
498            &index_name,
499            &column_name,
500            &datatype,
501            *unique,
502            &existing_rowids_and_values,
503        ),
504        IndexMethod::Fts => create_fts_index(
505            db,
506            &table_name_str,
507            &index_name,
508            &column_name,
509            &datatype,
510            *unique,
511            &existing_rowids_and_values,
512        ),
513    }
514}
515
516/// Executes `DROP TABLE [IF EXISTS] <name>;`. Mirrors SQLite's single-target
517/// shape: sqlparser parses `DROP TABLE a, b` as one statement with
518/// `names: vec![a, b]`, but we reject the multi-target form to keep error
519/// semantics simple (no partial-failure rollback).
520///
521/// On success the table — and every index attached to it — disappears from
522/// the in-memory `Database`. The next auto-save rebuilds `sqlrite_master`
523/// from scratch and simply doesn't write a row for the dropped table or
524/// its indexes; pages previously occupied by them become orphans on disk
525/// (no free-list yet — file size doesn't shrink until a future VACUUM).
526pub fn execute_drop_table(
527    names: &[ObjectName],
528    if_exists: bool,
529    db: &mut Database,
530) -> Result<usize> {
531    if names.len() != 1 {
532        return Err(SQLRiteError::NotImplemented(
533            "DROP TABLE supports a single table per statement".to_string(),
534        ));
535    }
536    let name = names[0].to_string();
537
538    if name == crate::sql::pager::MASTER_TABLE_NAME {
539        return Err(SQLRiteError::General(format!(
540            "'{}' is a reserved name used by the internal schema catalog",
541            crate::sql::pager::MASTER_TABLE_NAME
542        )));
543    }
544
545    if !db.contains_table(name.clone()) {
546        return if if_exists {
547            Ok(0)
548        } else {
549            Err(SQLRiteError::General(format!(
550                "Table '{name}' does not exist"
551            )))
552        };
553    }
554
555    db.tables.remove(&name);
556    Ok(1)
557}
558
559/// Executes `DROP INDEX [IF EXISTS] <name>;`. The statement does not name a
560/// table, so we walk every table looking for the index across all three
561/// index families (B-Tree secondary, HNSW, FTS).
562///
563/// Refuses to drop auto-indexes (`origin == IndexOrigin::Auto`) — those are
564/// invariants of the table's PRIMARY KEY / UNIQUE constraints and should
565/// only disappear when the column or table they depend on is dropped.
566/// SQLite has the same rule for its `sqlite_autoindex_*` indexes.
567pub fn execute_drop_index(
568    names: &[ObjectName],
569    if_exists: bool,
570    db: &mut Database,
571) -> Result<usize> {
572    if names.len() != 1 {
573        return Err(SQLRiteError::NotImplemented(
574            "DROP INDEX supports a single index per statement".to_string(),
575        ));
576    }
577    let name = names[0].to_string();
578
579    for table in db.tables.values_mut() {
580        if let Some(secondary) = table.secondary_indexes.iter().find(|i| i.name == name) {
581            if secondary.origin == IndexOrigin::Auto {
582                return Err(SQLRiteError::General(format!(
583                    "cannot drop auto-created index '{name}' (drop the column or table instead)"
584                )));
585            }
586            table.secondary_indexes.retain(|i| i.name != name);
587            return Ok(1);
588        }
589        if table.hnsw_indexes.iter().any(|i| i.name == name) {
590            table.hnsw_indexes.retain(|i| i.name != name);
591            return Ok(1);
592        }
593        if table.fts_indexes.iter().any(|i| i.name == name) {
594            table.fts_indexes.retain(|i| i.name != name);
595            return Ok(1);
596        }
597    }
598
599    if if_exists {
600        Ok(0)
601    } else {
602        Err(SQLRiteError::General(format!(
603            "Index '{name}' does not exist"
604        )))
605    }
606}
607
608/// Executes `ALTER TABLE [IF EXISTS] <name> <op>;` for one operation per
609/// statement. Supports four sub-operations matching SQLite:
610///
611///   - `RENAME TO <new>`
612///   - `RENAME COLUMN <old> TO <new>`
613///   - `ADD COLUMN <coldef>` (NOT NULL requires DEFAULT on a non-empty table;
614///     PK / UNIQUE constraints rejected — would need backfill + uniqueness)
615///   - `DROP COLUMN <name>` (refuses PK column and only-column)
616///
617/// Multi-operation ALTER (`ALTER TABLE foo RENAME TO bar, ADD COLUMN x ...`)
618/// is rejected; SQLite forbids it too.
619pub fn execute_alter_table(alter: AlterTable, db: &mut Database) -> Result<String> {
620    let table_name = alter.name.to_string();
621
622    if table_name == crate::sql::pager::MASTER_TABLE_NAME {
623        return Err(SQLRiteError::General(format!(
624            "'{}' is a reserved name used by the internal schema catalog",
625            crate::sql::pager::MASTER_TABLE_NAME
626        )));
627    }
628
629    if !db.contains_table(table_name.clone()) {
630        return if alter.if_exists {
631            Ok("ALTER TABLE: no-op (table does not exist)".to_string())
632        } else {
633            Err(SQLRiteError::General(format!(
634                "Table '{table_name}' does not exist"
635            )))
636        };
637    }
638
639    if alter.operations.len() != 1 {
640        return Err(SQLRiteError::NotImplemented(
641            "ALTER TABLE supports one operation per statement".to_string(),
642        ));
643    }
644
645    match &alter.operations[0] {
646        AlterTableOperation::RenameTable { table_name: kind } => {
647            let new_name = match kind {
648                RenameTableNameKind::To(name) => name.to_string(),
649                RenameTableNameKind::As(_) => {
650                    return Err(SQLRiteError::NotImplemented(
651                        "ALTER TABLE ... RENAME AS (MySQL-only) is not supported; use RENAME TO"
652                            .to_string(),
653                    ));
654                }
655            };
656            alter_rename_table(db, &table_name, &new_name)?;
657            Ok(format!(
658                "ALTER TABLE '{table_name}' RENAME TO '{new_name}' executed."
659            ))
660        }
661        AlterTableOperation::RenameColumn {
662            old_column_name,
663            new_column_name,
664        } => {
665            let old = old_column_name.value.clone();
666            let new = new_column_name.value.clone();
667            db.get_table_mut(table_name.clone())?
668                .rename_column(&old, &new)?;
669            Ok(format!(
670                "ALTER TABLE '{table_name}' RENAME COLUMN '{old}' TO '{new}' executed."
671            ))
672        }
673        AlterTableOperation::AddColumn {
674            column_def,
675            if_not_exists,
676            ..
677        } => {
678            let parsed = crate::sql::parser::create::parse_one_column(column_def)?;
679            let table = db.get_table_mut(table_name.clone())?;
680            if *if_not_exists && table.contains_column(parsed.name.clone()) {
681                return Ok(format!(
682                    "ALTER TABLE '{table_name}' ADD COLUMN: no-op (column '{}' already exists)",
683                    parsed.name
684                ));
685            }
686            let col_name = parsed.name.clone();
687            table.add_column(parsed)?;
688            Ok(format!(
689                "ALTER TABLE '{table_name}' ADD COLUMN '{col_name}' executed."
690            ))
691        }
692        AlterTableOperation::DropColumn {
693            column_names,
694            if_exists,
695            ..
696        } => {
697            if column_names.len() != 1 {
698                return Err(SQLRiteError::NotImplemented(
699                    "ALTER TABLE DROP COLUMN supports a single column per statement".to_string(),
700                ));
701            }
702            let col_name = column_names[0].value.clone();
703            let table = db.get_table_mut(table_name.clone())?;
704            if *if_exists && !table.contains_column(col_name.clone()) {
705                return Ok(format!(
706                    "ALTER TABLE '{table_name}' DROP COLUMN: no-op (column '{col_name}' does not exist)"
707                ));
708            }
709            table.drop_column(&col_name)?;
710            Ok(format!(
711                "ALTER TABLE '{table_name}' DROP COLUMN '{col_name}' executed."
712            ))
713        }
714        other => Err(SQLRiteError::NotImplemented(format!(
715            "ALTER TABLE operation {other:?} is not supported"
716        ))),
717    }
718}
719
720/// Renames a table in `db.tables`. Updates `tb_name`, every secondary
721/// index's `table_name` field, and any auto-index whose name embedded
722/// the old table name. HNSW / FTS index entries don't carry a
723/// `table_name` field — they're addressed implicitly via the `Table`
724/// they live inside, so they move with the rename for free.
725fn alter_rename_table(db: &mut Database, old: &str, new: &str) -> Result<()> {
726    if new == crate::sql::pager::MASTER_TABLE_NAME {
727        return Err(SQLRiteError::General(format!(
728            "'{}' is a reserved name used by the internal schema catalog",
729            crate::sql::pager::MASTER_TABLE_NAME
730        )));
731    }
732    if old == new {
733        return Ok(());
734    }
735    if db.contains_table(new.to_string()) {
736        return Err(SQLRiteError::General(format!(
737            "target table '{new}' already exists"
738        )));
739    }
740
741    let mut table = db
742        .tables
743        .remove(old)
744        .ok_or_else(|| SQLRiteError::General(format!("Table '{old}' does not exist")))?;
745    table.tb_name = new.to_string();
746    for idx in table.secondary_indexes.iter_mut() {
747        idx.table_name = new.to_string();
748        if idx.origin == IndexOrigin::Auto
749            && idx.name == SecondaryIndex::auto_name(old, &idx.column_name)
750        {
751            idx.name = SecondaryIndex::auto_name(new, &idx.column_name);
752        }
753    }
754    db.tables.insert(new.to_string(), table);
755    Ok(())
756}
757
758/// `USING <method>` choices recognized by `execute_create_index`. A
759/// missing USING clause defaults to `Btree` so existing CREATE INDEX
760/// statements (Phase 3e) keep working unchanged.
761#[derive(Debug, Clone, Copy)]
762enum IndexMethod {
763    Btree,
764    Hnsw,
765    /// Phase 8b — full-text inverted index over a TEXT column.
766    Fts,
767}
768
769/// Builds a Phase 3e B-Tree secondary index and attaches it to the table.
770fn create_btree_index(
771    db: &mut Database,
772    table_name: &str,
773    index_name: &str,
774    column_name: &str,
775    datatype: &DataType,
776    unique: bool,
777    existing: &[(i64, Value)],
778) -> Result<String> {
779    let mut idx = SecondaryIndex::new(
780        index_name.to_string(),
781        table_name.to_string(),
782        column_name.to_string(),
783        datatype,
784        unique,
785        IndexOrigin::Explicit,
786    )?;
787
788    // Populate from existing rows. UNIQUE violations here mean the
789    // existing data already breaks the new index's constraint — a
790    // common source of user confusion, so be explicit.
791    for (rowid, v) in existing {
792        if unique && idx.would_violate_unique(v) {
793            return Err(SQLRiteError::General(format!(
794                "cannot create UNIQUE index '{index_name}': column '{column_name}' \
795                 already contains the duplicate value {}",
796                v.to_display_string()
797            )));
798        }
799        idx.insert(v, *rowid)?;
800    }
801
802    let table_mut = db.get_table_mut(table_name.to_string())?;
803    table_mut.secondary_indexes.push(idx);
804    Ok(index_name.to_string())
805}
806
807/// Builds a Phase 7d.2 HNSW index and attaches it to the table.
808fn create_hnsw_index(
809    db: &mut Database,
810    table_name: &str,
811    index_name: &str,
812    column_name: &str,
813    datatype: &DataType,
814    unique: bool,
815    existing: &[(i64, Value)],
816) -> Result<String> {
817    // HNSW only makes sense on VECTOR columns. Reject anything else
818    // with a clear message — this is the most likely user error.
819    let dim = match datatype {
820        DataType::Vector(d) => *d,
821        other => {
822            return Err(SQLRiteError::General(format!(
823                "USING hnsw requires a VECTOR column; '{column_name}' is {other}"
824            )));
825        }
826    };
827
828    if unique {
829        return Err(SQLRiteError::General(
830            "UNIQUE has no meaning for HNSW indexes".to_string(),
831        ));
832    }
833
834    // Build the in-memory graph. Distance metric is L2 by default
835    // (Phase 7d.2 doesn't yet expose a knob for picking cosine/dot —
836    // see `docs/phase-7-plan.md` for the deferral).
837    //
838    // Seed: hash the index name so different indexes get different
839    // graph topologies, but the same index always gets the same one
840    // — useful when debugging recall / index size.
841    let seed = hash_str_to_seed(index_name);
842    let mut idx = HnswIndex::new(DistanceMetric::L2, seed);
843
844    // Snapshot the (rowid, vector) pairs into a side map so the
845    // get_vec closure below can serve them by id without re-borrowing
846    // the table (we're already holding `existing` — flatten it).
847    let mut vec_map: std::collections::HashMap<i64, Vec<f32>> =
848        std::collections::HashMap::with_capacity(existing.len());
849    for (rowid, v) in existing {
850        match v {
851            Value::Vector(vec) => {
852                if vec.len() != dim {
853                    return Err(SQLRiteError::Internal(format!(
854                        "row {rowid} stores a {}-dim vector in column '{column_name}' \
855                         declared as VECTOR({dim}) — schema invariant violated",
856                        vec.len()
857                    )));
858                }
859                vec_map.insert(*rowid, vec.clone());
860            }
861            // Non-vector values (theoretical NULL, type coercion bug)
862            // get skipped — they wouldn't have a sensible graph
863            // position anyway.
864            _ => continue,
865        }
866    }
867
868    for (rowid, _) in existing {
869        if let Some(v) = vec_map.get(rowid) {
870            let v_clone = v.clone();
871            idx.insert(*rowid, &v_clone, |id| {
872                vec_map.get(&id).cloned().unwrap_or_default()
873            });
874        }
875    }
876
877    let table_mut = db.get_table_mut(table_name.to_string())?;
878    table_mut.hnsw_indexes.push(HnswIndexEntry {
879        name: index_name.to_string(),
880        column_name: column_name.to_string(),
881        index: idx,
882        // Freshly built — no DELETE/UPDATE has invalidated it yet.
883        needs_rebuild: false,
884    });
885    Ok(index_name.to_string())
886}
887
888/// Builds a Phase 8b FTS inverted index and attaches it to the table.
889/// Mirrors [`create_hnsw_index`] in shape: validate column type,
890/// tokenize each existing row's text into the in-memory posting list,
891/// push an `FtsIndexEntry`.
892fn create_fts_index(
893    db: &mut Database,
894    table_name: &str,
895    index_name: &str,
896    column_name: &str,
897    datatype: &DataType,
898    unique: bool,
899    existing: &[(i64, Value)],
900) -> Result<String> {
901    // FTS is a TEXT-only feature for the MVP. JSON columns share the
902    // Row::Text storage but their content is structured — full-text
903    // indexing JSON keys + values would need a different design (and
904    // is out of scope per the Phase 8 plan's "Out of scope" section).
905    match datatype {
906        DataType::Text => {}
907        other => {
908            return Err(SQLRiteError::General(format!(
909                "USING fts requires a TEXT column; '{column_name}' is {other}"
910            )));
911        }
912    }
913
914    if unique {
915        return Err(SQLRiteError::General(
916            "UNIQUE has no meaning for FTS indexes".to_string(),
917        ));
918    }
919
920    let mut idx = PostingList::new();
921    for (rowid, v) in existing {
922        if let Value::Text(text) = v {
923            idx.insert(*rowid, text);
924        }
925        // Non-text values (Null, type coercion bugs) get skipped — same
926        // posture as create_hnsw_index for non-vector values.
927    }
928
929    let table_mut = db.get_table_mut(table_name.to_string())?;
930    table_mut.fts_indexes.push(FtsIndexEntry {
931        name: index_name.to_string(),
932        column_name: column_name.to_string(),
933        index: idx,
934        needs_rebuild: false,
935    });
936    Ok(index_name.to_string())
937}
938
939/// Stable, deterministic hash of a string into a u64 RNG seed. FNV-1a;
940/// avoids pulling in `std::hash::DefaultHasher` (which is randomized
941/// per process).
942fn hash_str_to_seed(s: &str) -> u64 {
943    let mut h: u64 = 0xCBF29CE484222325;
944    for b in s.as_bytes() {
945        h ^= *b as u64;
946        h = h.wrapping_mul(0x100000001B3);
947    }
948    h
949}
950
951/// Cheap clone helper — `DataType` intentionally doesn't derive `Clone`
952/// because the enum has no ergonomic reason to be cloneable elsewhere.
953fn clone_datatype(dt: &DataType) -> DataType {
954    match dt {
955        DataType::Integer => DataType::Integer,
956        DataType::Text => DataType::Text,
957        DataType::Real => DataType::Real,
958        DataType::Bool => DataType::Bool,
959        DataType::Vector(dim) => DataType::Vector(*dim),
960        DataType::Json => DataType::Json,
961        DataType::None => DataType::None,
962        DataType::Invalid => DataType::Invalid,
963    }
964}
965
966fn extract_single_table_name(tables: &[TableWithJoins]) -> Result<String> {
967    if tables.len() != 1 {
968        return Err(SQLRiteError::NotImplemented(
969            "multi-table DELETE is not supported yet".to_string(),
970        ));
971    }
972    extract_table_name(&tables[0])
973}
974
975fn extract_table_name(twj: &TableWithJoins) -> Result<String> {
976    if !twj.joins.is_empty() {
977        return Err(SQLRiteError::NotImplemented(
978            "JOIN is not supported yet".to_string(),
979        ));
980    }
981    match &twj.relation {
982        TableFactor::Table { name, .. } => Ok(name.to_string()),
983        _ => Err(SQLRiteError::NotImplemented(
984            "only plain table references are supported".to_string(),
985        )),
986    }
987}
988
989/// Tells the executor how to produce its candidate rowid list.
990enum RowidSource {
991    /// The WHERE was simple enough to probe a secondary index directly.
992    /// The `Vec` already contains exactly the rows the index matched;
993    /// no further WHERE evaluation is needed (the probe is precise).
994    IndexProbe(Vec<i64>),
995    /// No applicable index; caller falls back to walking `table.rowids()`
996    /// and evaluating the WHERE on each row.
997    FullScan,
998}
999
1000/// Try to satisfy `WHERE` with an index probe. Currently supports the
1001/// simplest shape: a single `col = literal` (or `literal = col`) where
1002/// `col` is on a secondary index. AND/OR/range predicates fall back to
1003/// full scan — those can be layered on later without changing the caller.
1004fn select_rowids(table: &Table, selection: Option<&Expr>) -> Result<RowidSource> {
1005    let Some(expr) = selection else {
1006        return Ok(RowidSource::FullScan);
1007    };
1008    let Some((col, literal)) = try_extract_equality(expr) else {
1009        return Ok(RowidSource::FullScan);
1010    };
1011    let Some(idx) = table.index_for_column(&col) else {
1012        return Ok(RowidSource::FullScan);
1013    };
1014
1015    // Convert the literal into a runtime Value. If the literal type doesn't
1016    // match the column's index we still need correct semantics — evaluate
1017    // the WHERE against every row. Fall back to full scan.
1018    let literal_value = match convert_literal(&literal) {
1019        Ok(v) => v,
1020        Err(_) => return Ok(RowidSource::FullScan),
1021    };
1022
1023    // Index lookup returns the full list of rowids matching this equality
1024    // predicate. For unique indexes that's at most one; for non-unique it
1025    // can be many.
1026    let mut rowids = idx.lookup(&literal_value);
1027    rowids.sort_unstable();
1028    Ok(RowidSource::IndexProbe(rowids))
1029}
1030
1031/// Recognizes `expr` as a simple equality on a column reference against a
1032/// literal. Returns `(column_name, literal_value)` if the shape matches;
1033/// `None` otherwise. Accepts both `col = literal` and `literal = col`.
1034fn try_extract_equality(expr: &Expr) -> Option<(String, sqlparser::ast::Value)> {
1035    // Peel off Nested parens so `WHERE (x = 1)` is recognized too.
1036    let peeled = match expr {
1037        Expr::Nested(inner) => inner.as_ref(),
1038        other => other,
1039    };
1040    let Expr::BinaryOp { left, op, right } = peeled else {
1041        return None;
1042    };
1043    if !matches!(op, BinaryOperator::Eq) {
1044        return None;
1045    }
1046    let col_from = |e: &Expr| -> Option<String> {
1047        match e {
1048            Expr::Identifier(ident) => Some(ident.value.clone()),
1049            Expr::CompoundIdentifier(parts) => parts.last().map(|p| p.value.clone()),
1050            _ => None,
1051        }
1052    };
1053    let literal_from = |e: &Expr| -> Option<sqlparser::ast::Value> {
1054        if let Expr::Value(v) = e {
1055            Some(v.value.clone())
1056        } else {
1057            None
1058        }
1059    };
1060    if let (Some(c), Some(l)) = (col_from(left), literal_from(right)) {
1061        return Some((c, l));
1062    }
1063    if let (Some(l), Some(c)) = (literal_from(left), col_from(right)) {
1064        return Some((c, l));
1065    }
1066    None
1067}
1068
1069/// Recognizes the HNSW-probable query pattern and probes the graph
1070/// if a matching index exists.
1071///
1072/// Looks for ORDER BY `vec_distance_l2(<col>, <bracket-array literal>)`
1073/// where the table has an HNSW index attached to `<col>`. On a match,
1074/// returns the top-k rowids straight from the graph (O(log N)). On
1075/// any miss — different function name, no matching index, query
1076/// dimension wrong, etc. — returns `None` and the caller falls through
1077/// to the bounded-heap brute-force path (7c) or the full sort (7b),
1078/// preserving correct results regardless of whether the HNSW pathway
1079/// kicked in.
1080///
1081/// Phase 7d.2 caveats:
1082/// - Only `vec_distance_l2` is recognized. Cosine and dot fall through
1083///   to brute-force because we don't yet expose a per-index distance
1084///   knob (deferred to Phase 7d.x — see `docs/phase-7-plan.md`).
1085/// - Only ASCENDING order makes sense for "k nearest" — DESC ORDER BY
1086///   `vec_distance_l2(...) LIMIT k` would mean "k farthest", which
1087///   isn't what the index is built for. We don't bother to detect
1088///   `ascending == false` here; the optimizer just skips and the
1089///   fallback path handles it correctly (slower).
1090fn try_hnsw_probe(table: &Table, order_expr: &Expr, k: usize) -> Option<Vec<i64>> {
1091    if k == 0 {
1092        return None;
1093    }
1094
1095    // Pattern-match: order expr must be a function call vec_distance_l2(a, b).
1096    let func = match order_expr {
1097        Expr::Function(f) => f,
1098        _ => return None,
1099    };
1100    let fname = match func.name.0.as_slice() {
1101        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1102        _ => return None,
1103    };
1104    if fname != "vec_distance_l2" {
1105        return None;
1106    }
1107
1108    // Extract the two args as raw Exprs.
1109    let arg_list = match &func.args {
1110        FunctionArguments::List(l) => &l.args,
1111        _ => return None,
1112    };
1113    if arg_list.len() != 2 {
1114        return None;
1115    }
1116    let exprs: Vec<&Expr> = arg_list
1117        .iter()
1118        .filter_map(|a| match a {
1119            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => Some(e),
1120            _ => None,
1121        })
1122        .collect();
1123    if exprs.len() != 2 {
1124        return None;
1125    }
1126
1127    // One arg must be a column reference (the indexed col); the other
1128    // must be a bracket-array literal (the query vector). Try both
1129    // orderings — pgvector's idiom puts the column on the left, but
1130    // SQL is commutative for distance.
1131    let (col_name, query_vec) = match identify_indexed_arg_and_literal(exprs[0], exprs[1]) {
1132        Some(v) => v,
1133        None => match identify_indexed_arg_and_literal(exprs[1], exprs[0]) {
1134            Some(v) => v,
1135            None => return None,
1136        },
1137    };
1138
1139    // Find the HNSW index on this column.
1140    let entry = table
1141        .hnsw_indexes
1142        .iter()
1143        .find(|e| e.column_name == col_name)?;
1144
1145    // Dimension sanity check — the query vector must match the
1146    // indexed column's declared dimension. If it doesn't, the brute-
1147    // force fallback would also error at the vec_distance_l2 dim-check;
1148    // returning None here lets that path produce the user-visible
1149    // error message.
1150    let declared_dim = match table.columns.iter().find(|c| c.column_name == col_name) {
1151        Some(c) => match &c.datatype {
1152            DataType::Vector(d) => *d,
1153            _ => return None,
1154        },
1155        None => return None,
1156    };
1157    if query_vec.len() != declared_dim {
1158        return None;
1159    }
1160
1161    // Probe the graph. Vectors are looked up from the table's row
1162    // storage — a closure rather than a `&Table` so the algorithm
1163    // module stays decoupled from the SQL types.
1164    let column_for_closure = col_name.clone();
1165    let table_ref = table;
1166    let result = entry.index.search(&query_vec, k, |id| {
1167        match table_ref.get_value(&column_for_closure, id) {
1168            Some(Value::Vector(v)) => v,
1169            _ => Vec::new(),
1170        }
1171    });
1172    Some(result)
1173}
1174
1175/// Phase 8b — FTS optimizer hook.
1176///
1177/// Recognizes `ORDER BY bm25_score(<col>, '<query>') DESC LIMIT <k>`
1178/// and serves it from the FTS index instead of full-scanning. Returns
1179/// `Some(rowids)` already sorted by descending BM25 (with rowid
1180/// ascending as tie-break), or `None` to fall through to scalar eval.
1181///
1182/// **Known limitation (mirrors `try_hnsw_probe`).** This shortcut
1183/// ignores any `WHERE` clause. The canonical FTS query has a
1184/// `WHERE fts_match(<col>, '<q>')` predicate, which is implicitly
1185/// satisfied by the probe results — so dropping it is harmless.
1186/// Anything *else* in the WHERE (`AND status = 'published'`) gets
1187/// silently skipped on the optimizer path. Per Phase 8 plan Q6 we
1188/// match HNSW's posture here; a correctness-preserving multi-index
1189/// composer is deferred.
1190fn try_fts_probe(table: &Table, order_expr: &Expr, ascending: bool, k: usize) -> Option<Vec<i64>> {
1191    if k == 0 || ascending {
1192        // BM25 is "higher = better"; ASC ranking is almost certainly a
1193        // user mistake. Fall through so the caller gets either an
1194        // explicit error from scalar eval or the slow correct path.
1195        return None;
1196    }
1197
1198    let func = match order_expr {
1199        Expr::Function(f) => f,
1200        _ => return None,
1201    };
1202    let fname = match func.name.0.as_slice() {
1203        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1204        _ => return None,
1205    };
1206    if fname != "bm25_score" {
1207        return None;
1208    }
1209
1210    let arg_list = match &func.args {
1211        FunctionArguments::List(l) => &l.args,
1212        _ => return None,
1213    };
1214    if arg_list.len() != 2 {
1215        return None;
1216    }
1217    let exprs: Vec<&Expr> = arg_list
1218        .iter()
1219        .filter_map(|a| match a {
1220            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => Some(e),
1221            _ => None,
1222        })
1223        .collect();
1224    if exprs.len() != 2 {
1225        return None;
1226    }
1227
1228    // Arg 0 must be a bare column identifier.
1229    let col_name = match exprs[0] {
1230        Expr::Identifier(ident) if ident.quote_style.is_none() => ident.value.clone(),
1231        _ => return None,
1232    };
1233
1234    // Arg 1 must be a single-quoted string literal. Anything else
1235    // (column reference, function call) requires per-row evaluation —
1236    // we'd lose the whole point of the probe.
1237    let query = match exprs[1] {
1238        Expr::Value(v) => match &v.value {
1239            AstValue::SingleQuotedString(s) => s.clone(),
1240            _ => return None,
1241        },
1242        _ => return None,
1243    };
1244
1245    let entry = table
1246        .fts_indexes
1247        .iter()
1248        .find(|e| e.column_name == col_name)?;
1249
1250    let scored = entry.index.query(&query, &Bm25Params::default());
1251    let mut out: Vec<i64> = scored.into_iter().map(|(id, _)| id).collect();
1252    if out.len() > k {
1253        out.truncate(k);
1254    }
1255    Some(out)
1256}
1257
1258/// Helper for `try_hnsw_probe`: given two function args, identify which
1259/// one is a bare column identifier (the indexed column) and which is a
1260/// bracket-array literal (the query vector). Returns
1261/// `Some((column_name, query_vec))` on a match, `None` otherwise.
1262fn identify_indexed_arg_and_literal(a: &Expr, b: &Expr) -> Option<(String, Vec<f32>)> {
1263    let col_name = match a {
1264        Expr::Identifier(ident) if ident.quote_style.is_none() => ident.value.clone(),
1265        _ => return None,
1266    };
1267    let lit_str = match b {
1268        Expr::Identifier(ident) if ident.quote_style == Some('[') => {
1269            format!("[{}]", ident.value)
1270        }
1271        _ => return None,
1272    };
1273    let v = parse_vector_literal(&lit_str).ok()?;
1274    Some((col_name, v))
1275}
1276
1277/// One entry in the bounded-heap top-k path. Holds a pre-evaluated
1278/// sort key + the rowid it came from. The `asc` flag inverts `Ord`
1279/// so a single `BinaryHeap<HeapEntry>` works for both ASC and DESC
1280/// without wrapping in `std::cmp::Reverse` at the call site:
1281///
1282///   - ASC LIMIT k = "k smallest": natural Ord. Max-heap top is the
1283///     largest currently kept; new items smaller than top displace.
1284///   - DESC LIMIT k = "k largest": Ord reversed. Max-heap top is now
1285///     the smallest currently kept (under reversed Ord, smallest
1286///     looks largest); new items larger than top displace.
1287///
1288/// In both cases the displacement test reduces to "new entry < heap top".
1289struct HeapEntry {
1290    key: Value,
1291    rowid: i64,
1292    asc: bool,
1293}
1294
1295impl PartialEq for HeapEntry {
1296    fn eq(&self, other: &Self) -> bool {
1297        self.cmp(other) == Ordering::Equal
1298    }
1299}
1300
1301impl Eq for HeapEntry {}
1302
1303impl PartialOrd for HeapEntry {
1304    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1305        Some(self.cmp(other))
1306    }
1307}
1308
1309impl Ord for HeapEntry {
1310    fn cmp(&self, other: &Self) -> Ordering {
1311        let raw = compare_values(Some(&self.key), Some(&other.key));
1312        if self.asc { raw } else { raw.reverse() }
1313    }
1314}
1315
1316/// Bounded-heap top-k selection. Returns at most `k` rowids in the
1317/// caller's desired order (ascending key for `order.ascending`,
1318/// descending otherwise).
1319///
1320/// O(N log k) where N = `matching.len()`. Caller must check
1321/// `k < matching.len()` for this to be a win — for k ≥ N the
1322/// `sort_rowids` full-sort path is the same asymptotic cost without
1323/// the heap overhead.
1324fn select_topk(
1325    matching: &[i64],
1326    table: &Table,
1327    order: &OrderByClause,
1328    k: usize,
1329) -> Result<Vec<i64>> {
1330    use std::collections::BinaryHeap;
1331
1332    if k == 0 || matching.is_empty() {
1333        return Ok(Vec::new());
1334    }
1335
1336    let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::with_capacity(k + 1);
1337
1338    for &rowid in matching {
1339        let key = eval_expr(&order.expr, table, rowid)?;
1340        let entry = HeapEntry {
1341            key,
1342            rowid,
1343            asc: order.ascending,
1344        };
1345
1346        if heap.len() < k {
1347            heap.push(entry);
1348        } else {
1349            // peek() returns the largest under our direction-aware Ord
1350            // — the worst entry currently kept. Displace it iff the
1351            // new entry is "better" (i.e. compares Less).
1352            if entry < *heap.peek().unwrap() {
1353                heap.pop();
1354                heap.push(entry);
1355            }
1356        }
1357    }
1358
1359    // `into_sorted_vec` returns ascending under our direction-aware Ord:
1360    //   ASC: ascending by raw key (what we want)
1361    //   DESC: ascending under reversed Ord = descending by raw key (what
1362    //         we want for an ORDER BY DESC LIMIT k result)
1363    Ok(heap
1364        .into_sorted_vec()
1365        .into_iter()
1366        .map(|e| e.rowid)
1367        .collect())
1368}
1369
1370fn sort_rowids(rowids: &mut [i64], table: &Table, order: &OrderByClause) -> Result<()> {
1371    // Phase 7b: ORDER BY now accepts any expression (column ref,
1372    // arithmetic, function call, …). Pre-compute the sort key for
1373    // every rowid up front so the comparator is called O(N log N)
1374    // times against pre-evaluated Values rather than re-evaluating
1375    // the expression O(N log N) times. Not strictly necessary today,
1376    // but vital once 7d's HNSW index lands and this same code path
1377    // could be running tens of millions of distance computations.
1378    let mut keys: Vec<(i64, Result<Value>)> = rowids
1379        .iter()
1380        .map(|r| (*r, eval_expr(&order.expr, table, *r)))
1381        .collect();
1382
1383    // Surface the FIRST evaluation error if any. We could be lazy
1384    // and let sort_by encounter it, but `Ord::cmp` can't return a
1385    // Result and we'd have to swallow errors silently.
1386    for (_, k) in &keys {
1387        if let Err(e) = k {
1388            return Err(SQLRiteError::General(format!(
1389                "ORDER BY expression failed: {e}"
1390            )));
1391        }
1392    }
1393
1394    keys.sort_by(|(_, ka), (_, kb)| {
1395        // Both unwrap()s are safe — we just verified above that
1396        // every key Result is Ok.
1397        let va = ka.as_ref().unwrap();
1398        let vb = kb.as_ref().unwrap();
1399        let ord = compare_values(Some(va), Some(vb));
1400        if order.ascending { ord } else { ord.reverse() }
1401    });
1402
1403    // Write the sorted rowids back into the caller's slice.
1404    for (i, (rowid, _)) in keys.into_iter().enumerate() {
1405        rowids[i] = rowid;
1406    }
1407    Ok(())
1408}
1409
1410fn compare_values(a: Option<&Value>, b: Option<&Value>) -> Ordering {
1411    match (a, b) {
1412        (None, None) => Ordering::Equal,
1413        (None, _) => Ordering::Less,
1414        (_, None) => Ordering::Greater,
1415        (Some(a), Some(b)) => match (a, b) {
1416            (Value::Null, Value::Null) => Ordering::Equal,
1417            (Value::Null, _) => Ordering::Less,
1418            (_, Value::Null) => Ordering::Greater,
1419            (Value::Integer(x), Value::Integer(y)) => x.cmp(y),
1420            (Value::Real(x), Value::Real(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
1421            (Value::Integer(x), Value::Real(y)) => {
1422                (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal)
1423            }
1424            (Value::Real(x), Value::Integer(y)) => {
1425                x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal)
1426            }
1427            (Value::Text(x), Value::Text(y)) => x.cmp(y),
1428            (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
1429            // Cross-type fallback: stringify and compare; keeps ORDER BY total.
1430            (x, y) => x.to_display_string().cmp(&y.to_display_string()),
1431        },
1432    }
1433}
1434
1435/// Returns `true` if the row at `rowid` matches the predicate expression.
1436pub fn eval_predicate(expr: &Expr, table: &Table, rowid: i64) -> Result<bool> {
1437    let v = eval_expr(expr, table, rowid)?;
1438    match v {
1439        Value::Bool(b) => Ok(b),
1440        Value::Null => Ok(false), // SQL NULL in a WHERE is treated as false
1441        Value::Integer(i) => Ok(i != 0),
1442        other => Err(SQLRiteError::Internal(format!(
1443            "WHERE clause must evaluate to boolean, got {}",
1444            other.to_display_string()
1445        ))),
1446    }
1447}
1448
1449fn eval_expr(expr: &Expr, table: &Table, rowid: i64) -> Result<Value> {
1450    match expr {
1451        Expr::Nested(inner) => eval_expr(inner, table, rowid),
1452
1453        Expr::Identifier(ident) => {
1454            // Phase 7b — sqlparser parses bracket-array literals like
1455            // `[0.1, 0.2, 0.3]` as bracket-quoted identifiers (it inherits
1456            // MSSQL `[name]` syntax). When we see `quote_style == Some('[')`
1457            // in expression-evaluation position (SELECT projection, WHERE,
1458            // ORDER BY, function args), parse the bracketed content as a
1459            // vector literal so the rest of the executor can compare /
1460            // distance-compute against it. Same trick the INSERT parser
1461            // uses; the executor needed its own copy because expression
1462            // eval runs on a different code path.
1463            if ident.quote_style == Some('[') {
1464                let raw = format!("[{}]", ident.value);
1465                let v = parse_vector_literal(&raw)?;
1466                return Ok(Value::Vector(v));
1467            }
1468            Ok(table.get_value(&ident.value, rowid).unwrap_or(Value::Null))
1469        }
1470
1471        Expr::CompoundIdentifier(parts) => {
1472            // Accept `table.col` — we only have one table in scope, so ignore the qualifier.
1473            let col = parts
1474                .last()
1475                .map(|i| i.value.as_str())
1476                .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?;
1477            Ok(table.get_value(col, rowid).unwrap_or(Value::Null))
1478        }
1479
1480        Expr::Value(v) => convert_literal(&v.value),
1481
1482        Expr::UnaryOp { op, expr } => {
1483            let inner = eval_expr(expr, table, rowid)?;
1484            match op {
1485                UnaryOperator::Not => match inner {
1486                    Value::Bool(b) => Ok(Value::Bool(!b)),
1487                    Value::Null => Ok(Value::Null),
1488                    other => Err(SQLRiteError::Internal(format!(
1489                        "NOT applied to non-boolean value: {}",
1490                        other.to_display_string()
1491                    ))),
1492                },
1493                UnaryOperator::Minus => match inner {
1494                    Value::Integer(i) => Ok(Value::Integer(-i)),
1495                    Value::Real(f) => Ok(Value::Real(-f)),
1496                    Value::Null => Ok(Value::Null),
1497                    other => Err(SQLRiteError::Internal(format!(
1498                        "unary minus on non-numeric value: {}",
1499                        other.to_display_string()
1500                    ))),
1501                },
1502                UnaryOperator::Plus => Ok(inner),
1503                other => Err(SQLRiteError::NotImplemented(format!(
1504                    "unary operator {other:?} is not supported"
1505                ))),
1506            }
1507        }
1508
1509        Expr::BinaryOp { left, op, right } => match op {
1510            BinaryOperator::And => {
1511                let l = eval_expr(left, table, rowid)?;
1512                let r = eval_expr(right, table, rowid)?;
1513                Ok(Value::Bool(as_bool(&l)? && as_bool(&r)?))
1514            }
1515            BinaryOperator::Or => {
1516                let l = eval_expr(left, table, rowid)?;
1517                let r = eval_expr(right, table, rowid)?;
1518                Ok(Value::Bool(as_bool(&l)? || as_bool(&r)?))
1519            }
1520            cmp @ (BinaryOperator::Eq
1521            | BinaryOperator::NotEq
1522            | BinaryOperator::Lt
1523            | BinaryOperator::LtEq
1524            | BinaryOperator::Gt
1525            | BinaryOperator::GtEq) => {
1526                let l = eval_expr(left, table, rowid)?;
1527                let r = eval_expr(right, table, rowid)?;
1528                // Any comparison involving NULL is unknown → false in a WHERE.
1529                if matches!(l, Value::Null) || matches!(r, Value::Null) {
1530                    return Ok(Value::Bool(false));
1531                }
1532                let ord = compare_values(Some(&l), Some(&r));
1533                let result = match cmp {
1534                    BinaryOperator::Eq => ord == Ordering::Equal,
1535                    BinaryOperator::NotEq => ord != Ordering::Equal,
1536                    BinaryOperator::Lt => ord == Ordering::Less,
1537                    BinaryOperator::LtEq => ord != Ordering::Greater,
1538                    BinaryOperator::Gt => ord == Ordering::Greater,
1539                    BinaryOperator::GtEq => ord != Ordering::Less,
1540                    _ => unreachable!(),
1541                };
1542                Ok(Value::Bool(result))
1543            }
1544            arith @ (BinaryOperator::Plus
1545            | BinaryOperator::Minus
1546            | BinaryOperator::Multiply
1547            | BinaryOperator::Divide
1548            | BinaryOperator::Modulo) => {
1549                let l = eval_expr(left, table, rowid)?;
1550                let r = eval_expr(right, table, rowid)?;
1551                eval_arith(arith, &l, &r)
1552            }
1553            BinaryOperator::StringConcat => {
1554                let l = eval_expr(left, table, rowid)?;
1555                let r = eval_expr(right, table, rowid)?;
1556                if matches!(l, Value::Null) || matches!(r, Value::Null) {
1557                    return Ok(Value::Null);
1558                }
1559                Ok(Value::Text(format!(
1560                    "{}{}",
1561                    l.to_display_string(),
1562                    r.to_display_string()
1563                )))
1564            }
1565            other => Err(SQLRiteError::NotImplemented(format!(
1566                "binary operator {other:?} is not supported yet"
1567            ))),
1568        },
1569
1570        // Phase 7b — function-call dispatch. Currently only the three
1571        // vector-distance functions; this match arm becomes the single
1572        // place to register more SQL functions later (e.g. abs(),
1573        // length(), …) without re-touching the rest of the executor.
1574        //
1575        // Operator forms (`<->` `<=>` `<#>`) are NOT plumbed here: two
1576        // of three don't parse natively in sqlparser (we'd need a
1577        // string-preprocessing pass or a sqlparser fork). Deferred to
1578        // a follow-up sub-phase; see docs/phase-7-plan.md's "Scope
1579        // corrections" note.
1580        Expr::Function(func) => eval_function(func, table, rowid),
1581
1582        other => Err(SQLRiteError::NotImplemented(format!(
1583            "unsupported expression in WHERE/projection: {other:?}"
1584        ))),
1585    }
1586}
1587
1588/// Dispatches an `Expr::Function` to its built-in implementation.
1589/// Currently only the three vec_distance_* functions; other functions
1590/// surface as `NotImplemented` errors with the function name in the
1591/// message so users see what they tried.
1592fn eval_function(func: &sqlparser::ast::Function, table: &Table, rowid: i64) -> Result<Value> {
1593    // Function name lives in `name.0[0]` for unqualified calls. Anything
1594    // qualified (e.g. `pkg.fn(...)`) falls through to NotImplemented.
1595    let name = match func.name.0.as_slice() {
1596        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1597        _ => {
1598            return Err(SQLRiteError::NotImplemented(format!(
1599                "qualified function names not supported: {:?}",
1600                func.name
1601            )));
1602        }
1603    };
1604
1605    match name.as_str() {
1606        "vec_distance_l2" | "vec_distance_cosine" | "vec_distance_dot" => {
1607            let (a, b) = extract_two_vector_args(&name, &func.args, table, rowid)?;
1608            let dist = match name.as_str() {
1609                "vec_distance_l2" => vec_distance_l2(&a, &b),
1610                "vec_distance_cosine" => vec_distance_cosine(&a, &b)?,
1611                "vec_distance_dot" => vec_distance_dot(&a, &b),
1612                _ => unreachable!(),
1613            };
1614            // Widen f32 → f64 for the runtime Value. Vectors are stored
1615            // as f32 (consistent with industry convention for embeddings),
1616            // but the executor's numeric type is f64 so distances slot
1617            // into Value::Real cleanly and can be compared / ordered with
1618            // other reals via the existing arithmetic + comparison paths.
1619            Ok(Value::Real(dist as f64))
1620        }
1621        // Phase 7e — JSON functions. All four parse the JSON text on
1622        // demand (we don't cache parsed values), then resolve a path
1623        // (default `$` = root). The path resolver handles `.key` for
1624        // object access and `[N]` for array index. SQLite-style.
1625        "json_extract" => json_fn_extract(&name, &func.args, table, rowid),
1626        "json_type" => json_fn_type(&name, &func.args, table, rowid),
1627        "json_array_length" => json_fn_array_length(&name, &func.args, table, rowid),
1628        "json_object_keys" => json_fn_object_keys(&name, &func.args, table, rowid),
1629        // Phase 8b — FTS scalars. Both consult an FTS index attached to
1630        // the named column; both error if no index exists (the index is
1631        // a hard prerequisite, mirroring SQLite FTS5's MATCH).
1632        "fts_match" => {
1633            let (entry, query) = resolve_fts_args(&name, &func.args, table, rowid)?;
1634            Ok(Value::Bool(entry.index.matches(rowid, &query)))
1635        }
1636        "bm25_score" => {
1637            let (entry, query) = resolve_fts_args(&name, &func.args, table, rowid)?;
1638            let s = entry.index.score(rowid, &query, &Bm25Params::default());
1639            Ok(Value::Real(s))
1640        }
1641        other => Err(SQLRiteError::NotImplemented(format!(
1642            "unknown function: {other}(...)"
1643        ))),
1644    }
1645}
1646
1647/// Helper for `fts_match` / `bm25_score`: pull the column reference out
1648/// of arg 0 (a bare identifier — we need the *name*, not the per-row
1649/// value), evaluate arg 1 as a Text query string, and look up the FTS
1650/// index attached to that column. Errors if any step fails.
1651fn resolve_fts_args<'t>(
1652    fn_name: &str,
1653    args: &FunctionArguments,
1654    table: &'t Table,
1655    rowid: i64,
1656) -> Result<(&'t FtsIndexEntry, String)> {
1657    let arg_list = match args {
1658        FunctionArguments::List(l) => &l.args,
1659        _ => {
1660            return Err(SQLRiteError::General(format!(
1661                "{fn_name}() expects exactly two arguments: (column, query_text)"
1662            )));
1663        }
1664    };
1665    if arg_list.len() != 2 {
1666        return Err(SQLRiteError::General(format!(
1667            "{fn_name}() expects exactly 2 arguments, got {}",
1668            arg_list.len()
1669        )));
1670    }
1671
1672    // Arg 0: bare column identifier. Must resolve syntactically to a
1673    // column name (we can't accept arbitrary expressions because we
1674    // need the column to look up the index, not the column's value).
1675    let col_expr = match &arg_list[0] {
1676        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1677        other => {
1678            return Err(SQLRiteError::NotImplemented(format!(
1679                "{fn_name}() argument 0 must be a column name, got {other:?}"
1680            )));
1681        }
1682    };
1683    let col_name = match col_expr {
1684        Expr::Identifier(ident) => ident.value.clone(),
1685        Expr::CompoundIdentifier(parts) => parts
1686            .last()
1687            .map(|p| p.value.clone())
1688            .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?,
1689        other => {
1690            return Err(SQLRiteError::General(format!(
1691                "{fn_name}() argument 0 must be a column reference, got {other:?}"
1692            )));
1693        }
1694    };
1695
1696    // Arg 1: query string. Evaluated through the normal expression
1697    // pipeline so callers can pass a literal `'rust db'` or an
1698    // expression that yields TEXT.
1699    let q_expr = match &arg_list[1] {
1700        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1701        other => {
1702            return Err(SQLRiteError::NotImplemented(format!(
1703                "{fn_name}() argument 1 must be a text expression, got {other:?}"
1704            )));
1705        }
1706    };
1707    let query = match eval_expr(q_expr, table, rowid)? {
1708        Value::Text(s) => s,
1709        other => {
1710            return Err(SQLRiteError::General(format!(
1711                "{fn_name}() argument 1 must be TEXT, got {}",
1712                other.to_display_string()
1713            )));
1714        }
1715    };
1716
1717    let entry = table
1718        .fts_indexes
1719        .iter()
1720        .find(|e| e.column_name == col_name)
1721        .ok_or_else(|| {
1722            SQLRiteError::General(format!(
1723                "{fn_name}({col_name}, ...): no FTS index on column '{col_name}' \
1724                 (run CREATE INDEX <name> ON <table> USING fts({col_name}) first)"
1725            ))
1726        })?;
1727    Ok((entry, query))
1728}
1729
1730// -----------------------------------------------------------------
1731// Phase 7e — JSON path-extraction functions
1732// -----------------------------------------------------------------
1733
1734/// Extracts the JSON-typed text + optional path string out of a
1735/// function call's args. Used by all four json_* functions.
1736///
1737/// Arity rules (matching SQLite JSON1):
1738///   - 1 arg  → JSON value, path defaults to `$` (root)
1739///   - 2 args → (JSON value, path text)
1740///
1741/// Returns `(json_text, path)` so caller can serde_json::from_str
1742/// + walk_json_path on it.
1743fn extract_json_and_path(
1744    fn_name: &str,
1745    args: &FunctionArguments,
1746    table: &Table,
1747    rowid: i64,
1748) -> Result<(String, String)> {
1749    let arg_list = match args {
1750        FunctionArguments::List(l) => &l.args,
1751        _ => {
1752            return Err(SQLRiteError::General(format!(
1753                "{fn_name}() expects 1 or 2 arguments"
1754            )));
1755        }
1756    };
1757    if !(arg_list.len() == 1 || arg_list.len() == 2) {
1758        return Err(SQLRiteError::General(format!(
1759            "{fn_name}() expects 1 or 2 arguments, got {}",
1760            arg_list.len()
1761        )));
1762    }
1763    // Evaluate first arg → must produce text.
1764    let first_expr = match &arg_list[0] {
1765        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1766        other => {
1767            return Err(SQLRiteError::NotImplemented(format!(
1768                "{fn_name}() argument 0 has unsupported shape: {other:?}"
1769            )));
1770        }
1771    };
1772    let json_text = match eval_expr(first_expr, table, rowid)? {
1773        Value::Text(s) => s,
1774        Value::Null => {
1775            return Err(SQLRiteError::General(format!(
1776                "{fn_name}() called on NULL — JSON column has no value for this row"
1777            )));
1778        }
1779        other => {
1780            return Err(SQLRiteError::General(format!(
1781                "{fn_name}() argument 0 is not JSON-typed: got {}",
1782                other.to_display_string()
1783            )));
1784        }
1785    };
1786
1787    // Path defaults to root `$` when omitted.
1788    let path = if arg_list.len() == 2 {
1789        let path_expr = match &arg_list[1] {
1790            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1791            other => {
1792                return Err(SQLRiteError::NotImplemented(format!(
1793                    "{fn_name}() argument 1 has unsupported shape: {other:?}"
1794                )));
1795            }
1796        };
1797        match eval_expr(path_expr, table, rowid)? {
1798            Value::Text(s) => s,
1799            other => {
1800                return Err(SQLRiteError::General(format!(
1801                    "{fn_name}() path argument must be a string literal, got {}",
1802                    other.to_display_string()
1803                )));
1804            }
1805        }
1806    } else {
1807        "$".to_string()
1808    };
1809
1810    Ok((json_text, path))
1811}
1812
1813/// Walks a `serde_json::Value` along a JSONPath subset:
1814///   - `$` is the root
1815///   - `.key` for object access (key may not contain `.` or `[`)
1816///   - `[N]` for array index (N a non-negative integer)
1817///   - chains arbitrarily: `$.foo.bar[0].baz`
1818///
1819/// Returns `Ok(None)` for "path didn't match anything" (NULL in SQL),
1820/// `Err` for malformed paths. Matches SQLite JSON1's semantic
1821/// distinction: missing-key = NULL, malformed-path = error.
1822fn walk_json_path<'a>(
1823    value: &'a serde_json::Value,
1824    path: &str,
1825) -> Result<Option<&'a serde_json::Value>> {
1826    let mut chars = path.chars().peekable();
1827    if chars.next() != Some('$') {
1828        return Err(SQLRiteError::General(format!(
1829            "JSON path must start with '$', got `{path}`"
1830        )));
1831    }
1832    let mut current = value;
1833    while let Some(&c) = chars.peek() {
1834        match c {
1835            '.' => {
1836                chars.next();
1837                let mut key = String::new();
1838                while let Some(&c) = chars.peek() {
1839                    if c == '.' || c == '[' {
1840                        break;
1841                    }
1842                    key.push(c);
1843                    chars.next();
1844                }
1845                if key.is_empty() {
1846                    return Err(SQLRiteError::General(format!(
1847                        "JSON path has empty key after '.' in `{path}`"
1848                    )));
1849                }
1850                match current.get(&key) {
1851                    Some(v) => current = v,
1852                    None => return Ok(None),
1853                }
1854            }
1855            '[' => {
1856                chars.next();
1857                let mut idx_str = String::new();
1858                while let Some(&c) = chars.peek() {
1859                    if c == ']' {
1860                        break;
1861                    }
1862                    idx_str.push(c);
1863                    chars.next();
1864                }
1865                if chars.next() != Some(']') {
1866                    return Err(SQLRiteError::General(format!(
1867                        "JSON path has unclosed `[` in `{path}`"
1868                    )));
1869                }
1870                let idx: usize = idx_str.trim().parse().map_err(|_| {
1871                    SQLRiteError::General(format!(
1872                        "JSON path has non-integer index `[{idx_str}]` in `{path}`"
1873                    ))
1874                })?;
1875                match current.get(idx) {
1876                    Some(v) => current = v,
1877                    None => return Ok(None),
1878                }
1879            }
1880            other => {
1881                return Err(SQLRiteError::General(format!(
1882                    "JSON path has unexpected character `{other}` in `{path}` \
1883                     (expected `.`, `[`, or end-of-path)"
1884                )));
1885            }
1886        }
1887    }
1888    Ok(Some(current))
1889}
1890
1891/// Converts a serde_json scalar to a SQLRite Value. For composite
1892/// types (object, array) returns the JSON-encoded text — callers
1893/// pattern-match on shape from the calling json_* function.
1894fn json_value_to_sql(v: &serde_json::Value) -> Value {
1895    match v {
1896        serde_json::Value::Null => Value::Null,
1897        serde_json::Value::Bool(b) => Value::Bool(*b),
1898        serde_json::Value::Number(n) => {
1899            // Match SQLite: integer if it fits an i64, else f64.
1900            if let Some(i) = n.as_i64() {
1901                Value::Integer(i)
1902            } else if let Some(f) = n.as_f64() {
1903                Value::Real(f)
1904            } else {
1905                Value::Null
1906            }
1907        }
1908        serde_json::Value::String(s) => Value::Text(s.clone()),
1909        // Objects + arrays come out as JSON-encoded text. Same as
1910        // SQLite's json_extract: composite results round-trip through
1911        // text rather than being modeled as a richer Value type.
1912        composite => Value::Text(composite.to_string()),
1913    }
1914}
1915
1916fn json_fn_extract(
1917    name: &str,
1918    args: &FunctionArguments,
1919    table: &Table,
1920    rowid: i64,
1921) -> Result<Value> {
1922    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1923    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1924        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1925    })?;
1926    match walk_json_path(&parsed, &path)? {
1927        Some(v) => Ok(json_value_to_sql(v)),
1928        None => Ok(Value::Null),
1929    }
1930}
1931
1932fn json_fn_type(name: &str, args: &FunctionArguments, table: &Table, rowid: i64) -> Result<Value> {
1933    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1934    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1935        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1936    })?;
1937    let resolved = match walk_json_path(&parsed, &path)? {
1938        Some(v) => v,
1939        None => return Ok(Value::Null),
1940    };
1941    let ty = match resolved {
1942        serde_json::Value::Null => "null",
1943        serde_json::Value::Bool(true) => "true",
1944        serde_json::Value::Bool(false) => "false",
1945        serde_json::Value::Number(n) => {
1946            if n.is_i64() || n.is_u64() {
1947                "integer"
1948            } else {
1949                "real"
1950            }
1951        }
1952        serde_json::Value::String(_) => "text",
1953        serde_json::Value::Array(_) => "array",
1954        serde_json::Value::Object(_) => "object",
1955    };
1956    Ok(Value::Text(ty.to_string()))
1957}
1958
1959fn json_fn_array_length(
1960    name: &str,
1961    args: &FunctionArguments,
1962    table: &Table,
1963    rowid: i64,
1964) -> Result<Value> {
1965    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1966    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1967        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1968    })?;
1969    let resolved = match walk_json_path(&parsed, &path)? {
1970        Some(v) => v,
1971        None => return Ok(Value::Null),
1972    };
1973    match resolved.as_array() {
1974        Some(arr) => Ok(Value::Integer(arr.len() as i64)),
1975        None => Err(SQLRiteError::General(format!(
1976            "{name}() resolved to a non-array value at path `{path}`"
1977        ))),
1978    }
1979}
1980
1981fn json_fn_object_keys(
1982    name: &str,
1983    args: &FunctionArguments,
1984    table: &Table,
1985    rowid: i64,
1986) -> Result<Value> {
1987    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1988    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1989        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1990    })?;
1991    let resolved = match walk_json_path(&parsed, &path)? {
1992        Some(v) => v,
1993        None => return Ok(Value::Null),
1994    };
1995    let obj = resolved.as_object().ok_or_else(|| {
1996        SQLRiteError::General(format!(
1997            "{name}() resolved to a non-object value at path `{path}`"
1998        ))
1999    })?;
2000    // SQLite's json_object_keys is a table-valued function (one row
2001    // per key). Without set-returning function support we can't
2002    // reproduce that shape; instead return the keys as a JSON array
2003    // text. Caller can iterate via json_array_length + json_extract,
2004    // or just treat it as a serialized list. Document this divergence
2005    // in supported-sql.md.
2006    let keys: Vec<serde_json::Value> = obj
2007        .keys()
2008        .map(|k| serde_json::Value::String(k.clone()))
2009        .collect();
2010    Ok(Value::Text(serde_json::Value::Array(keys).to_string()))
2011}
2012
2013/// Extracts exactly two `Vec<f32>` arguments from a function call,
2014/// validating arity and that both sides are Vector-typed with matching
2015/// dimensions. Used by all three vec_distance_* functions.
2016fn extract_two_vector_args(
2017    fn_name: &str,
2018    args: &FunctionArguments,
2019    table: &Table,
2020    rowid: i64,
2021) -> Result<(Vec<f32>, Vec<f32>)> {
2022    let arg_list = match args {
2023        FunctionArguments::List(l) => &l.args,
2024        _ => {
2025            return Err(SQLRiteError::General(format!(
2026                "{fn_name}() expects exactly two vector arguments"
2027            )));
2028        }
2029    };
2030    if arg_list.len() != 2 {
2031        return Err(SQLRiteError::General(format!(
2032            "{fn_name}() expects exactly 2 arguments, got {}",
2033            arg_list.len()
2034        )));
2035    }
2036    let mut out: Vec<Vec<f32>> = Vec::with_capacity(2);
2037    for (i, arg) in arg_list.iter().enumerate() {
2038        let expr = match arg {
2039            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
2040            other => {
2041                return Err(SQLRiteError::NotImplemented(format!(
2042                    "{fn_name}() argument {i} has unsupported shape: {other:?}"
2043                )));
2044            }
2045        };
2046        let val = eval_expr(expr, table, rowid)?;
2047        match val {
2048            Value::Vector(v) => out.push(v),
2049            other => {
2050                return Err(SQLRiteError::General(format!(
2051                    "{fn_name}() argument {i} is not a vector: got {}",
2052                    other.to_display_string()
2053                )));
2054            }
2055        }
2056    }
2057    let b = out.pop().unwrap();
2058    let a = out.pop().unwrap();
2059    if a.len() != b.len() {
2060        return Err(SQLRiteError::General(format!(
2061            "{fn_name}(): vector dimensions don't match (lhs={}, rhs={})",
2062            a.len(),
2063            b.len()
2064        )));
2065    }
2066    Ok((a, b))
2067}
2068
2069/// Euclidean (L2) distance: √Σ(aᵢ − bᵢ)².
2070/// Smaller-is-closer; identical vectors return 0.0.
2071pub(crate) fn vec_distance_l2(a: &[f32], b: &[f32]) -> f32 {
2072    debug_assert_eq!(a.len(), b.len());
2073    let mut sum = 0.0f32;
2074    for i in 0..a.len() {
2075        let d = a[i] - b[i];
2076        sum += d * d;
2077    }
2078    sum.sqrt()
2079}
2080
2081/// Cosine distance: 1 − (a·b) / (‖a‖·‖b‖).
2082/// Smaller-is-closer; identical (non-zero) vectors return 0.0,
2083/// orthogonal vectors return 1.0, opposite-direction vectors return 2.0.
2084///
2085/// Errors if either vector has zero magnitude — cosine similarity is
2086/// undefined for the zero vector and silently returning NaN would
2087/// poison `ORDER BY` ranking. Callers who want the silent-NaN
2088/// behavior can compute `vec_distance_dot(a, b) / (norm(a) * norm(b))`
2089/// themselves.
2090pub(crate) fn vec_distance_cosine(a: &[f32], b: &[f32]) -> Result<f32> {
2091    debug_assert_eq!(a.len(), b.len());
2092    let mut dot = 0.0f32;
2093    let mut norm_a_sq = 0.0f32;
2094    let mut norm_b_sq = 0.0f32;
2095    for i in 0..a.len() {
2096        dot += a[i] * b[i];
2097        norm_a_sq += a[i] * a[i];
2098        norm_b_sq += b[i] * b[i];
2099    }
2100    let denom = (norm_a_sq * norm_b_sq).sqrt();
2101    if denom == 0.0 {
2102        return Err(SQLRiteError::General(
2103            "vec_distance_cosine() is undefined for zero-magnitude vectors".to_string(),
2104        ));
2105    }
2106    Ok(1.0 - dot / denom)
2107}
2108
2109/// Negated dot product: −(a·b).
2110/// pgvector convention — negated so smaller-is-closer like L2 / cosine.
2111/// For unit-norm vectors `vec_distance_dot(a, b) == vec_distance_cosine(a, b) - 1`.
2112pub(crate) fn vec_distance_dot(a: &[f32], b: &[f32]) -> f32 {
2113    debug_assert_eq!(a.len(), b.len());
2114    let mut dot = 0.0f32;
2115    for i in 0..a.len() {
2116        dot += a[i] * b[i];
2117    }
2118    -dot
2119}
2120
2121/// Evaluates an integer/real arithmetic op. NULL on either side propagates.
2122/// Mixed Integer/Real promotes to Real. Divide/Modulo by zero → error.
2123fn eval_arith(op: &BinaryOperator, l: &Value, r: &Value) -> Result<Value> {
2124    if matches!(l, Value::Null) || matches!(r, Value::Null) {
2125        return Ok(Value::Null);
2126    }
2127    match (l, r) {
2128        (Value::Integer(a), Value::Integer(b)) => match op {
2129            BinaryOperator::Plus => Ok(Value::Integer(a.wrapping_add(*b))),
2130            BinaryOperator::Minus => Ok(Value::Integer(a.wrapping_sub(*b))),
2131            BinaryOperator::Multiply => Ok(Value::Integer(a.wrapping_mul(*b))),
2132            BinaryOperator::Divide => {
2133                if *b == 0 {
2134                    Err(SQLRiteError::General("division by zero".to_string()))
2135                } else {
2136                    Ok(Value::Integer(a / b))
2137                }
2138            }
2139            BinaryOperator::Modulo => {
2140                if *b == 0 {
2141                    Err(SQLRiteError::General("modulo by zero".to_string()))
2142                } else {
2143                    Ok(Value::Integer(a % b))
2144                }
2145            }
2146            _ => unreachable!(),
2147        },
2148        // Anything involving a Real promotes both sides to f64.
2149        (a, b) => {
2150            let af = as_number(a)?;
2151            let bf = as_number(b)?;
2152            match op {
2153                BinaryOperator::Plus => Ok(Value::Real(af + bf)),
2154                BinaryOperator::Minus => Ok(Value::Real(af - bf)),
2155                BinaryOperator::Multiply => Ok(Value::Real(af * bf)),
2156                BinaryOperator::Divide => {
2157                    if bf == 0.0 {
2158                        Err(SQLRiteError::General("division by zero".to_string()))
2159                    } else {
2160                        Ok(Value::Real(af / bf))
2161                    }
2162                }
2163                BinaryOperator::Modulo => {
2164                    if bf == 0.0 {
2165                        Err(SQLRiteError::General("modulo by zero".to_string()))
2166                    } else {
2167                        Ok(Value::Real(af % bf))
2168                    }
2169                }
2170                _ => unreachable!(),
2171            }
2172        }
2173    }
2174}
2175
2176fn as_number(v: &Value) -> Result<f64> {
2177    match v {
2178        Value::Integer(i) => Ok(*i as f64),
2179        Value::Real(f) => Ok(*f),
2180        Value::Bool(b) => Ok(if *b { 1.0 } else { 0.0 }),
2181        other => Err(SQLRiteError::General(format!(
2182            "arithmetic on non-numeric value '{}'",
2183            other.to_display_string()
2184        ))),
2185    }
2186}
2187
2188fn as_bool(v: &Value) -> Result<bool> {
2189    match v {
2190        Value::Bool(b) => Ok(*b),
2191        Value::Null => Ok(false),
2192        Value::Integer(i) => Ok(*i != 0),
2193        other => Err(SQLRiteError::Internal(format!(
2194            "expected boolean, got {}",
2195            other.to_display_string()
2196        ))),
2197    }
2198}
2199
2200fn convert_literal(v: &sqlparser::ast::Value) -> Result<Value> {
2201    use sqlparser::ast::Value as AstValue;
2202    match v {
2203        AstValue::Number(n, _) => {
2204            if let Ok(i) = n.parse::<i64>() {
2205                Ok(Value::Integer(i))
2206            } else if let Ok(f) = n.parse::<f64>() {
2207                Ok(Value::Real(f))
2208            } else {
2209                Err(SQLRiteError::Internal(format!(
2210                    "could not parse numeric literal '{n}'"
2211                )))
2212            }
2213        }
2214        AstValue::SingleQuotedString(s) => Ok(Value::Text(s.clone())),
2215        AstValue::Boolean(b) => Ok(Value::Bool(*b)),
2216        AstValue::Null => Ok(Value::Null),
2217        other => Err(SQLRiteError::NotImplemented(format!(
2218            "unsupported literal value: {other:?}"
2219        ))),
2220    }
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225    use super::*;
2226
2227    // -----------------------------------------------------------------
2228    // Phase 7b — Vector distance function math
2229    // -----------------------------------------------------------------
2230
2231    /// Float comparison helper — distance results need a small epsilon
2232    /// because we accumulate sums across many f32 multiplies.
2233    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
2234        (a - b).abs() < eps
2235    }
2236
2237    #[test]
2238    fn vec_distance_l2_identical_is_zero() {
2239        let v = vec![0.1, 0.2, 0.3];
2240        assert_eq!(vec_distance_l2(&v, &v), 0.0);
2241    }
2242
2243    #[test]
2244    fn vec_distance_l2_unit_basis_is_sqrt2() {
2245        // [1, 0] vs [0, 1]: distance = √((1-0)² + (0-1)²) = √2 ≈ 1.414
2246        let a = vec![1.0, 0.0];
2247        let b = vec![0.0, 1.0];
2248        assert!(approx_eq(vec_distance_l2(&a, &b), 2.0_f32.sqrt(), 1e-6));
2249    }
2250
2251    #[test]
2252    fn vec_distance_l2_known_value() {
2253        // [0, 0, 0] vs [3, 4, 0]: √(9 + 16 + 0) = 5 (the classic 3-4-5 triangle).
2254        let a = vec![0.0, 0.0, 0.0];
2255        let b = vec![3.0, 4.0, 0.0];
2256        assert!(approx_eq(vec_distance_l2(&a, &b), 5.0, 1e-6));
2257    }
2258
2259    #[test]
2260    fn vec_distance_cosine_identical_is_zero() {
2261        let v = vec![0.1, 0.2, 0.3];
2262        let d = vec_distance_cosine(&v, &v).unwrap();
2263        assert!(approx_eq(d, 0.0, 1e-6), "cos(v,v) = {d}, expected ≈ 0");
2264    }
2265
2266    #[test]
2267    fn vec_distance_cosine_orthogonal_is_one() {
2268        // Two orthogonal unit vectors should have cosine distance = 1.0
2269        // (cosine similarity = 0 → distance = 1 - 0 = 1).
2270        let a = vec![1.0, 0.0];
2271        let b = vec![0.0, 1.0];
2272        assert!(approx_eq(vec_distance_cosine(&a, &b).unwrap(), 1.0, 1e-6));
2273    }
2274
2275    #[test]
2276    fn vec_distance_cosine_opposite_is_two() {
2277        // a and -a have cosine similarity = -1 → distance = 1 - (-1) = 2.
2278        let a = vec![1.0, 0.0, 0.0];
2279        let b = vec![-1.0, 0.0, 0.0];
2280        assert!(approx_eq(vec_distance_cosine(&a, &b).unwrap(), 2.0, 1e-6));
2281    }
2282
2283    #[test]
2284    fn vec_distance_cosine_zero_magnitude_errors() {
2285        // Cosine is undefined for the zero vector — error rather than NaN.
2286        let a = vec![0.0, 0.0];
2287        let b = vec![1.0, 0.0];
2288        let err = vec_distance_cosine(&a, &b).unwrap_err();
2289        assert!(format!("{err}").contains("zero-magnitude"));
2290    }
2291
2292    #[test]
2293    fn vec_distance_dot_negates() {
2294        // a·b = 1*4 + 2*5 + 3*6 = 32. Negated → -32.
2295        let a = vec![1.0, 2.0, 3.0];
2296        let b = vec![4.0, 5.0, 6.0];
2297        assert!(approx_eq(vec_distance_dot(&a, &b), -32.0, 1e-6));
2298    }
2299
2300    #[test]
2301    fn vec_distance_dot_orthogonal_is_zero() {
2302        // Orthogonal vectors have dot product 0 → negated is also 0.
2303        let a = vec![1.0, 0.0];
2304        let b = vec![0.0, 1.0];
2305        assert_eq!(vec_distance_dot(&a, &b), 0.0);
2306    }
2307
2308    #[test]
2309    fn vec_distance_dot_unit_norm_matches_cosine_minus_one() {
2310        // For unit-norm vectors: dot(a,b) = cos(a,b)
2311        // → -dot(a,b) = -cos(a,b) = (1 - cos(a,b)) - 1 = vec_distance_cosine(a,b) - 1.
2312        // Useful sanity check that the two functions agree on unit vectors.
2313        let a = vec![0.6f32, 0.8]; // unit norm: √(0.36+0.64) = 1
2314        let b = vec![0.8f32, 0.6]; // unit norm too
2315        let dot = vec_distance_dot(&a, &b);
2316        let cos = vec_distance_cosine(&a, &b).unwrap();
2317        assert!(approx_eq(dot, cos - 1.0, 1e-5));
2318    }
2319
2320    // -----------------------------------------------------------------
2321    // Phase 7c — bounded-heap top-k correctness + benchmark
2322    // -----------------------------------------------------------------
2323
2324    use crate::sql::db::database::Database;
2325    use crate::sql::parser::select::SelectQuery;
2326    use sqlparser::dialect::SQLiteDialect;
2327    use sqlparser::parser::Parser;
2328
2329    /// Builds a `docs(id INTEGER PK, score REAL)` table with N rows of
2330    /// distinct positive scores so top-k tests aren't sensitive to
2331    /// tie-breaking (heap is unstable; full-sort is stable; we want
2332    /// both to agree without arguing about equal-score row order).
2333    ///
2334    /// **Why positive scores:** the INSERT parser doesn't currently
2335    /// handle `Expr::UnaryOp(Minus, …)` for negative number literals
2336    /// (it would parse `-3.14` as a unary expression and the value
2337    /// extractor would skip it). That's a pre-existing bug, out of
2338    /// scope for 7c. Using the Knuth multiplicative hash gives us
2339    /// distinct positive scrambled values without dancing around the
2340    /// negative-literal limitation.
2341    fn seed_score_table(n: usize) -> Database {
2342        let mut db = Database::new("tempdb".to_string());
2343        crate::sql::process_command(
2344            "CREATE TABLE docs (id INTEGER PRIMARY KEY, score REAL);",
2345            &mut db,
2346        )
2347        .expect("create");
2348        for i in 0..n {
2349            // Knuth multiplicative hash mod 1_000_000 — distinct,
2350            // dense in [0, 999_999], no collisions for n up to ~tens
2351            // of thousands.
2352            let score = ((i as u64).wrapping_mul(2_654_435_761) % 1_000_000) as f64;
2353            let sql = format!("INSERT INTO docs (score) VALUES ({score});");
2354            crate::sql::process_command(&sql, &mut db).expect("insert");
2355        }
2356        db
2357    }
2358
2359    /// Helper: parses an SQL SELECT into a SelectQuery so we can drive
2360    /// `select_topk` / `sort_rowids` directly without the rest of the
2361    /// process_command pipeline.
2362    fn parse_select(sql: &str) -> SelectQuery {
2363        let dialect = SQLiteDialect {};
2364        let mut ast = Parser::parse_sql(&dialect, sql).expect("parse");
2365        let stmt = ast.pop().expect("one statement");
2366        SelectQuery::new(&stmt).expect("select-query")
2367    }
2368
2369    #[test]
2370    fn topk_matches_full_sort_asc() {
2371        // Build N=200, top-k=10. Bounded heap output must equal
2372        // full-sort-then-truncate output (both produce ASC order).
2373        let db = seed_score_table(200);
2374        let table = db.get_table("docs".to_string()).unwrap();
2375        let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 10;");
2376        let order = q.order_by.as_ref().unwrap();
2377        let all_rowids = table.rowids();
2378
2379        // Full-sort path
2380        let mut full = all_rowids.clone();
2381        sort_rowids(&mut full, table, order).unwrap();
2382        full.truncate(10);
2383
2384        // Bounded-heap path
2385        let topk = select_topk(&all_rowids, table, order, 10).unwrap();
2386
2387        assert_eq!(topk, full, "top-k via heap should match full-sort+truncate");
2388    }
2389
2390    #[test]
2391    fn topk_matches_full_sort_desc() {
2392        // Same with DESC — verifies the direction-aware Ord wrapper.
2393        let db = seed_score_table(200);
2394        let table = db.get_table("docs".to_string()).unwrap();
2395        let q = parse_select("SELECT * FROM docs ORDER BY score DESC LIMIT 10;");
2396        let order = q.order_by.as_ref().unwrap();
2397        let all_rowids = table.rowids();
2398
2399        let mut full = all_rowids.clone();
2400        sort_rowids(&mut full, table, order).unwrap();
2401        full.truncate(10);
2402
2403        let topk = select_topk(&all_rowids, table, order, 10).unwrap();
2404
2405        assert_eq!(
2406            topk, full,
2407            "top-k DESC via heap should match full-sort+truncate"
2408        );
2409    }
2410
2411    #[test]
2412    fn topk_k_larger_than_n_returns_everything_sorted() {
2413        // The executor branches off to the full-sort path when k >= N,
2414        // but if a caller invokes select_topk directly with k > N, it
2415        // should still produce all-sorted output (no truncation
2416        // because we don't have N items to truncate to k).
2417        let db = seed_score_table(50);
2418        let table = db.get_table("docs".to_string()).unwrap();
2419        let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 1000;");
2420        let order = q.order_by.as_ref().unwrap();
2421        let topk = select_topk(&table.rowids(), table, order, 1000).unwrap();
2422        assert_eq!(topk.len(), 50);
2423        // All scores in ascending order.
2424        let scores: Vec<f64> = topk
2425            .iter()
2426            .filter_map(|r| match table.get_value("score", *r) {
2427                Some(Value::Real(f)) => Some(f),
2428                _ => None,
2429            })
2430            .collect();
2431        assert!(scores.windows(2).all(|w| w[0] <= w[1]));
2432    }
2433
2434    #[test]
2435    fn topk_k_zero_returns_empty() {
2436        let db = seed_score_table(10);
2437        let table = db.get_table("docs".to_string()).unwrap();
2438        let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 1;");
2439        let order = q.order_by.as_ref().unwrap();
2440        let topk = select_topk(&table.rowids(), table, order, 0).unwrap();
2441        assert!(topk.is_empty());
2442    }
2443
2444    #[test]
2445    fn topk_empty_input_returns_empty() {
2446        let db = seed_score_table(0);
2447        let table = db.get_table("docs".to_string()).unwrap();
2448        let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 5;");
2449        let order = q.order_by.as_ref().unwrap();
2450        let topk = select_topk(&[], table, order, 5).unwrap();
2451        assert!(topk.is_empty());
2452    }
2453
2454    #[test]
2455    fn topk_works_through_select_executor_with_distance_function() {
2456        // Integration check that the executor actually picks the
2457        // bounded-heap path on a KNN-shaped query and produces the
2458        // correct top-k.
2459        let mut db = Database::new("tempdb".to_string());
2460        crate::sql::process_command(
2461            "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
2462            &mut db,
2463        )
2464        .unwrap();
2465        // Five rows with distinct distances from probe [1.0, 0.0]:
2466        //   id=1 [1.0, 0.0]   distance=0
2467        //   id=2 [2.0, 0.0]   distance=1
2468        //   id=3 [0.0, 3.0]   distance=√(1+9) = √10 ≈ 3.16
2469        //   id=4 [1.0, 4.0]   distance=4
2470        //   id=5 [10.0, 10.0] distance=√(81+100) ≈ 13.45
2471        for v in &[
2472            "[1.0, 0.0]",
2473            "[2.0, 0.0]",
2474            "[0.0, 3.0]",
2475            "[1.0, 4.0]",
2476            "[10.0, 10.0]",
2477        ] {
2478            crate::sql::process_command(&format!("INSERT INTO docs (e) VALUES ({v});"), &mut db)
2479                .unwrap();
2480        }
2481        let resp = crate::sql::process_command(
2482            "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
2483            &mut db,
2484        )
2485        .unwrap();
2486        // Top-3 closest to [1.0, 0.0] are id=1, id=2, id=3 (in that order).
2487        // The status message tells us how many rows came back.
2488        assert!(resp.contains("3 rows returned"), "got: {resp}");
2489    }
2490
2491    /// Manual benchmark — not run by default. Recommended invocation:
2492    ///
2493    ///     cargo test -p sqlrite-engine --lib topk_benchmark --release \
2494    ///         -- --ignored --nocapture
2495    ///
2496    /// (`--release` matters: Rust's optimized sort gets very fast under
2497    /// optimization, so the heap's relative advantage is best observed
2498    /// against a sort that's also been optimized.)
2499    ///
2500    /// Measured numbers on an Apple Silicon laptop with N=10_000 + k=10:
2501    ///   - bounded heap:    ~820µs
2502    ///   - full sort+trunc: ~1.5ms
2503    ///   - ratio:           ~1.8×
2504    ///
2505    /// The advantage is real but moderate at this size because the sort
2506    /// key here is a single REAL column read (cheap) and Rust's sort_by
2507    /// has a very low constant factor. The asymptotic O(N log k) vs
2508    /// O(N log N) advantage scales with N and with per-row work — KNN
2509    /// queries where the sort key is `vec_distance_l2(col, [...])` are
2510    /// where this path really pays off, because each key evaluation is
2511    /// itself O(dim) and the heap path skips the per-row evaluation
2512    /// in the comparator (see `sort_rowids` for the contrast).
2513    #[test]
2514    #[ignore]
2515    fn topk_benchmark() {
2516        use std::time::Instant;
2517        const N: usize = 10_000;
2518        const K: usize = 10;
2519
2520        let db = seed_score_table(N);
2521        let table = db.get_table("docs".to_string()).unwrap();
2522        let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 10;");
2523        let order = q.order_by.as_ref().unwrap();
2524        let all_rowids = table.rowids();
2525
2526        // Time bounded heap.
2527        let t0 = Instant::now();
2528        let _topk = select_topk(&all_rowids, table, order, K).unwrap();
2529        let heap_dur = t0.elapsed();
2530
2531        // Time full sort + truncate.
2532        let t1 = Instant::now();
2533        let mut full = all_rowids.clone();
2534        sort_rowids(&mut full, table, order).unwrap();
2535        full.truncate(K);
2536        let sort_dur = t1.elapsed();
2537
2538        let ratio = sort_dur.as_secs_f64() / heap_dur.as_secs_f64().max(1e-9);
2539        println!("\n--- topk_benchmark (N={N}, k={K}) ---");
2540        println!("  bounded heap:   {heap_dur:?}");
2541        println!("  full sort+trunc: {sort_dur:?}");
2542        println!("  speedup ratio:  {ratio:.2}×");
2543
2544        // Soft assertion. Floor is 1.4× because the cheap-key
2545        // benchmark hovers around 1.8× empirically; setting this too
2546        // close to the measured value risks flaky CI on slower
2547        // runners. Floor of 1.4× still catches an actual regression
2548        // (e.g., if select_topk became O(N²) or stopped using the
2549        // heap entirely).
2550        assert!(
2551            ratio > 1.4,
2552            "bounded heap should be substantially faster than full sort, but ratio = {ratio:.2}"
2553        );
2554    }
2555}