// sqlrite/sql/executor.rs

//! Query executors — evaluate parsed SQL statements against the in-memory
//! storage and produce formatted output.
3
4use std::cmp::Ordering;
5
6use prettytable::{Cell as PrintCell, Row as PrintRow, Table as PrintTable};
7use sqlparser::ast::{
8    AlterTable, AlterTableOperation, AssignmentTarget, BinaryOperator, CreateIndex, Delete, Expr,
9    FromTable, FunctionArg, FunctionArgExpr, FunctionArguments, IndexType, ObjectName,
10    ObjectNamePart, RenameTableNameKind, Statement, TableFactor, TableWithJoins, UnaryOperator,
11    Update, Value as AstValue,
12};
13
14use crate::error::{Result, SQLRiteError};
15use crate::sql::db::database::Database;
16use crate::sql::db::secondary_index::{IndexOrigin, SecondaryIndex};
17use crate::sql::db::table::{
18    DataType, FtsIndexEntry, HnswIndexEntry, Table, Value, parse_vector_literal,
19};
20use crate::sql::fts::{Bm25Params, PostingList};
21use crate::sql::hnsw::{DistanceMetric, HnswIndex};
22use crate::sql::parser::select::{OrderByClause, Projection, SelectQuery};
23
24/// Executes a parsed `SelectQuery` against the database and returns a
25/// human-readable rendering of the result set (prettytable). Also returns
26/// the number of rows produced, for the top-level status message.
27/// Structured result of a SELECT: column names in projection order,
28/// and each matching row as a `Vec<Value>` aligned with the columns.
29/// Phase 5a introduced this so the public `Connection` / `Statement`
30/// API has typed rows to yield; the existing `execute_select` that
31/// returns pre-rendered text is now a thin wrapper on top.
32pub struct SelectResult {
33    pub columns: Vec<String>,
34    pub rows: Vec<Vec<Value>>,
35}
36
37/// Executes a SELECT and returns structured rows. The typed rows are
38/// what the new public API streams to callers; the REPL / Tauri app
39/// pre-render into a prettytable via `execute_select`.
40pub fn execute_select_rows(query: SelectQuery, db: &Database) -> Result<SelectResult> {
41    let table = db
42        .get_table(query.table_name.clone())
43        .map_err(|_| SQLRiteError::Internal(format!("Table '{}' not found", query.table_name)))?;
44
45    // Resolve projection to a concrete ordered column list.
46    let projected_cols: Vec<String> = match &query.projection {
47        Projection::All => table.column_names(),
48        Projection::Columns(cols) => {
49            for c in cols {
50                if !table.contains_column(c.to_string()) {
51                    return Err(SQLRiteError::Internal(format!(
52                        "Column '{c}' does not exist on table '{}'",
53                        query.table_name
54                    )));
55                }
56            }
57            cols.clone()
58        }
59    };
60
61    // Collect matching rowids. If the WHERE is the shape `col = literal`
62    // and `col` has a secondary index, probe the index for an O(log N)
63    // seek; otherwise fall back to the full table scan.
64    let matching = match select_rowids(table, query.selection.as_ref())? {
65        RowidSource::IndexProbe(rowids) => rowids,
66        RowidSource::FullScan => {
67            let mut out = Vec::new();
68            for rowid in table.rowids() {
69                if let Some(expr) = &query.selection {
70                    if !eval_predicate(expr, table, rowid)? {
71                        continue;
72                    }
73                }
74                out.push(rowid);
75            }
76            out
77        }
78    };
79    let mut matching = matching;
80
81    // Phase 7c — bounded-heap top-k optimization.
82    //
83    // The naive "ORDER BY <expr>" path (Phase 7b) sorts every matching
84    // rowid: O(N log N) sort_by + a truncate. For KNN queries
85    //
86    //     SELECT id FROM docs
87    //     ORDER BY vec_distance_l2(embedding, [...])
88    //     LIMIT 10;
89    //
90    // N is the table row count and k is the LIMIT. With a bounded
91    // max-heap of size k we can find the top-k in O(N log k) — same
92    // sort_by-per-row cost on the heap operations, but k is typically
93    // 10-100 while N can be millions.
94    //
95    // Phase 7d.2 — HNSW ANN probe.
96    //
97    // Even better than the bounded heap: if the ORDER BY expression is
98    // exactly `vec_distance_l2(<col>, <bracket-array literal>)` AND
99    // `<col>` has an HNSW index attached, skip the linear scan
100    // entirely and probe the graph in O(log N). Approximate but
101    // typically ≥ 0.95 recall (verified by the recall tests in
102    // src/sql/hnsw.rs).
103    //
104    // We branch in cases:
105    //   1. ORDER BY + LIMIT k matches the HNSW probe pattern  → graph probe.
106    //   2. ORDER BY + LIMIT k matches the FTS probe pattern   → posting probe.
107    //   3. ORDER BY + LIMIT k where k < |matching|            → bounded heap (7c).
108    //   4. ORDER BY without LIMIT, or LIMIT >= |matching|     → full sort.
109    //   5. LIMIT without ORDER BY                              → just truncate.
110    match (&query.order_by, query.limit) {
111        (Some(order), Some(k)) if try_hnsw_probe(table, &order.expr, k).is_some() => {
112            matching = try_hnsw_probe(table, &order.expr, k).unwrap();
113        }
114        (Some(order), Some(k))
115            if try_fts_probe(table, &order.expr, order.ascending, k).is_some() =>
116        {
117            matching = try_fts_probe(table, &order.expr, order.ascending, k).unwrap();
118        }
119        (Some(order), Some(k)) if k < matching.len() => {
120            matching = select_topk(&matching, table, order, k)?;
121        }
122        (Some(order), _) => {
123            sort_rowids(&mut matching, table, order)?;
124            if let Some(k) = query.limit {
125                matching.truncate(k);
126            }
127        }
128        (None, Some(k)) => {
129            matching.truncate(k);
130        }
131        (None, None) => {}
132    }
133
134    // Build typed rows. Missing cells surface as `Value::Null` — that
135    // maps a column-not-present-for-this-rowid case onto the public
136    // `Row::get` → `Option<T>` surface cleanly.
137    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(matching.len());
138    for rowid in &matching {
139        let row: Vec<Value> = projected_cols
140            .iter()
141            .map(|col| table.get_value(col, *rowid).unwrap_or(Value::Null))
142            .collect();
143        rows.push(row);
144    }
145
146    Ok(SelectResult {
147        columns: projected_cols,
148        rows,
149    })
150}
151
152/// Executes a SELECT and returns `(rendered_table, row_count)`. The
153/// REPL and Tauri app use this to keep the table-printing behaviour
154/// the engine has always shipped. Structured callers use
155/// `execute_select_rows` instead.
156pub fn execute_select(query: SelectQuery, db: &Database) -> Result<(String, usize)> {
157    let result = execute_select_rows(query, db)?;
158    let row_count = result.rows.len();
159
160    let mut print_table = PrintTable::new();
161    let header_cells: Vec<PrintCell> = result.columns.iter().map(|c| PrintCell::new(c)).collect();
162    print_table.add_row(PrintRow::new(header_cells));
163
164    for row in &result.rows {
165        let cells: Vec<PrintCell> = row
166            .iter()
167            .map(|v| PrintCell::new(&v.to_display_string()))
168            .collect();
169        print_table.add_row(PrintRow::new(cells));
170    }
171
172    Ok((print_table.to_string(), row_count))
173}
174
175/// Executes a DELETE statement. Returns the number of rows removed.
176pub fn execute_delete(stmt: &Statement, db: &mut Database) -> Result<usize> {
177    let Statement::Delete(Delete {
178        from, selection, ..
179    }) = stmt
180    else {
181        return Err(SQLRiteError::Internal(
182            "execute_delete called on a non-DELETE statement".to_string(),
183        ));
184    };
185
186    let tables = match from {
187        FromTable::WithFromKeyword(t) | FromTable::WithoutKeyword(t) => t,
188    };
189    let table_name = extract_single_table_name(tables)?;
190
191    // Compute matching rowids with an immutable borrow, then mutate.
192    let matching: Vec<i64> = {
193        let table = db
194            .get_table(table_name.clone())
195            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
196        match select_rowids(table, selection.as_ref())? {
197            RowidSource::IndexProbe(rowids) => rowids,
198            RowidSource::FullScan => {
199                let mut out = Vec::new();
200                for rowid in table.rowids() {
201                    if let Some(expr) = selection {
202                        if !eval_predicate(expr, table, rowid)? {
203                            continue;
204                        }
205                    }
206                    out.push(rowid);
207                }
208                out
209            }
210        }
211    };
212
213    let table = db.get_table_mut(table_name)?;
214    for rowid in &matching {
215        table.delete_row(*rowid);
216    }
217    // Phase 7d.3 — any DELETE invalidates every HNSW index on this
218    // table (the deleted node could still appear in other nodes'
219    // neighbor lists, breaking subsequent searches). Mark dirty so
220    // the next save rebuilds from current rows before serializing.
221    //
222    // Phase 8b — same posture for FTS indexes (Q7 — rebuild-on-save
223    // mirrors HNSW). The deleted rowid still appears in posting
224    // lists; leaving it would surface zombie hits in future queries.
225    if !matching.is_empty() {
226        for entry in &mut table.hnsw_indexes {
227            entry.needs_rebuild = true;
228        }
229        for entry in &mut table.fts_indexes {
230            entry.needs_rebuild = true;
231        }
232    }
233    Ok(matching.len())
234}
235
236/// Executes an UPDATE statement. Returns the number of rows updated.
237pub fn execute_update(stmt: &Statement, db: &mut Database) -> Result<usize> {
238    let Statement::Update(Update {
239        table,
240        assignments,
241        from,
242        selection,
243        ..
244    }) = stmt
245    else {
246        return Err(SQLRiteError::Internal(
247            "execute_update called on a non-UPDATE statement".to_string(),
248        ));
249    };
250
251    if from.is_some() {
252        return Err(SQLRiteError::NotImplemented(
253            "UPDATE ... FROM is not supported yet".to_string(),
254        ));
255    }
256
257    let table_name = extract_table_name(table)?;
258
259    // Resolve assignment targets to plain column names and verify they exist.
260    let mut parsed_assignments: Vec<(String, Expr)> = Vec::with_capacity(assignments.len());
261    {
262        let tbl = db
263            .get_table(table_name.clone())
264            .map_err(|_| SQLRiteError::Internal(format!("Table '{table_name}' not found")))?;
265        for a in assignments {
266            let col = match &a.target {
267                AssignmentTarget::ColumnName(name) => name
268                    .0
269                    .last()
270                    .map(|p| p.to_string())
271                    .ok_or_else(|| SQLRiteError::Internal("empty column name".to_string()))?,
272                AssignmentTarget::Tuple(_) => {
273                    return Err(SQLRiteError::NotImplemented(
274                        "tuple assignment targets are not supported".to_string(),
275                    ));
276                }
277            };
278            if !tbl.contains_column(col.clone()) {
279                return Err(SQLRiteError::Internal(format!(
280                    "UPDATE references unknown column '{col}'"
281                )));
282            }
283            parsed_assignments.push((col, a.value.clone()));
284        }
285    }
286
287    // Gather matching rowids + the new values to write for each assignment, under
288    // an immutable borrow. Uses the index-probe fast path when the WHERE is
289    // `col = literal` on an indexed column.
290    let work: Vec<(i64, Vec<(String, Value)>)> = {
291        let tbl = db.get_table(table_name.clone())?;
292        let matched_rowids: Vec<i64> = match select_rowids(tbl, selection.as_ref())? {
293            RowidSource::IndexProbe(rowids) => rowids,
294            RowidSource::FullScan => {
295                let mut out = Vec::new();
296                for rowid in tbl.rowids() {
297                    if let Some(expr) = selection {
298                        if !eval_predicate(expr, tbl, rowid)? {
299                            continue;
300                        }
301                    }
302                    out.push(rowid);
303                }
304                out
305            }
306        };
307        let mut rows_to_update = Vec::new();
308        for rowid in matched_rowids {
309            let mut values = Vec::with_capacity(parsed_assignments.len());
310            for (col, expr) in &parsed_assignments {
311                // UPDATE's RHS is evaluated in the context of the row being updated,
312                // so column references on the right resolve to the current row's values.
313                let v = eval_expr(expr, tbl, rowid)?;
314                values.push((col.clone(), v));
315            }
316            rows_to_update.push((rowid, values));
317        }
318        rows_to_update
319    };
320
321    let tbl = db.get_table_mut(table_name)?;
322    for (rowid, values) in &work {
323        for (col, v) in values {
324            tbl.set_value(col, *rowid, v.clone())?;
325        }
326    }
327
328    // Phase 7d.3 — UPDATE may have changed a vector column that an
329    // HNSW index covers. Mark every covering index dirty so save
330    // rebuilds from current rows. (Updates that only touched
331    // non-vector columns also mark dirty, which is over-conservative
332    // but harmless — the rebuild walks rows anyway, and the cost is
333    // only paid on save.)
334    //
335    // Phase 8b — same shape for FTS indexes covering updated TEXT cols.
336    if !work.is_empty() {
337        let updated_columns: std::collections::HashSet<&str> = work
338            .iter()
339            .flat_map(|(_, values)| values.iter().map(|(c, _)| c.as_str()))
340            .collect();
341        for entry in &mut tbl.hnsw_indexes {
342            if updated_columns.contains(entry.column_name.as_str()) {
343                entry.needs_rebuild = true;
344            }
345        }
346        for entry in &mut tbl.fts_indexes {
347            if updated_columns.contains(entry.column_name.as_str()) {
348                entry.needs_rebuild = true;
349            }
350        }
351    }
352    Ok(work.len())
353}
354
355/// Handles `CREATE INDEX [UNIQUE] <name> ON <table> [USING <method>] (<column>)`.
356/// Single-column indexes only.
357///
358/// Two flavours, branching on the optional `USING <method>` clause:
359///   - **No USING, or `USING btree`**: regular B-Tree secondary index
360///     (Phase 3e). Indexable types: Integer, Text.
361///   - **`USING hnsw`**: HNSW ANN index (Phase 7d.2). Indexable types:
362///     Vector(N) only. Distance metric is L2 by default; cosine and
363///     dot variants are deferred to Phase 7d.x.
364///
365/// Returns the (possibly synthesized) index name for the status message.
366pub fn execute_create_index(stmt: &Statement, db: &mut Database) -> Result<String> {
367    let Statement::CreateIndex(CreateIndex {
368        name,
369        table_name,
370        columns,
371        using,
372        unique,
373        if_not_exists,
374        predicate,
375        ..
376    }) = stmt
377    else {
378        return Err(SQLRiteError::Internal(
379            "execute_create_index called on a non-CREATE-INDEX statement".to_string(),
380        ));
381    };
382
383    if predicate.is_some() {
384        return Err(SQLRiteError::NotImplemented(
385            "partial indexes (CREATE INDEX ... WHERE) are not supported yet".to_string(),
386        ));
387    }
388
389    if columns.len() != 1 {
390        return Err(SQLRiteError::NotImplemented(format!(
391            "multi-column indexes are not supported yet ({} columns given)",
392            columns.len()
393        )));
394    }
395
396    let index_name = name.as_ref().map(|n| n.to_string()).ok_or_else(|| {
397        SQLRiteError::NotImplemented(
398            "anonymous CREATE INDEX (no name) is not supported — give it a name".to_string(),
399        )
400    })?;
401
402    // Detect USING <method>. The `using` field on CreateIndex covers the
403    // pre-column form `CREATE INDEX … USING hnsw (col)`. (sqlparser also
404    // accepts a post-column form `… (col) USING hnsw` and parks that in
405    // `index_options`; we don't bother with it — the canonical form is
406    // pre-column and matches PG/pgvector convention.)
407    let method = match using {
408        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("hnsw") => {
409            IndexMethod::Hnsw
410        }
411        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts") => {
412            IndexMethod::Fts
413        }
414        Some(IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("btree") => {
415            IndexMethod::Btree
416        }
417        Some(other) => {
418            return Err(SQLRiteError::NotImplemented(format!(
419                "CREATE INDEX … USING {other:?} is not supported \
420                 (try `hnsw`, `fts`, or no USING clause)"
421            )));
422        }
423        None => IndexMethod::Btree,
424    };
425
426    let table_name_str = table_name.to_string();
427    let column_name = match &columns[0].column.expr {
428        Expr::Identifier(ident) => ident.value.clone(),
429        Expr::CompoundIdentifier(parts) => parts
430            .last()
431            .map(|p| p.value.clone())
432            .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?,
433        other => {
434            return Err(SQLRiteError::NotImplemented(format!(
435                "CREATE INDEX only supports simple column references, got {other:?}"
436            )));
437        }
438    };
439
440    // Validate: table exists, column exists, type matches the index method,
441    // name is unique across both index kinds. Snapshot (rowid, value) pairs
442    // up front under the immutable borrow so the mutable attach later
443    // doesn't fight over `self`.
444    let (datatype, existing_rowids_and_values): (DataType, Vec<(i64, Value)>) = {
445        let table = db.get_table(table_name_str.clone()).map_err(|_| {
446            SQLRiteError::General(format!(
447                "CREATE INDEX references unknown table '{table_name_str}'"
448            ))
449        })?;
450        if !table.contains_column(column_name.clone()) {
451            return Err(SQLRiteError::General(format!(
452                "CREATE INDEX references unknown column '{column_name}' on table '{table_name_str}'"
453            )));
454        }
455        let col = table
456            .columns
457            .iter()
458            .find(|c| c.column_name == column_name)
459            .expect("we just verified the column exists");
460
461        // Name uniqueness check spans ALL index kinds — btree, hnsw, and
462        // fts share one namespace per table.
463        if table.index_by_name(&index_name).is_some()
464            || table.hnsw_indexes.iter().any(|i| i.name == index_name)
465            || table.fts_indexes.iter().any(|i| i.name == index_name)
466        {
467            if *if_not_exists {
468                return Ok(index_name);
469            }
470            return Err(SQLRiteError::General(format!(
471                "index '{index_name}' already exists"
472            )));
473        }
474        let datatype = clone_datatype(&col.datatype);
475
476        let mut pairs = Vec::new();
477        for rowid in table.rowids() {
478            if let Some(v) = table.get_value(&column_name, rowid) {
479                pairs.push((rowid, v));
480            }
481        }
482        (datatype, pairs)
483    };
484
485    match method {
486        IndexMethod::Btree => create_btree_index(
487            db,
488            &table_name_str,
489            &index_name,
490            &column_name,
491            &datatype,
492            *unique,
493            &existing_rowids_and_values,
494        ),
495        IndexMethod::Hnsw => create_hnsw_index(
496            db,
497            &table_name_str,
498            &index_name,
499            &column_name,
500            &datatype,
501            *unique,
502            &existing_rowids_and_values,
503        ),
504        IndexMethod::Fts => create_fts_index(
505            db,
506            &table_name_str,
507            &index_name,
508            &column_name,
509            &datatype,
510            *unique,
511            &existing_rowids_and_values,
512        ),
513    }
514}
515
516/// Executes `DROP TABLE [IF EXISTS] <name>;`. Mirrors SQLite's single-target
517/// shape: sqlparser parses `DROP TABLE a, b` as one statement with
518/// `names: vec![a, b]`, but we reject the multi-target form to keep error
519/// semantics simple (no partial-failure rollback).
520///
521/// On success the table — and every index attached to it — disappears from
522/// the in-memory `Database`. The next auto-save rebuilds `sqlrite_master`
523/// from scratch and simply doesn't write a row for the dropped table or
524/// its indexes; pages previously occupied by them become orphans on disk
525/// (no free-list yet — file size doesn't shrink until a future VACUUM).
526pub fn execute_drop_table(
527    names: &[ObjectName],
528    if_exists: bool,
529    db: &mut Database,
530) -> Result<usize> {
531    if names.len() != 1 {
532        return Err(SQLRiteError::NotImplemented(
533            "DROP TABLE supports a single table per statement".to_string(),
534        ));
535    }
536    let name = names[0].to_string();
537
538    if name == crate::sql::pager::MASTER_TABLE_NAME {
539        return Err(SQLRiteError::General(format!(
540            "'{}' is a reserved name used by the internal schema catalog",
541            crate::sql::pager::MASTER_TABLE_NAME
542        )));
543    }
544
545    if !db.contains_table(name.clone()) {
546        return if if_exists {
547            Ok(0)
548        } else {
549            Err(SQLRiteError::General(format!(
550                "Table '{name}' does not exist"
551            )))
552        };
553    }
554
555    db.tables.remove(&name);
556    Ok(1)
557}
558
559/// Executes `DROP INDEX [IF EXISTS] <name>;`. The statement does not name a
560/// table, so we walk every table looking for the index across all three
561/// index families (B-Tree secondary, HNSW, FTS).
562///
563/// Refuses to drop auto-indexes (`origin == IndexOrigin::Auto`) — those are
564/// invariants of the table's PRIMARY KEY / UNIQUE constraints and should
565/// only disappear when the column or table they depend on is dropped.
566/// SQLite has the same rule for its `sqlite_autoindex_*` indexes.
567pub fn execute_drop_index(
568    names: &[ObjectName],
569    if_exists: bool,
570    db: &mut Database,
571) -> Result<usize> {
572    if names.len() != 1 {
573        return Err(SQLRiteError::NotImplemented(
574            "DROP INDEX supports a single index per statement".to_string(),
575        ));
576    }
577    let name = names[0].to_string();
578
579    for table in db.tables.values_mut() {
580        if let Some(secondary) = table.secondary_indexes.iter().find(|i| i.name == name) {
581            if secondary.origin == IndexOrigin::Auto {
582                return Err(SQLRiteError::General(format!(
583                    "cannot drop auto-created index '{name}' (drop the column or table instead)"
584                )));
585            }
586            table.secondary_indexes.retain(|i| i.name != name);
587            return Ok(1);
588        }
589        if table.hnsw_indexes.iter().any(|i| i.name == name) {
590            table.hnsw_indexes.retain(|i| i.name != name);
591            return Ok(1);
592        }
593        if table.fts_indexes.iter().any(|i| i.name == name) {
594            table.fts_indexes.retain(|i| i.name != name);
595            return Ok(1);
596        }
597    }
598
599    if if_exists {
600        Ok(0)
601    } else {
602        Err(SQLRiteError::General(format!(
603            "Index '{name}' does not exist"
604        )))
605    }
606}
607
608/// Executes `ALTER TABLE [IF EXISTS] <name> <op>;` for one operation per
609/// statement. Supports four sub-operations matching SQLite:
610///
611///   - `RENAME TO <new>`
612///   - `RENAME COLUMN <old> TO <new>`
613///   - `ADD COLUMN <coldef>` (NOT NULL requires DEFAULT on a non-empty table;
614///     PK / UNIQUE constraints rejected — would need backfill + uniqueness)
615///   - `DROP COLUMN <name>` (refuses PK column and only-column)
616///
617/// Multi-operation ALTER (`ALTER TABLE foo RENAME TO bar, ADD COLUMN x ...`)
618/// is rejected; SQLite forbids it too.
619pub fn execute_alter_table(alter: AlterTable, db: &mut Database) -> Result<String> {
620    let table_name = alter.name.to_string();
621
622    if table_name == crate::sql::pager::MASTER_TABLE_NAME {
623        return Err(SQLRiteError::General(format!(
624            "'{}' is a reserved name used by the internal schema catalog",
625            crate::sql::pager::MASTER_TABLE_NAME
626        )));
627    }
628
629    if !db.contains_table(table_name.clone()) {
630        return if alter.if_exists {
631            Ok("ALTER TABLE: no-op (table does not exist)".to_string())
632        } else {
633            Err(SQLRiteError::General(format!(
634                "Table '{table_name}' does not exist"
635            )))
636        };
637    }
638
639    if alter.operations.len() != 1 {
640        return Err(SQLRiteError::NotImplemented(
641            "ALTER TABLE supports one operation per statement".to_string(),
642        ));
643    }
644
645    match &alter.operations[0] {
646        AlterTableOperation::RenameTable { table_name: kind } => {
647            let new_name = match kind {
648                RenameTableNameKind::To(name) => name.to_string(),
649                RenameTableNameKind::As(_) => {
650                    return Err(SQLRiteError::NotImplemented(
651                        "ALTER TABLE ... RENAME AS (MySQL-only) is not supported; use RENAME TO"
652                            .to_string(),
653                    ));
654                }
655            };
656            alter_rename_table(db, &table_name, &new_name)?;
657            Ok(format!(
658                "ALTER TABLE '{table_name}' RENAME TO '{new_name}' executed."
659            ))
660        }
661        AlterTableOperation::RenameColumn {
662            old_column_name,
663            new_column_name,
664        } => {
665            let old = old_column_name.value.clone();
666            let new = new_column_name.value.clone();
667            db.get_table_mut(table_name.clone())?
668                .rename_column(&old, &new)?;
669            Ok(format!(
670                "ALTER TABLE '{table_name}' RENAME COLUMN '{old}' TO '{new}' executed."
671            ))
672        }
673        AlterTableOperation::AddColumn {
674            column_def,
675            if_not_exists,
676            ..
677        } => {
678            let parsed = crate::sql::parser::create::parse_one_column(column_def)?;
679            let table = db.get_table_mut(table_name.clone())?;
680            if *if_not_exists && table.contains_column(parsed.name.clone()) {
681                return Ok(format!(
682                    "ALTER TABLE '{table_name}' ADD COLUMN: no-op (column '{}' already exists)",
683                    parsed.name
684                ));
685            }
686            let col_name = parsed.name.clone();
687            table.add_column(parsed)?;
688            Ok(format!(
689                "ALTER TABLE '{table_name}' ADD COLUMN '{col_name}' executed."
690            ))
691        }
692        AlterTableOperation::DropColumn {
693            column_names,
694            if_exists,
695            ..
696        } => {
697            if column_names.len() != 1 {
698                return Err(SQLRiteError::NotImplemented(
699                    "ALTER TABLE DROP COLUMN supports a single column per statement".to_string(),
700                ));
701            }
702            let col_name = column_names[0].value.clone();
703            let table = db.get_table_mut(table_name.clone())?;
704            if *if_exists && !table.contains_column(col_name.clone()) {
705                return Ok(format!(
706                    "ALTER TABLE '{table_name}' DROP COLUMN: no-op (column '{col_name}' does not exist)"
707                ));
708            }
709            table.drop_column(&col_name)?;
710            Ok(format!(
711                "ALTER TABLE '{table_name}' DROP COLUMN '{col_name}' executed."
712            ))
713        }
714        other => Err(SQLRiteError::NotImplemented(format!(
715            "ALTER TABLE operation {other:?} is not supported"
716        ))),
717    }
718}
719
720/// Executes `VACUUM;` (SQLR-6). Compacts the database file: rewrites
721/// every live table, index, and the catalog contiguously from page 1,
722/// drops the freelist, and truncates the tail at the next checkpoint.
723///
724/// Refuses to run inside a transaction (would publish in-flight writes
725/// out of band); refuses on read-only databases (handled upstream by
726/// the read-only mutation gate); and is a no-op on in-memory databases
727/// (no file to compact). Bare `VACUUM;` only — non-default options
728/// (`FULL`, `REINDEX`, table targets, etc.) are rejected.
729pub fn execute_vacuum(db: &mut Database) -> Result<String> {
730    if db.in_transaction() {
731        return Err(SQLRiteError::General(
732            "VACUUM cannot run inside a transaction".to_string(),
733        ));
734    }
735    let path = match db.source_path.clone() {
736        Some(p) => p,
737        None => {
738            return Ok("VACUUM is a no-op for in-memory databases".to_string());
739        }
740    };
741    // Checkpoint before AND after VACUUM so the main-file size we report
742    // reflects only what VACUUM actually reclaimed — without the leading
743    // checkpoint, `size_before` would be the stale main-file snapshot
744    // (typically 2 pages) while WAL holds the live bytes, making the
745    // bytes-reclaimed delta meaningless.
746    if let Some(pager) = db.pager.as_mut() {
747        let _ = pager.checkpoint();
748    }
749    let size_before = std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0);
750    let pages_before = db
751        .pager
752        .as_ref()
753        .map(|p| p.header().page_count)
754        .unwrap_or(0);
755    crate::sql::pager::vacuum_database(db, &path)?;
756    // Second checkpoint so the main file shrinks now — VACUUM's whole
757    // purpose is to reclaim bytes, so paying the I/O up front is fair.
758    if let Some(pager) = db.pager.as_mut() {
759        let _ = pager.checkpoint();
760    }
761    let size_after = std::fs::metadata(&path).ok().map(|m| m.len()).unwrap_or(0);
762    let pages_after = db
763        .pager
764        .as_ref()
765        .map(|p| p.header().page_count)
766        .unwrap_or(0);
767    let pages_reclaimed = pages_before.saturating_sub(pages_after);
768    let bytes_reclaimed = size_before.saturating_sub(size_after);
769    Ok(format!(
770        "VACUUM completed. {pages_reclaimed} pages reclaimed ({bytes_reclaimed} bytes)."
771    ))
772}
773
774/// Renames a table in `db.tables`. Updates `tb_name`, every secondary
775/// index's `table_name` field, and any auto-index whose name embedded
776/// the old table name. HNSW / FTS index entries don't carry a
777/// `table_name` field — they're addressed implicitly via the `Table`
778/// they live inside, so they move with the rename for free.
779fn alter_rename_table(db: &mut Database, old: &str, new: &str) -> Result<()> {
780    if new == crate::sql::pager::MASTER_TABLE_NAME {
781        return Err(SQLRiteError::General(format!(
782            "'{}' is a reserved name used by the internal schema catalog",
783            crate::sql::pager::MASTER_TABLE_NAME
784        )));
785    }
786    if old == new {
787        return Ok(());
788    }
789    if db.contains_table(new.to_string()) {
790        return Err(SQLRiteError::General(format!(
791            "target table '{new}' already exists"
792        )));
793    }
794
795    let mut table = db
796        .tables
797        .remove(old)
798        .ok_or_else(|| SQLRiteError::General(format!("Table '{old}' does not exist")))?;
799    table.tb_name = new.to_string();
800    for idx in table.secondary_indexes.iter_mut() {
801        idx.table_name = new.to_string();
802        if idx.origin == IndexOrigin::Auto
803            && idx.name == SecondaryIndex::auto_name(old, &idx.column_name)
804        {
805            idx.name = SecondaryIndex::auto_name(new, &idx.column_name);
806        }
807    }
808    db.tables.insert(new.to_string(), table);
809    Ok(())
810}
811
/// `USING <method>` choices recognized by `execute_create_index`. A
/// missing USING clause defaults to `Btree` so existing CREATE INDEX
/// statements (Phase 3e) keep working unchanged.
#[derive(Debug, Clone, Copy)]
enum IndexMethod {
    /// Phase 3e — B-Tree secondary index; the default when no USING
    /// clause is given.
    Btree,
    /// Phase 7d.2 — HNSW graph index; only valid on a VECTOR column.
    Hnsw,
    /// Phase 8b — full-text inverted index over a TEXT column.
    Fts,
}
822
823/// Builds a Phase 3e B-Tree secondary index and attaches it to the table.
824fn create_btree_index(
825    db: &mut Database,
826    table_name: &str,
827    index_name: &str,
828    column_name: &str,
829    datatype: &DataType,
830    unique: bool,
831    existing: &[(i64, Value)],
832) -> Result<String> {
833    let mut idx = SecondaryIndex::new(
834        index_name.to_string(),
835        table_name.to_string(),
836        column_name.to_string(),
837        datatype,
838        unique,
839        IndexOrigin::Explicit,
840    )?;
841
842    // Populate from existing rows. UNIQUE violations here mean the
843    // existing data already breaks the new index's constraint — a
844    // common source of user confusion, so be explicit.
845    for (rowid, v) in existing {
846        if unique && idx.would_violate_unique(v) {
847            return Err(SQLRiteError::General(format!(
848                "cannot create UNIQUE index '{index_name}': column '{column_name}' \
849                 already contains the duplicate value {}",
850                v.to_display_string()
851            )));
852        }
853        idx.insert(v, *rowid)?;
854    }
855
856    let table_mut = db.get_table_mut(table_name.to_string())?;
857    table_mut.secondary_indexes.push(idx);
858    Ok(index_name.to_string())
859}
860
861/// Builds a Phase 7d.2 HNSW index and attaches it to the table.
862fn create_hnsw_index(
863    db: &mut Database,
864    table_name: &str,
865    index_name: &str,
866    column_name: &str,
867    datatype: &DataType,
868    unique: bool,
869    existing: &[(i64, Value)],
870) -> Result<String> {
871    // HNSW only makes sense on VECTOR columns. Reject anything else
872    // with a clear message — this is the most likely user error.
873    let dim = match datatype {
874        DataType::Vector(d) => *d,
875        other => {
876            return Err(SQLRiteError::General(format!(
877                "USING hnsw requires a VECTOR column; '{column_name}' is {other}"
878            )));
879        }
880    };
881
882    if unique {
883        return Err(SQLRiteError::General(
884            "UNIQUE has no meaning for HNSW indexes".to_string(),
885        ));
886    }
887
888    // Build the in-memory graph. Distance metric is L2 by default
889    // (Phase 7d.2 doesn't yet expose a knob for picking cosine/dot —
890    // see `docs/phase-7-plan.md` for the deferral).
891    //
892    // Seed: hash the index name so different indexes get different
893    // graph topologies, but the same index always gets the same one
894    // — useful when debugging recall / index size.
895    let seed = hash_str_to_seed(index_name);
896    let mut idx = HnswIndex::new(DistanceMetric::L2, seed);
897
898    // Snapshot the (rowid, vector) pairs into a side map so the
899    // get_vec closure below can serve them by id without re-borrowing
900    // the table (we're already holding `existing` — flatten it).
901    let mut vec_map: std::collections::HashMap<i64, Vec<f32>> =
902        std::collections::HashMap::with_capacity(existing.len());
903    for (rowid, v) in existing {
904        match v {
905            Value::Vector(vec) => {
906                if vec.len() != dim {
907                    return Err(SQLRiteError::Internal(format!(
908                        "row {rowid} stores a {}-dim vector in column '{column_name}' \
909                         declared as VECTOR({dim}) — schema invariant violated",
910                        vec.len()
911                    )));
912                }
913                vec_map.insert(*rowid, vec.clone());
914            }
915            // Non-vector values (theoretical NULL, type coercion bug)
916            // get skipped — they wouldn't have a sensible graph
917            // position anyway.
918            _ => continue,
919        }
920    }
921
922    for (rowid, _) in existing {
923        if let Some(v) = vec_map.get(rowid) {
924            let v_clone = v.clone();
925            idx.insert(*rowid, &v_clone, |id| {
926                vec_map.get(&id).cloned().unwrap_or_default()
927            });
928        }
929    }
930
931    let table_mut = db.get_table_mut(table_name.to_string())?;
932    table_mut.hnsw_indexes.push(HnswIndexEntry {
933        name: index_name.to_string(),
934        column_name: column_name.to_string(),
935        index: idx,
936        // Freshly built — no DELETE/UPDATE has invalidated it yet.
937        needs_rebuild: false,
938    });
939    Ok(index_name.to_string())
940}
941
942/// Builds a Phase 8b FTS inverted index and attaches it to the table.
943/// Mirrors [`create_hnsw_index`] in shape: validate column type,
944/// tokenize each existing row's text into the in-memory posting list,
945/// push an `FtsIndexEntry`.
946fn create_fts_index(
947    db: &mut Database,
948    table_name: &str,
949    index_name: &str,
950    column_name: &str,
951    datatype: &DataType,
952    unique: bool,
953    existing: &[(i64, Value)],
954) -> Result<String> {
955    // FTS is a TEXT-only feature for the MVP. JSON columns share the
956    // Row::Text storage but their content is structured — full-text
957    // indexing JSON keys + values would need a different design (and
958    // is out of scope per the Phase 8 plan's "Out of scope" section).
959    match datatype {
960        DataType::Text => {}
961        other => {
962            return Err(SQLRiteError::General(format!(
963                "USING fts requires a TEXT column; '{column_name}' is {other}"
964            )));
965        }
966    }
967
968    if unique {
969        return Err(SQLRiteError::General(
970            "UNIQUE has no meaning for FTS indexes".to_string(),
971        ));
972    }
973
974    let mut idx = PostingList::new();
975    for (rowid, v) in existing {
976        if let Value::Text(text) = v {
977            idx.insert(*rowid, text);
978        }
979        // Non-text values (Null, type coercion bugs) get skipped — same
980        // posture as create_hnsw_index for non-vector values.
981    }
982
983    let table_mut = db.get_table_mut(table_name.to_string())?;
984    table_mut.fts_indexes.push(FtsIndexEntry {
985        name: index_name.to_string(),
986        column_name: column_name.to_string(),
987        index: idx,
988        needs_rebuild: false,
989    });
990    Ok(index_name.to_string())
991}
992
/// Maps a string to a `u64` RNG seed using 64-bit FNV-1a.
///
/// The result depends only on the bytes of `s`, so the same name yields
/// the same seed in every process — unlike `std::hash::DefaultHasher`,
/// which is randomized per process. The empty string hashes to the FNV
/// offset basis.
fn hash_str_to_seed(s: &str) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xCBF2_9CE4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01B3;

    // FNV-1a: XOR the byte in first, then multiply by the prime (wrapping).
    s.bytes().fold(FNV_OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
1004
/// Cheap clone helper — `DataType` intentionally doesn't derive `Clone`
/// because the enum has no ergonomic reason to be cloneable elsewhere.
///
/// Returns a new `DataType` equal to `dt`, variant by variant. The match
/// has no wildcard arm, so adding a variant to `DataType` stops this
/// function from compiling until the new variant is handled here.
fn clone_datatype(dt: &DataType) -> DataType {
    match dt {
        DataType::Integer => DataType::Integer,
        DataType::Text => DataType::Text,
        DataType::Real => DataType::Real,
        DataType::Bool => DataType::Bool,
        // The only variant with a payload: copy the declared dimension.
        DataType::Vector(dim) => DataType::Vector(*dim),
        DataType::Json => DataType::Json,
        DataType::None => DataType::None,
        DataType::Invalid => DataType::Invalid,
    }
}
1019
1020fn extract_single_table_name(tables: &[TableWithJoins]) -> Result<String> {
1021    if tables.len() != 1 {
1022        return Err(SQLRiteError::NotImplemented(
1023            "multi-table DELETE is not supported yet".to_string(),
1024        ));
1025    }
1026    extract_table_name(&tables[0])
1027}
1028
1029fn extract_table_name(twj: &TableWithJoins) -> Result<String> {
1030    if !twj.joins.is_empty() {
1031        return Err(SQLRiteError::NotImplemented(
1032            "JOIN is not supported yet".to_string(),
1033        ));
1034    }
1035    match &twj.relation {
1036        TableFactor::Table { name, .. } => Ok(name.to_string()),
1037        _ => Err(SQLRiteError::NotImplemented(
1038            "only plain table references are supported".to_string(),
1039        )),
1040    }
1041}
1042
/// Tells the executor how to produce its candidate rowid list.
/// Produced by `select_rowids`.
enum RowidSource {
    /// The WHERE was simple enough to probe a secondary index directly.
    /// The `Vec` already contains exactly the rows the index matched;
    /// no further WHERE evaluation is needed (the probe is precise).
    /// `select_rowids` sorts the rowids ascending before returning them.
    IndexProbe(Vec<i64>),
    /// No applicable index; caller falls back to walking `table.rowids()`
    /// and evaluating the WHERE on each row.
    FullScan,
}
1053
1054/// Try to satisfy `WHERE` with an index probe. Currently supports the
1055/// simplest shape: a single `col = literal` (or `literal = col`) where
1056/// `col` is on a secondary index. AND/OR/range predicates fall back to
1057/// full scan — those can be layered on later without changing the caller.
1058fn select_rowids(table: &Table, selection: Option<&Expr>) -> Result<RowidSource> {
1059    let Some(expr) = selection else {
1060        return Ok(RowidSource::FullScan);
1061    };
1062    let Some((col, literal)) = try_extract_equality(expr) else {
1063        return Ok(RowidSource::FullScan);
1064    };
1065    let Some(idx) = table.index_for_column(&col) else {
1066        return Ok(RowidSource::FullScan);
1067    };
1068
1069    // Convert the literal into a runtime Value. If the literal type doesn't
1070    // match the column's index we still need correct semantics — evaluate
1071    // the WHERE against every row. Fall back to full scan.
1072    let literal_value = match convert_literal(&literal) {
1073        Ok(v) => v,
1074        Err(_) => return Ok(RowidSource::FullScan),
1075    };
1076
1077    // Index lookup returns the full list of rowids matching this equality
1078    // predicate. For unique indexes that's at most one; for non-unique it
1079    // can be many.
1080    let mut rowids = idx.lookup(&literal_value);
1081    rowids.sort_unstable();
1082    Ok(RowidSource::IndexProbe(rowids))
1083}
1084
1085/// Recognizes `expr` as a simple equality on a column reference against a
1086/// literal. Returns `(column_name, literal_value)` if the shape matches;
1087/// `None` otherwise. Accepts both `col = literal` and `literal = col`.
1088fn try_extract_equality(expr: &Expr) -> Option<(String, sqlparser::ast::Value)> {
1089    // Peel off Nested parens so `WHERE (x = 1)` is recognized too.
1090    let peeled = match expr {
1091        Expr::Nested(inner) => inner.as_ref(),
1092        other => other,
1093    };
1094    let Expr::BinaryOp { left, op, right } = peeled else {
1095        return None;
1096    };
1097    if !matches!(op, BinaryOperator::Eq) {
1098        return None;
1099    }
1100    let col_from = |e: &Expr| -> Option<String> {
1101        match e {
1102            Expr::Identifier(ident) => Some(ident.value.clone()),
1103            Expr::CompoundIdentifier(parts) => parts.last().map(|p| p.value.clone()),
1104            _ => None,
1105        }
1106    };
1107    let literal_from = |e: &Expr| -> Option<sqlparser::ast::Value> {
1108        if let Expr::Value(v) = e {
1109            Some(v.value.clone())
1110        } else {
1111            None
1112        }
1113    };
1114    if let (Some(c), Some(l)) = (col_from(left), literal_from(right)) {
1115        return Some((c, l));
1116    }
1117    if let (Some(l), Some(c)) = (literal_from(left), col_from(right)) {
1118        return Some((c, l));
1119    }
1120    None
1121}
1122
1123/// Recognizes the HNSW-probable query pattern and probes the graph
1124/// if a matching index exists.
1125///
1126/// Looks for ORDER BY `vec_distance_l2(<col>, <bracket-array literal>)`
1127/// where the table has an HNSW index attached to `<col>`. On a match,
1128/// returns the top-k rowids straight from the graph (O(log N)). On
1129/// any miss — different function name, no matching index, query
1130/// dimension wrong, etc. — returns `None` and the caller falls through
1131/// to the bounded-heap brute-force path (7c) or the full sort (7b),
1132/// preserving correct results regardless of whether the HNSW pathway
1133/// kicked in.
1134///
1135/// Phase 7d.2 caveats:
1136/// - Only `vec_distance_l2` is recognized. Cosine and dot fall through
1137///   to brute-force because we don't yet expose a per-index distance
1138///   knob (deferred to Phase 7d.x — see `docs/phase-7-plan.md`).
1139/// - Only ASCENDING order makes sense for "k nearest" — DESC ORDER BY
1140///   `vec_distance_l2(...) LIMIT k` would mean "k farthest", which
1141///   isn't what the index is built for. We don't bother to detect
1142///   `ascending == false` here; the optimizer just skips and the
1143///   fallback path handles it correctly (slower).
1144fn try_hnsw_probe(table: &Table, order_expr: &Expr, k: usize) -> Option<Vec<i64>> {
1145    if k == 0 {
1146        return None;
1147    }
1148
1149    // Pattern-match: order expr must be a function call vec_distance_l2(a, b).
1150    let func = match order_expr {
1151        Expr::Function(f) => f,
1152        _ => return None,
1153    };
1154    let fname = match func.name.0.as_slice() {
1155        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1156        _ => return None,
1157    };
1158    if fname != "vec_distance_l2" {
1159        return None;
1160    }
1161
1162    // Extract the two args as raw Exprs.
1163    let arg_list = match &func.args {
1164        FunctionArguments::List(l) => &l.args,
1165        _ => return None,
1166    };
1167    if arg_list.len() != 2 {
1168        return None;
1169    }
1170    let exprs: Vec<&Expr> = arg_list
1171        .iter()
1172        .filter_map(|a| match a {
1173            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => Some(e),
1174            _ => None,
1175        })
1176        .collect();
1177    if exprs.len() != 2 {
1178        return None;
1179    }
1180
1181    // One arg must be a column reference (the indexed col); the other
1182    // must be a bracket-array literal (the query vector). Try both
1183    // orderings — pgvector's idiom puts the column on the left, but
1184    // SQL is commutative for distance.
1185    let (col_name, query_vec) = match identify_indexed_arg_and_literal(exprs[0], exprs[1]) {
1186        Some(v) => v,
1187        None => match identify_indexed_arg_and_literal(exprs[1], exprs[0]) {
1188            Some(v) => v,
1189            None => return None,
1190        },
1191    };
1192
1193    // Find the HNSW index on this column.
1194    let entry = table
1195        .hnsw_indexes
1196        .iter()
1197        .find(|e| e.column_name == col_name)?;
1198
1199    // Dimension sanity check — the query vector must match the
1200    // indexed column's declared dimension. If it doesn't, the brute-
1201    // force fallback would also error at the vec_distance_l2 dim-check;
1202    // returning None here lets that path produce the user-visible
1203    // error message.
1204    let declared_dim = match table.columns.iter().find(|c| c.column_name == col_name) {
1205        Some(c) => match &c.datatype {
1206            DataType::Vector(d) => *d,
1207            _ => return None,
1208        },
1209        None => return None,
1210    };
1211    if query_vec.len() != declared_dim {
1212        return None;
1213    }
1214
1215    // Probe the graph. Vectors are looked up from the table's row
1216    // storage — a closure rather than a `&Table` so the algorithm
1217    // module stays decoupled from the SQL types.
1218    let column_for_closure = col_name.clone();
1219    let table_ref = table;
1220    let result = entry.index.search(&query_vec, k, |id| {
1221        match table_ref.get_value(&column_for_closure, id) {
1222            Some(Value::Vector(v)) => v,
1223            _ => Vec::new(),
1224        }
1225    });
1226    Some(result)
1227}
1228
1229/// Phase 8b — FTS optimizer hook.
1230///
1231/// Recognizes `ORDER BY bm25_score(<col>, '<query>') DESC LIMIT <k>`
1232/// and serves it from the FTS index instead of full-scanning. Returns
1233/// `Some(rowids)` already sorted by descending BM25 (with rowid
1234/// ascending as tie-break), or `None` to fall through to scalar eval.
1235///
1236/// **Known limitation (mirrors `try_hnsw_probe`).** This shortcut
1237/// ignores any `WHERE` clause. The canonical FTS query has a
1238/// `WHERE fts_match(<col>, '<q>')` predicate, which is implicitly
1239/// satisfied by the probe results — so dropping it is harmless.
1240/// Anything *else* in the WHERE (`AND status = 'published'`) gets
1241/// silently skipped on the optimizer path. Per Phase 8 plan Q6 we
1242/// match HNSW's posture here; a correctness-preserving multi-index
1243/// composer is deferred.
1244fn try_fts_probe(table: &Table, order_expr: &Expr, ascending: bool, k: usize) -> Option<Vec<i64>> {
1245    if k == 0 || ascending {
1246        // BM25 is "higher = better"; ASC ranking is almost certainly a
1247        // user mistake. Fall through so the caller gets either an
1248        // explicit error from scalar eval or the slow correct path.
1249        return None;
1250    }
1251
1252    let func = match order_expr {
1253        Expr::Function(f) => f,
1254        _ => return None,
1255    };
1256    let fname = match func.name.0.as_slice() {
1257        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1258        _ => return None,
1259    };
1260    if fname != "bm25_score" {
1261        return None;
1262    }
1263
1264    let arg_list = match &func.args {
1265        FunctionArguments::List(l) => &l.args,
1266        _ => return None,
1267    };
1268    if arg_list.len() != 2 {
1269        return None;
1270    }
1271    let exprs: Vec<&Expr> = arg_list
1272        .iter()
1273        .filter_map(|a| match a {
1274            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => Some(e),
1275            _ => None,
1276        })
1277        .collect();
1278    if exprs.len() != 2 {
1279        return None;
1280    }
1281
1282    // Arg 0 must be a bare column identifier.
1283    let col_name = match exprs[0] {
1284        Expr::Identifier(ident) if ident.quote_style.is_none() => ident.value.clone(),
1285        _ => return None,
1286    };
1287
1288    // Arg 1 must be a single-quoted string literal. Anything else
1289    // (column reference, function call) requires per-row evaluation —
1290    // we'd lose the whole point of the probe.
1291    let query = match exprs[1] {
1292        Expr::Value(v) => match &v.value {
1293            AstValue::SingleQuotedString(s) => s.clone(),
1294            _ => return None,
1295        },
1296        _ => return None,
1297    };
1298
1299    let entry = table
1300        .fts_indexes
1301        .iter()
1302        .find(|e| e.column_name == col_name)?;
1303
1304    let scored = entry.index.query(&query, &Bm25Params::default());
1305    let mut out: Vec<i64> = scored.into_iter().map(|(id, _)| id).collect();
1306    if out.len() > k {
1307        out.truncate(k);
1308    }
1309    Some(out)
1310}
1311
1312/// Helper for `try_hnsw_probe`: given two function args, identify which
1313/// one is a bare column identifier (the indexed column) and which is a
1314/// bracket-array literal (the query vector). Returns
1315/// `Some((column_name, query_vec))` on a match, `None` otherwise.
1316fn identify_indexed_arg_and_literal(a: &Expr, b: &Expr) -> Option<(String, Vec<f32>)> {
1317    let col_name = match a {
1318        Expr::Identifier(ident) if ident.quote_style.is_none() => ident.value.clone(),
1319        _ => return None,
1320    };
1321    let lit_str = match b {
1322        Expr::Identifier(ident) if ident.quote_style == Some('[') => {
1323            format!("[{}]", ident.value)
1324        }
1325        _ => return None,
1326    };
1327    let v = parse_vector_literal(&lit_str).ok()?;
1328    Some((col_name, v))
1329}
1330
1331/// One entry in the bounded-heap top-k path. Holds a pre-evaluated
1332/// sort key + the rowid it came from. The `asc` flag inverts `Ord`
1333/// so a single `BinaryHeap<HeapEntry>` works for both ASC and DESC
1334/// without wrapping in `std::cmp::Reverse` at the call site:
1335///
1336///   - ASC LIMIT k = "k smallest": natural Ord. Max-heap top is the
1337///     largest currently kept; new items smaller than top displace.
1338///   - DESC LIMIT k = "k largest": Ord reversed. Max-heap top is now
1339///     the smallest currently kept (under reversed Ord, smallest
1340///     looks largest); new items larger than top displace.
1341///
1342/// In both cases the displacement test reduces to "new entry < heap top".
1343struct HeapEntry {
1344    key: Value,
1345    rowid: i64,
1346    asc: bool,
1347}
1348
1349impl PartialEq for HeapEntry {
1350    fn eq(&self, other: &Self) -> bool {
1351        self.cmp(other) == Ordering::Equal
1352    }
1353}
1354
1355impl Eq for HeapEntry {}
1356
1357impl PartialOrd for HeapEntry {
1358    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1359        Some(self.cmp(other))
1360    }
1361}
1362
1363impl Ord for HeapEntry {
1364    fn cmp(&self, other: &Self) -> Ordering {
1365        let raw = compare_values(Some(&self.key), Some(&other.key));
1366        if self.asc { raw } else { raw.reverse() }
1367    }
1368}
1369
1370/// Bounded-heap top-k selection. Returns at most `k` rowids in the
1371/// caller's desired order (ascending key for `order.ascending`,
1372/// descending otherwise).
1373///
1374/// O(N log k) where N = `matching.len()`. Caller must check
1375/// `k < matching.len()` for this to be a win — for k ≥ N the
1376/// `sort_rowids` full-sort path is the same asymptotic cost without
1377/// the heap overhead.
1378fn select_topk(
1379    matching: &[i64],
1380    table: &Table,
1381    order: &OrderByClause,
1382    k: usize,
1383) -> Result<Vec<i64>> {
1384    use std::collections::BinaryHeap;
1385
1386    if k == 0 || matching.is_empty() {
1387        return Ok(Vec::new());
1388    }
1389
1390    let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::with_capacity(k + 1);
1391
1392    for &rowid in matching {
1393        let key = eval_expr(&order.expr, table, rowid)?;
1394        let entry = HeapEntry {
1395            key,
1396            rowid,
1397            asc: order.ascending,
1398        };
1399
1400        if heap.len() < k {
1401            heap.push(entry);
1402        } else {
1403            // peek() returns the largest under our direction-aware Ord
1404            // — the worst entry currently kept. Displace it iff the
1405            // new entry is "better" (i.e. compares Less).
1406            if entry < *heap.peek().unwrap() {
1407                heap.pop();
1408                heap.push(entry);
1409            }
1410        }
1411    }
1412
1413    // `into_sorted_vec` returns ascending under our direction-aware Ord:
1414    //   ASC: ascending by raw key (what we want)
1415    //   DESC: ascending under reversed Ord = descending by raw key (what
1416    //         we want for an ORDER BY DESC LIMIT k result)
1417    Ok(heap
1418        .into_sorted_vec()
1419        .into_iter()
1420        .map(|e| e.rowid)
1421        .collect())
1422}
1423
1424fn sort_rowids(rowids: &mut [i64], table: &Table, order: &OrderByClause) -> Result<()> {
1425    // Phase 7b: ORDER BY now accepts any expression (column ref,
1426    // arithmetic, function call, …). Pre-compute the sort key for
1427    // every rowid up front so the comparator is called O(N log N)
1428    // times against pre-evaluated Values rather than re-evaluating
1429    // the expression O(N log N) times. Not strictly necessary today,
1430    // but vital once 7d's HNSW index lands and this same code path
1431    // could be running tens of millions of distance computations.
1432    let mut keys: Vec<(i64, Result<Value>)> = rowids
1433        .iter()
1434        .map(|r| (*r, eval_expr(&order.expr, table, *r)))
1435        .collect();
1436
1437    // Surface the FIRST evaluation error if any. We could be lazy
1438    // and let sort_by encounter it, but `Ord::cmp` can't return a
1439    // Result and we'd have to swallow errors silently.
1440    for (_, k) in &keys {
1441        if let Err(e) = k {
1442            return Err(SQLRiteError::General(format!(
1443                "ORDER BY expression failed: {e}"
1444            )));
1445        }
1446    }
1447
1448    keys.sort_by(|(_, ka), (_, kb)| {
1449        // Both unwrap()s are safe — we just verified above that
1450        // every key Result is Ok.
1451        let va = ka.as_ref().unwrap();
1452        let vb = kb.as_ref().unwrap();
1453        let ord = compare_values(Some(va), Some(vb));
1454        if order.ascending { ord } else { ord.reverse() }
1455    });
1456
1457    // Write the sorted rowids back into the caller's slice.
1458    for (i, (rowid, _)) in keys.into_iter().enumerate() {
1459        rowids[i] = rowid;
1460    }
1461    Ok(())
1462}
1463
1464fn compare_values(a: Option<&Value>, b: Option<&Value>) -> Ordering {
1465    match (a, b) {
1466        (None, None) => Ordering::Equal,
1467        (None, _) => Ordering::Less,
1468        (_, None) => Ordering::Greater,
1469        (Some(a), Some(b)) => match (a, b) {
1470            (Value::Null, Value::Null) => Ordering::Equal,
1471            (Value::Null, _) => Ordering::Less,
1472            (_, Value::Null) => Ordering::Greater,
1473            (Value::Integer(x), Value::Integer(y)) => x.cmp(y),
1474            (Value::Real(x), Value::Real(y)) => x.partial_cmp(y).unwrap_or(Ordering::Equal),
1475            (Value::Integer(x), Value::Real(y)) => {
1476                (*x as f64).partial_cmp(y).unwrap_or(Ordering::Equal)
1477            }
1478            (Value::Real(x), Value::Integer(y)) => {
1479                x.partial_cmp(&(*y as f64)).unwrap_or(Ordering::Equal)
1480            }
1481            (Value::Text(x), Value::Text(y)) => x.cmp(y),
1482            (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
1483            // Cross-type fallback: stringify and compare; keeps ORDER BY total.
1484            (x, y) => x.to_display_string().cmp(&y.to_display_string()),
1485        },
1486    }
1487}
1488
1489/// Returns `true` if the row at `rowid` matches the predicate expression.
1490pub fn eval_predicate(expr: &Expr, table: &Table, rowid: i64) -> Result<bool> {
1491    let v = eval_expr(expr, table, rowid)?;
1492    match v {
1493        Value::Bool(b) => Ok(b),
1494        Value::Null => Ok(false), // SQL NULL in a WHERE is treated as false
1495        Value::Integer(i) => Ok(i != 0),
1496        other => Err(SQLRiteError::Internal(format!(
1497            "WHERE clause must evaluate to boolean, got {}",
1498            other.to_display_string()
1499        ))),
1500    }
1501}
1502
1503fn eval_expr(expr: &Expr, table: &Table, rowid: i64) -> Result<Value> {
1504    match expr {
1505        Expr::Nested(inner) => eval_expr(inner, table, rowid),
1506
1507        Expr::Identifier(ident) => {
1508            // Phase 7b — sqlparser parses bracket-array literals like
1509            // `[0.1, 0.2, 0.3]` as bracket-quoted identifiers (it inherits
1510            // MSSQL `[name]` syntax). When we see `quote_style == Some('[')`
1511            // in expression-evaluation position (SELECT projection, WHERE,
1512            // ORDER BY, function args), parse the bracketed content as a
1513            // vector literal so the rest of the executor can compare /
1514            // distance-compute against it. Same trick the INSERT parser
1515            // uses; the executor needed its own copy because expression
1516            // eval runs on a different code path.
1517            if ident.quote_style == Some('[') {
1518                let raw = format!("[{}]", ident.value);
1519                let v = parse_vector_literal(&raw)?;
1520                return Ok(Value::Vector(v));
1521            }
1522            Ok(table.get_value(&ident.value, rowid).unwrap_or(Value::Null))
1523        }
1524
1525        Expr::CompoundIdentifier(parts) => {
1526            // Accept `table.col` — we only have one table in scope, so ignore the qualifier.
1527            let col = parts
1528                .last()
1529                .map(|i| i.value.as_str())
1530                .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?;
1531            Ok(table.get_value(col, rowid).unwrap_or(Value::Null))
1532        }
1533
1534        Expr::Value(v) => convert_literal(&v.value),
1535
1536        Expr::UnaryOp { op, expr } => {
1537            let inner = eval_expr(expr, table, rowid)?;
1538            match op {
1539                UnaryOperator::Not => match inner {
1540                    Value::Bool(b) => Ok(Value::Bool(!b)),
1541                    Value::Null => Ok(Value::Null),
1542                    other => Err(SQLRiteError::Internal(format!(
1543                        "NOT applied to non-boolean value: {}",
1544                        other.to_display_string()
1545                    ))),
1546                },
1547                UnaryOperator::Minus => match inner {
1548                    Value::Integer(i) => Ok(Value::Integer(-i)),
1549                    Value::Real(f) => Ok(Value::Real(-f)),
1550                    Value::Null => Ok(Value::Null),
1551                    other => Err(SQLRiteError::Internal(format!(
1552                        "unary minus on non-numeric value: {}",
1553                        other.to_display_string()
1554                    ))),
1555                },
1556                UnaryOperator::Plus => Ok(inner),
1557                other => Err(SQLRiteError::NotImplemented(format!(
1558                    "unary operator {other:?} is not supported"
1559                ))),
1560            }
1561        }
1562
1563        Expr::BinaryOp { left, op, right } => match op {
1564            BinaryOperator::And => {
1565                let l = eval_expr(left, table, rowid)?;
1566                let r = eval_expr(right, table, rowid)?;
1567                Ok(Value::Bool(as_bool(&l)? && as_bool(&r)?))
1568            }
1569            BinaryOperator::Or => {
1570                let l = eval_expr(left, table, rowid)?;
1571                let r = eval_expr(right, table, rowid)?;
1572                Ok(Value::Bool(as_bool(&l)? || as_bool(&r)?))
1573            }
1574            cmp @ (BinaryOperator::Eq
1575            | BinaryOperator::NotEq
1576            | BinaryOperator::Lt
1577            | BinaryOperator::LtEq
1578            | BinaryOperator::Gt
1579            | BinaryOperator::GtEq) => {
1580                let l = eval_expr(left, table, rowid)?;
1581                let r = eval_expr(right, table, rowid)?;
1582                // Any comparison involving NULL is unknown → false in a WHERE.
1583                if matches!(l, Value::Null) || matches!(r, Value::Null) {
1584                    return Ok(Value::Bool(false));
1585                }
1586                let ord = compare_values(Some(&l), Some(&r));
1587                let result = match cmp {
1588                    BinaryOperator::Eq => ord == Ordering::Equal,
1589                    BinaryOperator::NotEq => ord != Ordering::Equal,
1590                    BinaryOperator::Lt => ord == Ordering::Less,
1591                    BinaryOperator::LtEq => ord != Ordering::Greater,
1592                    BinaryOperator::Gt => ord == Ordering::Greater,
1593                    BinaryOperator::GtEq => ord != Ordering::Less,
1594                    _ => unreachable!(),
1595                };
1596                Ok(Value::Bool(result))
1597            }
1598            arith @ (BinaryOperator::Plus
1599            | BinaryOperator::Minus
1600            | BinaryOperator::Multiply
1601            | BinaryOperator::Divide
1602            | BinaryOperator::Modulo) => {
1603                let l = eval_expr(left, table, rowid)?;
1604                let r = eval_expr(right, table, rowid)?;
1605                eval_arith(arith, &l, &r)
1606            }
1607            BinaryOperator::StringConcat => {
1608                let l = eval_expr(left, table, rowid)?;
1609                let r = eval_expr(right, table, rowid)?;
1610                if matches!(l, Value::Null) || matches!(r, Value::Null) {
1611                    return Ok(Value::Null);
1612                }
1613                Ok(Value::Text(format!(
1614                    "{}{}",
1615                    l.to_display_string(),
1616                    r.to_display_string()
1617                )))
1618            }
1619            other => Err(SQLRiteError::NotImplemented(format!(
1620                "binary operator {other:?} is not supported yet"
1621            ))),
1622        },
1623
1624        // Phase 7b — function-call dispatch. Currently only the three
1625        // vector-distance functions; this match arm becomes the single
1626        // place to register more SQL functions later (e.g. abs(),
1627        // length(), …) without re-touching the rest of the executor.
1628        //
1629        // Operator forms (`<->` `<=>` `<#>`) are NOT plumbed here: two
1630        // of three don't parse natively in sqlparser (we'd need a
1631        // string-preprocessing pass or a sqlparser fork). Deferred to
1632        // a follow-up sub-phase; see docs/phase-7-plan.md's "Scope
1633        // corrections" note.
1634        Expr::Function(func) => eval_function(func, table, rowid),
1635
1636        other => Err(SQLRiteError::NotImplemented(format!(
1637            "unsupported expression in WHERE/projection: {other:?}"
1638        ))),
1639    }
1640}
1641
1642/// Dispatches an `Expr::Function` to its built-in implementation.
1643/// Currently only the three vec_distance_* functions; other functions
1644/// surface as `NotImplemented` errors with the function name in the
1645/// message so users see what they tried.
1646fn eval_function(func: &sqlparser::ast::Function, table: &Table, rowid: i64) -> Result<Value> {
1647    // Function name lives in `name.0[0]` for unqualified calls. Anything
1648    // qualified (e.g. `pkg.fn(...)`) falls through to NotImplemented.
1649    let name = match func.name.0.as_slice() {
1650        [ObjectNamePart::Identifier(ident)] => ident.value.to_lowercase(),
1651        _ => {
1652            return Err(SQLRiteError::NotImplemented(format!(
1653                "qualified function names not supported: {:?}",
1654                func.name
1655            )));
1656        }
1657    };
1658
1659    match name.as_str() {
1660        "vec_distance_l2" | "vec_distance_cosine" | "vec_distance_dot" => {
1661            let (a, b) = extract_two_vector_args(&name, &func.args, table, rowid)?;
1662            let dist = match name.as_str() {
1663                "vec_distance_l2" => vec_distance_l2(&a, &b),
1664                "vec_distance_cosine" => vec_distance_cosine(&a, &b)?,
1665                "vec_distance_dot" => vec_distance_dot(&a, &b),
1666                _ => unreachable!(),
1667            };
1668            // Widen f32 → f64 for the runtime Value. Vectors are stored
1669            // as f32 (consistent with industry convention for embeddings),
1670            // but the executor's numeric type is f64 so distances slot
1671            // into Value::Real cleanly and can be compared / ordered with
1672            // other reals via the existing arithmetic + comparison paths.
1673            Ok(Value::Real(dist as f64))
1674        }
1675        // Phase 7e — JSON functions. All four parse the JSON text on
1676        // demand (we don't cache parsed values), then resolve a path
1677        // (default `$` = root). The path resolver handles `.key` for
1678        // object access and `[N]` for array index. SQLite-style.
1679        "json_extract" => json_fn_extract(&name, &func.args, table, rowid),
1680        "json_type" => json_fn_type(&name, &func.args, table, rowid),
1681        "json_array_length" => json_fn_array_length(&name, &func.args, table, rowid),
1682        "json_object_keys" => json_fn_object_keys(&name, &func.args, table, rowid),
1683        // Phase 8b — FTS scalars. Both consult an FTS index attached to
1684        // the named column; both error if no index exists (the index is
1685        // a hard prerequisite, mirroring SQLite FTS5's MATCH).
1686        "fts_match" => {
1687            let (entry, query) = resolve_fts_args(&name, &func.args, table, rowid)?;
1688            Ok(Value::Bool(entry.index.matches(rowid, &query)))
1689        }
1690        "bm25_score" => {
1691            let (entry, query) = resolve_fts_args(&name, &func.args, table, rowid)?;
1692            let s = entry.index.score(rowid, &query, &Bm25Params::default());
1693            Ok(Value::Real(s))
1694        }
1695        other => Err(SQLRiteError::NotImplemented(format!(
1696            "unknown function: {other}(...)"
1697        ))),
1698    }
1699}
1700
1701/// Helper for `fts_match` / `bm25_score`: pull the column reference out
1702/// of arg 0 (a bare identifier — we need the *name*, not the per-row
1703/// value), evaluate arg 1 as a Text query string, and look up the FTS
1704/// index attached to that column. Errors if any step fails.
1705fn resolve_fts_args<'t>(
1706    fn_name: &str,
1707    args: &FunctionArguments,
1708    table: &'t Table,
1709    rowid: i64,
1710) -> Result<(&'t FtsIndexEntry, String)> {
1711    let arg_list = match args {
1712        FunctionArguments::List(l) => &l.args,
1713        _ => {
1714            return Err(SQLRiteError::General(format!(
1715                "{fn_name}() expects exactly two arguments: (column, query_text)"
1716            )));
1717        }
1718    };
1719    if arg_list.len() != 2 {
1720        return Err(SQLRiteError::General(format!(
1721            "{fn_name}() expects exactly 2 arguments, got {}",
1722            arg_list.len()
1723        )));
1724    }
1725
1726    // Arg 0: bare column identifier. Must resolve syntactically to a
1727    // column name (we can't accept arbitrary expressions because we
1728    // need the column to look up the index, not the column's value).
1729    let col_expr = match &arg_list[0] {
1730        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1731        other => {
1732            return Err(SQLRiteError::NotImplemented(format!(
1733                "{fn_name}() argument 0 must be a column name, got {other:?}"
1734            )));
1735        }
1736    };
1737    let col_name = match col_expr {
1738        Expr::Identifier(ident) => ident.value.clone(),
1739        Expr::CompoundIdentifier(parts) => parts
1740            .last()
1741            .map(|p| p.value.clone())
1742            .ok_or_else(|| SQLRiteError::Internal("empty compound identifier".to_string()))?,
1743        other => {
1744            return Err(SQLRiteError::General(format!(
1745                "{fn_name}() argument 0 must be a column reference, got {other:?}"
1746            )));
1747        }
1748    };
1749
1750    // Arg 1: query string. Evaluated through the normal expression
1751    // pipeline so callers can pass a literal `'rust db'` or an
1752    // expression that yields TEXT.
1753    let q_expr = match &arg_list[1] {
1754        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1755        other => {
1756            return Err(SQLRiteError::NotImplemented(format!(
1757                "{fn_name}() argument 1 must be a text expression, got {other:?}"
1758            )));
1759        }
1760    };
1761    let query = match eval_expr(q_expr, table, rowid)? {
1762        Value::Text(s) => s,
1763        other => {
1764            return Err(SQLRiteError::General(format!(
1765                "{fn_name}() argument 1 must be TEXT, got {}",
1766                other.to_display_string()
1767            )));
1768        }
1769    };
1770
1771    let entry = table
1772        .fts_indexes
1773        .iter()
1774        .find(|e| e.column_name == col_name)
1775        .ok_or_else(|| {
1776            SQLRiteError::General(format!(
1777                "{fn_name}({col_name}, ...): no FTS index on column '{col_name}' \
1778                 (run CREATE INDEX <name> ON <table> USING fts({col_name}) first)"
1779            ))
1780        })?;
1781    Ok((entry, query))
1782}
1783
1784// -----------------------------------------------------------------
1785// Phase 7e — JSON path-extraction functions
1786// -----------------------------------------------------------------
1787
1788/// Extracts the JSON-typed text + optional path string out of a
1789/// function call's args. Used by all four json_* functions.
1790///
1791/// Arity rules (matching SQLite JSON1):
1792///   - 1 arg  → JSON value, path defaults to `$` (root)
1793///   - 2 args → (JSON value, path text)
1794///
1795/// Returns `(json_text, path)` so caller can serde_json::from_str
1796/// + walk_json_path on it.
1797fn extract_json_and_path(
1798    fn_name: &str,
1799    args: &FunctionArguments,
1800    table: &Table,
1801    rowid: i64,
1802) -> Result<(String, String)> {
1803    let arg_list = match args {
1804        FunctionArguments::List(l) => &l.args,
1805        _ => {
1806            return Err(SQLRiteError::General(format!(
1807                "{fn_name}() expects 1 or 2 arguments"
1808            )));
1809        }
1810    };
1811    if !(arg_list.len() == 1 || arg_list.len() == 2) {
1812        return Err(SQLRiteError::General(format!(
1813            "{fn_name}() expects 1 or 2 arguments, got {}",
1814            arg_list.len()
1815        )));
1816    }
1817    // Evaluate first arg → must produce text.
1818    let first_expr = match &arg_list[0] {
1819        FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1820        other => {
1821            return Err(SQLRiteError::NotImplemented(format!(
1822                "{fn_name}() argument 0 has unsupported shape: {other:?}"
1823            )));
1824        }
1825    };
1826    let json_text = match eval_expr(first_expr, table, rowid)? {
1827        Value::Text(s) => s,
1828        Value::Null => {
1829            return Err(SQLRiteError::General(format!(
1830                "{fn_name}() called on NULL — JSON column has no value for this row"
1831            )));
1832        }
1833        other => {
1834            return Err(SQLRiteError::General(format!(
1835                "{fn_name}() argument 0 is not JSON-typed: got {}",
1836                other.to_display_string()
1837            )));
1838        }
1839    };
1840
1841    // Path defaults to root `$` when omitted.
1842    let path = if arg_list.len() == 2 {
1843        let path_expr = match &arg_list[1] {
1844            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
1845            other => {
1846                return Err(SQLRiteError::NotImplemented(format!(
1847                    "{fn_name}() argument 1 has unsupported shape: {other:?}"
1848                )));
1849            }
1850        };
1851        match eval_expr(path_expr, table, rowid)? {
1852            Value::Text(s) => s,
1853            other => {
1854                return Err(SQLRiteError::General(format!(
1855                    "{fn_name}() path argument must be a string literal, got {}",
1856                    other.to_display_string()
1857                )));
1858            }
1859        }
1860    } else {
1861        "$".to_string()
1862    };
1863
1864    Ok((json_text, path))
1865}
1866
1867/// Walks a `serde_json::Value` along a JSONPath subset:
1868///   - `$` is the root
1869///   - `.key` for object access (key may not contain `.` or `[`)
1870///   - `[N]` for array index (N a non-negative integer)
1871///   - chains arbitrarily: `$.foo.bar[0].baz`
1872///
1873/// Returns `Ok(None)` for "path didn't match anything" (NULL in SQL),
1874/// `Err` for malformed paths. Matches SQLite JSON1's semantic
1875/// distinction: missing-key = NULL, malformed-path = error.
1876fn walk_json_path<'a>(
1877    value: &'a serde_json::Value,
1878    path: &str,
1879) -> Result<Option<&'a serde_json::Value>> {
1880    let mut chars = path.chars().peekable();
1881    if chars.next() != Some('$') {
1882        return Err(SQLRiteError::General(format!(
1883            "JSON path must start with '$', got `{path}`"
1884        )));
1885    }
1886    let mut current = value;
1887    while let Some(&c) = chars.peek() {
1888        match c {
1889            '.' => {
1890                chars.next();
1891                let mut key = String::new();
1892                while let Some(&c) = chars.peek() {
1893                    if c == '.' || c == '[' {
1894                        break;
1895                    }
1896                    key.push(c);
1897                    chars.next();
1898                }
1899                if key.is_empty() {
1900                    return Err(SQLRiteError::General(format!(
1901                        "JSON path has empty key after '.' in `{path}`"
1902                    )));
1903                }
1904                match current.get(&key) {
1905                    Some(v) => current = v,
1906                    None => return Ok(None),
1907                }
1908            }
1909            '[' => {
1910                chars.next();
1911                let mut idx_str = String::new();
1912                while let Some(&c) = chars.peek() {
1913                    if c == ']' {
1914                        break;
1915                    }
1916                    idx_str.push(c);
1917                    chars.next();
1918                }
1919                if chars.next() != Some(']') {
1920                    return Err(SQLRiteError::General(format!(
1921                        "JSON path has unclosed `[` in `{path}`"
1922                    )));
1923                }
1924                let idx: usize = idx_str.trim().parse().map_err(|_| {
1925                    SQLRiteError::General(format!(
1926                        "JSON path has non-integer index `[{idx_str}]` in `{path}`"
1927                    ))
1928                })?;
1929                match current.get(idx) {
1930                    Some(v) => current = v,
1931                    None => return Ok(None),
1932                }
1933            }
1934            other => {
1935                return Err(SQLRiteError::General(format!(
1936                    "JSON path has unexpected character `{other}` in `{path}` \
1937                     (expected `.`, `[`, or end-of-path)"
1938                )));
1939            }
1940        }
1941    }
1942    Ok(Some(current))
1943}
1944
1945/// Converts a serde_json scalar to a SQLRite Value. For composite
1946/// types (object, array) returns the JSON-encoded text — callers
1947/// pattern-match on shape from the calling json_* function.
1948fn json_value_to_sql(v: &serde_json::Value) -> Value {
1949    match v {
1950        serde_json::Value::Null => Value::Null,
1951        serde_json::Value::Bool(b) => Value::Bool(*b),
1952        serde_json::Value::Number(n) => {
1953            // Match SQLite: integer if it fits an i64, else f64.
1954            if let Some(i) = n.as_i64() {
1955                Value::Integer(i)
1956            } else if let Some(f) = n.as_f64() {
1957                Value::Real(f)
1958            } else {
1959                Value::Null
1960            }
1961        }
1962        serde_json::Value::String(s) => Value::Text(s.clone()),
1963        // Objects + arrays come out as JSON-encoded text. Same as
1964        // SQLite's json_extract: composite results round-trip through
1965        // text rather than being modeled as a richer Value type.
1966        composite => Value::Text(composite.to_string()),
1967    }
1968}
1969
1970fn json_fn_extract(
1971    name: &str,
1972    args: &FunctionArguments,
1973    table: &Table,
1974    rowid: i64,
1975) -> Result<Value> {
1976    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1977    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1978        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1979    })?;
1980    match walk_json_path(&parsed, &path)? {
1981        Some(v) => Ok(json_value_to_sql(v)),
1982        None => Ok(Value::Null),
1983    }
1984}
1985
1986fn json_fn_type(name: &str, args: &FunctionArguments, table: &Table, rowid: i64) -> Result<Value> {
1987    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
1988    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
1989        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
1990    })?;
1991    let resolved = match walk_json_path(&parsed, &path)? {
1992        Some(v) => v,
1993        None => return Ok(Value::Null),
1994    };
1995    let ty = match resolved {
1996        serde_json::Value::Null => "null",
1997        serde_json::Value::Bool(true) => "true",
1998        serde_json::Value::Bool(false) => "false",
1999        serde_json::Value::Number(n) => {
2000            if n.is_i64() || n.is_u64() {
2001                "integer"
2002            } else {
2003                "real"
2004            }
2005        }
2006        serde_json::Value::String(_) => "text",
2007        serde_json::Value::Array(_) => "array",
2008        serde_json::Value::Object(_) => "object",
2009    };
2010    Ok(Value::Text(ty.to_string()))
2011}
2012
2013fn json_fn_array_length(
2014    name: &str,
2015    args: &FunctionArguments,
2016    table: &Table,
2017    rowid: i64,
2018) -> Result<Value> {
2019    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
2020    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
2021        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
2022    })?;
2023    let resolved = match walk_json_path(&parsed, &path)? {
2024        Some(v) => v,
2025        None => return Ok(Value::Null),
2026    };
2027    match resolved.as_array() {
2028        Some(arr) => Ok(Value::Integer(arr.len() as i64)),
2029        None => Err(SQLRiteError::General(format!(
2030            "{name}() resolved to a non-array value at path `{path}`"
2031        ))),
2032    }
2033}
2034
2035fn json_fn_object_keys(
2036    name: &str,
2037    args: &FunctionArguments,
2038    table: &Table,
2039    rowid: i64,
2040) -> Result<Value> {
2041    let (json_text, path) = extract_json_and_path(name, args, table, rowid)?;
2042    let parsed: serde_json::Value = serde_json::from_str(&json_text).map_err(|e| {
2043        SQLRiteError::General(format!("{name}() got invalid JSON `{json_text}`: {e}"))
2044    })?;
2045    let resolved = match walk_json_path(&parsed, &path)? {
2046        Some(v) => v,
2047        None => return Ok(Value::Null),
2048    };
2049    let obj = resolved.as_object().ok_or_else(|| {
2050        SQLRiteError::General(format!(
2051            "{name}() resolved to a non-object value at path `{path}`"
2052        ))
2053    })?;
2054    // SQLite's json_object_keys is a table-valued function (one row
2055    // per key). Without set-returning function support we can't
2056    // reproduce that shape; instead return the keys as a JSON array
2057    // text. Caller can iterate via json_array_length + json_extract,
2058    // or just treat it as a serialized list. Document this divergence
2059    // in supported-sql.md.
2060    let keys: Vec<serde_json::Value> = obj
2061        .keys()
2062        .map(|k| serde_json::Value::String(k.clone()))
2063        .collect();
2064    Ok(Value::Text(serde_json::Value::Array(keys).to_string()))
2065}
2066
2067/// Extracts exactly two `Vec<f32>` arguments from a function call,
2068/// validating arity and that both sides are Vector-typed with matching
2069/// dimensions. Used by all three vec_distance_* functions.
2070fn extract_two_vector_args(
2071    fn_name: &str,
2072    args: &FunctionArguments,
2073    table: &Table,
2074    rowid: i64,
2075) -> Result<(Vec<f32>, Vec<f32>)> {
2076    let arg_list = match args {
2077        FunctionArguments::List(l) => &l.args,
2078        _ => {
2079            return Err(SQLRiteError::General(format!(
2080                "{fn_name}() expects exactly two vector arguments"
2081            )));
2082        }
2083    };
2084    if arg_list.len() != 2 {
2085        return Err(SQLRiteError::General(format!(
2086            "{fn_name}() expects exactly 2 arguments, got {}",
2087            arg_list.len()
2088        )));
2089    }
2090    let mut out: Vec<Vec<f32>> = Vec::with_capacity(2);
2091    for (i, arg) in arg_list.iter().enumerate() {
2092        let expr = match arg {
2093            FunctionArg::Unnamed(FunctionArgExpr::Expr(e)) => e,
2094            other => {
2095                return Err(SQLRiteError::NotImplemented(format!(
2096                    "{fn_name}() argument {i} has unsupported shape: {other:?}"
2097                )));
2098            }
2099        };
2100        let val = eval_expr(expr, table, rowid)?;
2101        match val {
2102            Value::Vector(v) => out.push(v),
2103            other => {
2104                return Err(SQLRiteError::General(format!(
2105                    "{fn_name}() argument {i} is not a vector: got {}",
2106                    other.to_display_string()
2107                )));
2108            }
2109        }
2110    }
2111    let b = out.pop().unwrap();
2112    let a = out.pop().unwrap();
2113    if a.len() != b.len() {
2114        return Err(SQLRiteError::General(format!(
2115            "{fn_name}(): vector dimensions don't match (lhs={}, rhs={})",
2116            a.len(),
2117            b.len()
2118        )));
2119    }
2120    Ok((a, b))
2121}
2122
/// Euclidean (L2) distance between two vectors: √Σ(aᵢ − bᵢ)².
///
/// Smaller means closer; identical vectors give 0.0. Both slices are
/// expected to have the same length (checked with `debug_assert_eq!`).
pub(crate) fn vec_distance_l2(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len());
    // One up-front slice replaces the per-element bounds check on `b`.
    let rhs = &b[..a.len()];
    let sum_of_squares = a.iter().zip(rhs).fold(0.0f32, |acc, (x, y)| {
        let delta = x - y;
        acc + delta * delta
    });
    sum_of_squares.sqrt()
}
2134
2135/// Cosine distance: 1 − (a·b) / (‖a‖·‖b‖).
2136/// Smaller-is-closer; identical (non-zero) vectors return 0.0,
2137/// orthogonal vectors return 1.0, opposite-direction vectors return 2.0.
2138///
2139/// Errors if either vector has zero magnitude — cosine similarity is
2140/// undefined for the zero vector and silently returning NaN would
2141/// poison `ORDER BY` ranking. Callers who want the silent-NaN
2142/// behavior can compute `vec_distance_dot(a, b) / (norm(a) * norm(b))`
2143/// themselves.
2144pub(crate) fn vec_distance_cosine(a: &[f32], b: &[f32]) -> Result<f32> {
2145    debug_assert_eq!(a.len(), b.len());
2146    let mut dot = 0.0f32;
2147    let mut norm_a_sq = 0.0f32;
2148    let mut norm_b_sq = 0.0f32;
2149    for i in 0..a.len() {
2150        dot += a[i] * b[i];
2151        norm_a_sq += a[i] * a[i];
2152        norm_b_sq += b[i] * b[i];
2153    }
2154    let denom = (norm_a_sq * norm_b_sq).sqrt();
2155    if denom == 0.0 {
2156        return Err(SQLRiteError::General(
2157            "vec_distance_cosine() is undefined for zero-magnitude vectors".to_string(),
2158        ));
2159    }
2160    Ok(1.0 - dot / denom)
2161}
2162
/// Negated dot product: −(a·b).
///
/// The sign is flipped (pgvector convention) so that smaller means
/// closer, like the L2 and cosine distances. For unit-norm vectors
/// `vec_distance_dot(a, b) == vec_distance_cosine(a, b) - 1`. Both
/// slices are expected to have the same length (checked with
/// `debug_assert_eq!`).
pub(crate) fn vec_distance_dot(a: &[f32], b: &[f32]) -> f32 {
    debug_assert_eq!(a.len(), b.len());
    // One up-front slice replaces the per-element bounds check on `b`.
    let rhs = &b[..a.len()];
    let dot = a.iter().zip(rhs).fold(0.0f32, |acc, (x, y)| acc + x * y);
    -dot
}
2174
2175/// Evaluates an integer/real arithmetic op. NULL on either side propagates.
2176/// Mixed Integer/Real promotes to Real. Divide/Modulo by zero → error.
2177fn eval_arith(op: &BinaryOperator, l: &Value, r: &Value) -> Result<Value> {
2178    if matches!(l, Value::Null) || matches!(r, Value::Null) {
2179        return Ok(Value::Null);
2180    }
2181    match (l, r) {
2182        (Value::Integer(a), Value::Integer(b)) => match op {
2183            BinaryOperator::Plus => Ok(Value::Integer(a.wrapping_add(*b))),
2184            BinaryOperator::Minus => Ok(Value::Integer(a.wrapping_sub(*b))),
2185            BinaryOperator::Multiply => Ok(Value::Integer(a.wrapping_mul(*b))),
2186            BinaryOperator::Divide => {
2187                if *b == 0 {
2188                    Err(SQLRiteError::General("division by zero".to_string()))
2189                } else {
2190                    Ok(Value::Integer(a / b))
2191                }
2192            }
2193            BinaryOperator::Modulo => {
2194                if *b == 0 {
2195                    Err(SQLRiteError::General("modulo by zero".to_string()))
2196                } else {
2197                    Ok(Value::Integer(a % b))
2198                }
2199            }
2200            _ => unreachable!(),
2201        },
2202        // Anything involving a Real promotes both sides to f64.
2203        (a, b) => {
2204            let af = as_number(a)?;
2205            let bf = as_number(b)?;
2206            match op {
2207                BinaryOperator::Plus => Ok(Value::Real(af + bf)),
2208                BinaryOperator::Minus => Ok(Value::Real(af - bf)),
2209                BinaryOperator::Multiply => Ok(Value::Real(af * bf)),
2210                BinaryOperator::Divide => {
2211                    if bf == 0.0 {
2212                        Err(SQLRiteError::General("division by zero".to_string()))
2213                    } else {
2214                        Ok(Value::Real(af / bf))
2215                    }
2216                }
2217                BinaryOperator::Modulo => {
2218                    if bf == 0.0 {
2219                        Err(SQLRiteError::General("modulo by zero".to_string()))
2220                    } else {
2221                        Ok(Value::Real(af % bf))
2222                    }
2223                }
2224                _ => unreachable!(),
2225            }
2226        }
2227    }
2228}
2229
2230fn as_number(v: &Value) -> Result<f64> {
2231    match v {
2232        Value::Integer(i) => Ok(*i as f64),
2233        Value::Real(f) => Ok(*f),
2234        Value::Bool(b) => Ok(if *b { 1.0 } else { 0.0 }),
2235        other => Err(SQLRiteError::General(format!(
2236            "arithmetic on non-numeric value '{}'",
2237            other.to_display_string()
2238        ))),
2239    }
2240}
2241
2242fn as_bool(v: &Value) -> Result<bool> {
2243    match v {
2244        Value::Bool(b) => Ok(*b),
2245        Value::Null => Ok(false),
2246        Value::Integer(i) => Ok(*i != 0),
2247        other => Err(SQLRiteError::Internal(format!(
2248            "expected boolean, got {}",
2249            other.to_display_string()
2250        ))),
2251    }
2252}
2253
2254fn convert_literal(v: &sqlparser::ast::Value) -> Result<Value> {
2255    use sqlparser::ast::Value as AstValue;
2256    match v {
2257        AstValue::Number(n, _) => {
2258            if let Ok(i) = n.parse::<i64>() {
2259                Ok(Value::Integer(i))
2260            } else if let Ok(f) = n.parse::<f64>() {
2261                Ok(Value::Real(f))
2262            } else {
2263                Err(SQLRiteError::Internal(format!(
2264                    "could not parse numeric literal '{n}'"
2265                )))
2266            }
2267        }
2268        AstValue::SingleQuotedString(s) => Ok(Value::Text(s.clone())),
2269        AstValue::Boolean(b) => Ok(Value::Bool(*b)),
2270        AstValue::Null => Ok(Value::Null),
2271        other => Err(SQLRiteError::NotImplemented(format!(
2272            "unsupported literal value: {other:?}"
2273        ))),
2274    }
2275}
2276
2277#[cfg(test)]
2278mod tests {
2279    use super::*;
2280
2281    // -----------------------------------------------------------------
2282    // Phase 7b — Vector distance function math
2283    // -----------------------------------------------------------------
2284
2285    /// Float comparison helper — distance results need a small epsilon
2286    /// because we accumulate sums across many f32 multiplies.
2287    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
2288        (a - b).abs() < eps
2289    }
2290
2291    #[test]
2292    fn vec_distance_l2_identical_is_zero() {
2293        let v = vec![0.1, 0.2, 0.3];
2294        assert_eq!(vec_distance_l2(&v, &v), 0.0);
2295    }
2296
2297    #[test]
2298    fn vec_distance_l2_unit_basis_is_sqrt2() {
2299        // [1, 0] vs [0, 1]: distance = √((1-0)² + (0-1)²) = √2 ≈ 1.414
2300        let a = vec![1.0, 0.0];
2301        let b = vec![0.0, 1.0];
2302        assert!(approx_eq(vec_distance_l2(&a, &b), 2.0_f32.sqrt(), 1e-6));
2303    }
2304
2305    #[test]
2306    fn vec_distance_l2_known_value() {
2307        // [0, 0, 0] vs [3, 4, 0]: √(9 + 16 + 0) = 5 (the classic 3-4-5 triangle).
2308        let a = vec![0.0, 0.0, 0.0];
2309        let b = vec![3.0, 4.0, 0.0];
2310        assert!(approx_eq(vec_distance_l2(&a, &b), 5.0, 1e-6));
2311    }
2312
2313    #[test]
2314    fn vec_distance_cosine_identical_is_zero() {
2315        let v = vec![0.1, 0.2, 0.3];
2316        let d = vec_distance_cosine(&v, &v).unwrap();
2317        assert!(approx_eq(d, 0.0, 1e-6), "cos(v,v) = {d}, expected ≈ 0");
2318    }
2319
2320    #[test]
2321    fn vec_distance_cosine_orthogonal_is_one() {
2322        // Two orthogonal unit vectors should have cosine distance = 1.0
2323        // (cosine similarity = 0 → distance = 1 - 0 = 1).
2324        let a = vec![1.0, 0.0];
2325        let b = vec![0.0, 1.0];
2326        assert!(approx_eq(vec_distance_cosine(&a, &b).unwrap(), 1.0, 1e-6));
2327    }
2328
2329    #[test]
2330    fn vec_distance_cosine_opposite_is_two() {
2331        // a and -a have cosine similarity = -1 → distance = 1 - (-1) = 2.
2332        let a = vec![1.0, 0.0, 0.0];
2333        let b = vec![-1.0, 0.0, 0.0];
2334        assert!(approx_eq(vec_distance_cosine(&a, &b).unwrap(), 2.0, 1e-6));
2335    }
2336
#[test]
fn vec_distance_cosine_zero_magnitude_errors() {
    // Cosine is undefined when one operand is the zero vector; the test
    // expects an error whose message mentions "zero-magnitude" instead
    // of a NaN result.
    let zero = vec![0.0, 0.0];
    let unit = vec![1.0, 0.0];
    let message = vec_distance_cosine(&zero, &unit).unwrap_err().to_string();
    assert!(message.contains("zero-magnitude"));
}
2345
#[test]
fn vec_distance_dot_negates() {
    // Plain dot product: 1*4 + 2*5 + 3*6 = 32. The distance form is
    // expected to be its negation, -32.
    let lhs = vec![1.0, 2.0, 3.0];
    let rhs = vec![4.0, 5.0, 6.0];
    let dist = vec_distance_dot(&lhs, &rhs);
    assert!(approx_eq(dist, -32.0, 1e-6));
}
2353
#[test]
fn vec_distance_dot_orthogonal_is_zero() {
    // Perpendicular vectors have dot product 0, and negating 0 still
    // compares equal to 0.0.
    let x_axis = vec![1.0, 0.0];
    let y_axis = vec![0.0, 1.0];
    let dist = vec_distance_dot(&x_axis, &y_axis);
    assert_eq!(dist, 0.0);
}
2361
#[test]
fn vec_distance_dot_unit_norm_matches_cosine_minus_one() {
    // Cross-check between the two metrics on unit-norm inputs, where
    // dot(a,b) equals cos(a,b):
    //   -dot(a,b) = -cos(a,b) = (1 - cos(a,b)) - 1
    //             = vec_distance_cosine(a,b) - 1.
    let first = vec![0.6f32, 0.8]; // √(0.36 + 0.64) = 1
    let second = vec![0.8f32, 0.6]; // √(0.64 + 0.36) = 1
    let neg_dot = vec_distance_dot(&first, &second);
    let cos_dist = vec_distance_cosine(&first, &second).unwrap();
    let expected = cos_dist - 1.0;
    assert!(approx_eq(neg_dot, expected, 1e-5));
}
2373
2374    // -----------------------------------------------------------------
2375    // Phase 7c — bounded-heap top-k correctness + benchmark
2376    // -----------------------------------------------------------------
2377
2378    use crate::sql::db::database::Database;
2379    use crate::sql::parser::select::SelectQuery;
2380    use sqlparser::dialect::SQLiteDialect;
2381    use sqlparser::parser::Parser;
2382
2383    /// Builds a `docs(id INTEGER PK, score REAL)` table with N rows of
2384    /// distinct positive scores so top-k tests aren't sensitive to
2385    /// tie-breaking (heap is unstable; full-sort is stable; we want
2386    /// both to agree without arguing about equal-score row order).
2387    ///
2388    /// **Why positive scores:** the INSERT parser doesn't currently
2389    /// handle `Expr::UnaryOp(Minus, …)` for negative number literals
2390    /// (it would parse `-3.14` as a unary expression and the value
2391    /// extractor would skip it). That's a pre-existing bug, out of
2392    /// scope for 7c. Using the Knuth multiplicative hash gives us
2393    /// distinct positive scrambled values without dancing around the
2394    /// negative-literal limitation.
2395    fn seed_score_table(n: usize) -> Database {
2396        let mut db = Database::new("tempdb".to_string());
2397        crate::sql::process_command(
2398            "CREATE TABLE docs (id INTEGER PRIMARY KEY, score REAL);",
2399            &mut db,
2400        )
2401        .expect("create");
2402        for i in 0..n {
2403            // Knuth multiplicative hash mod 1_000_000 — distinct,
2404            // dense in [0, 999_999], no collisions for n up to ~tens
2405            // of thousands.
2406            let score = ((i as u64).wrapping_mul(2_654_435_761) % 1_000_000) as f64;
2407            let sql = format!("INSERT INTO docs (score) VALUES ({score});");
2408            crate::sql::process_command(&sql, &mut db).expect("insert");
2409        }
2410        db
2411    }
2412
2413    /// Helper: parses an SQL SELECT into a SelectQuery so we can drive
2414    /// `select_topk` / `sort_rowids` directly without the rest of the
2415    /// process_command pipeline.
2416    fn parse_select(sql: &str) -> SelectQuery {
2417        let dialect = SQLiteDialect {};
2418        let mut ast = Parser::parse_sql(&dialect, sql).expect("parse");
2419        let stmt = ast.pop().expect("one statement");
2420        SelectQuery::new(&stmt).expect("select-query")
2421    }
2422
#[test]
fn topk_matches_full_sort_asc() {
    // 200 seeded rows, k = 10, ascending order. The bounded-heap result
    // must be identical to sorting everything and keeping the first k.
    let k = 10;
    let db = seed_score_table(200);
    let docs = db.get_table("docs".to_string()).unwrap();
    let query = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 10;");
    let order_by = query.order_by.as_ref().unwrap();
    let rowids = docs.rowids();

    // Reference answer: full sort, then truncate.
    let mut via_sort = rowids.clone();
    sort_rowids(&mut via_sort, docs, order_by).unwrap();
    via_sort.truncate(k);

    // Candidate answer: bounded heap.
    let via_heap = select_topk(&rowids, docs, order_by, k).unwrap();

    assert_eq!(via_heap, via_sort, "top-k via heap should match full-sort+truncate");
}
2443
#[test]
fn topk_matches_full_sort_desc() {
    // Descending counterpart of the ASC test: checks that the heap path
    // honours the sort direction just like the full sort does.
    let k = 10;
    let db = seed_score_table(200);
    let docs = db.get_table("docs".to_string()).unwrap();
    let query = parse_select("SELECT * FROM docs ORDER BY score DESC LIMIT 10;");
    let order_by = query.order_by.as_ref().unwrap();
    let rowids = docs.rowids();

    // Reference answer: full sort, then truncate.
    let mut via_sort = rowids.clone();
    sort_rowids(&mut via_sort, docs, order_by).unwrap();
    via_sort.truncate(k);

    // Candidate answer: bounded heap.
    let via_heap = select_topk(&rowids, docs, order_by, k).unwrap();

    assert_eq!(
        via_heap, via_sort,
        "top-k DESC via heap should match full-sort+truncate"
    );
}
2464
#[test]
fn topk_k_larger_than_n_returns_everything_sorted() {
    // The executor branches off to the full-sort path when k >= N,
    // but if a caller invokes select_topk directly with k > N, it
    // should still produce all-sorted output (no truncation
    // because we don't have N items to truncate to k).
    let db = seed_score_table(50);
    let table = db.get_table("docs".to_string()).unwrap();
    let q = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 1000;");
    let order = q.order_by.as_ref().unwrap();
    let topk = select_topk(&table.rowids(), table, order, 1000).unwrap();
    assert_eq!(topk.len(), 50);

    // Look up the score behind every returned rowid.
    let scores: Vec<f64> = topk
        .iter()
        .filter_map(|r| match table.get_value("score", *r) {
            Some(Value::Real(f)) => Some(f),
            _ => None,
        })
        .collect();
    // The filter_map above silently drops anything that is not a REAL.
    // Without this guard an empty (or partial) `scores` would make the
    // ordering check below pass vacuously.
    assert_eq!(
        scores.len(),
        topk.len(),
        "every returned rowid should resolve to a REAL score"
    );
    // All scores in ascending order.
    assert!(scores.windows(2).all(|w| w[0] <= w[1]));
}
2487
#[test]
fn topk_k_zero_returns_empty() {
    // k = 0 is passed straight to select_topk (the SQL text still says
    // LIMIT 1); the result must contain no rowids.
    let db = seed_score_table(10);
    let docs = db.get_table("docs".to_string()).unwrap();
    let query = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 1;");
    let order_by = query.order_by.as_ref().unwrap();
    let rowids = docs.rowids();
    let selected = select_topk(&rowids, docs, order_by, 0).unwrap();
    assert!(selected.is_empty());
}
2497
#[test]
fn topk_empty_input_returns_empty() {
    // No candidate rowids at all (empty table, empty slice): asking for
    // the top 5 must still succeed and return nothing.
    let db = seed_score_table(0);
    let docs = db.get_table("docs".to_string()).unwrap();
    let query = parse_select("SELECT * FROM docs ORDER BY score ASC LIMIT 5;");
    let order_by = query.order_by.as_ref().unwrap();
    let selected = select_topk(&[], docs, order_by, 5).unwrap();
    assert!(selected.is_empty());
}
2507
#[test]
fn topk_works_through_select_executor_with_distance_function() {
    // End-to-end smoke test: a KNN-shaped query (ORDER BY a distance
    // function + LIMIT) is pushed through `process_command`.
    let mut db = Database::new("tempdb".to_string());
    crate::sql::process_command(
        "CREATE TABLE docs (id INTEGER PRIMARY KEY, e VECTOR(2));",
        &mut db,
    )
    .unwrap();

    // Five rows at distinct L2 distances from the probe [1.0, 0.0]:
    //   id=1 [1.0, 0.0]   distance=0
    //   id=2 [2.0, 0.0]   distance=1
    //   id=3 [0.0, 3.0]   distance=√(1+9) = √10 ≈ 3.16
    //   id=4 [1.0, 4.0]   distance=4
    //   id=5 [10.0, 10.0] distance=√(81+100) ≈ 13.45
    let embeddings = [
        "[1.0, 0.0]",
        "[2.0, 0.0]",
        "[0.0, 3.0]",
        "[1.0, 4.0]",
        "[10.0, 10.0]",
    ];
    for v in embeddings.iter() {
        let insert_stmt = format!("INSERT INTO docs (e) VALUES ({v});");
        crate::sql::process_command(&insert_stmt, &mut db).unwrap();
    }

    let resp = crate::sql::process_command(
        "SELECT id FROM docs ORDER BY vec_distance_l2(e, [1.0, 0.0]) ASC LIMIT 3;",
        &mut db,
    )
    .unwrap();

    // The three nearest rows would be id=1, id=2, id=3, but only the
    // row count in the status message is asserted here; the ordering
    // itself is not inspected by this test.
    let reports_three_rows = resp.contains("3 rows returned");
    assert!(reports_three_rows, "got: {resp}");
}
2544
/// Manual benchmark — not run by default. Recommended invocation:
///
///     cargo test -p sqlrite-engine --lib topk_benchmark --release \
///         -- --ignored --nocapture
///
/// (`--release` matters: Rust's optimized sort gets very fast under
/// optimization, so the heap's relative advantage is best observed
/// against a sort that's also been optimized.)
///
/// Reference numbers recorded on an Apple Silicon laptop with
/// N=10_000 + k=10:
///   - bounded heap:    ~820µs
///   - full sort+trunc: ~1.5ms
///   - ratio:           ~1.8×
///
/// The advantage is real but moderate at this size because the sort
/// key here is a single REAL column read (cheap) and Rust's sort_by
/// has a very low constant factor. The asymptotic O(N log k) vs
/// O(N log N) advantage scales with N and with per-row work — KNN
/// queries where the sort key is `vec_distance_l2(col, [...])` are
/// where this path really pays off, because each key evaluation is
/// itself O(dim) and the heap path skips the per-row evaluation
/// in the comparator (see `sort_rowids` for the contrast).
///
/// Harness notes: the working copy for the full sort is cloned before
/// the timer starts so only sorting is measured, the query's LIMIT is
/// derived from `K`, and the two results are compared afterwards so
/// the timings are known to cover equivalent work.
#[test]
#[ignore]
fn topk_benchmark() {
    use std::time::Instant;
    const N: usize = 10_000;
    const K: usize = 10;

    let db = seed_score_table(N);
    let table = db.get_table("docs".to_string()).unwrap();
    // Build the LIMIT from K so the SQL and the measured k cannot drift apart.
    let q = parse_select(&format!("SELECT * FROM docs ORDER BY score ASC LIMIT {K};"));
    let order = q.order_by.as_ref().unwrap();
    let all_rowids = table.rowids();
    // Setup for the full-sort path: the copy is not sort work, so it
    // happens outside the timed region.
    let mut full = all_rowids.clone();

    // Time bounded heap.
    let t0 = Instant::now();
    let topk = select_topk(&all_rowids, table, order, K).unwrap();
    let heap_dur = t0.elapsed();

    // Time full sort + truncate.
    let t1 = Instant::now();
    sort_rowids(&mut full, table, order).unwrap();
    full.truncate(K);
    let sort_dur = t1.elapsed();

    // Consuming both results keeps the measured work observable and
    // confirms the two paths produced the same answer.
    assert_eq!(topk, full, "benchmark paths disagree on the top-k result");

    let ratio = sort_dur.as_secs_f64() / heap_dur.as_secs_f64().max(1e-9);
    println!("\n--- topk_benchmark (N={N}, k={K}) ---");
    println!("  bounded heap:   {heap_dur:?}");
    println!("  full sort+trunc: {sort_dur:?}");
    println!("  speedup ratio:  {ratio:.2}×");

    // Soft assertion. Floor is 1.4× because the cheap-key
    // benchmark hovers around 1.8× empirically; setting this too
    // close to the measured value risks flaky CI on slower
    // runners. Floor of 1.4× still catches an actual regression
    // (e.g., if select_topk became O(N²) or stopped using the
    // heap entirely).
    assert!(
        ratio > 1.4,
        "bounded heap should be substantially faster than full sort, but ratio = {ratio:.2}"
    );
}
2609}