Skip to main content

contextdb_engine/
executor.rs

1use crate::database::{Database, InsertRowResult, QueryResult, QueryTrace, rank_index_name};
2use crate::rank_formula::RankFormula;
3use crate::sync_types::ConflictPolicy;
4use contextdb_core::*;
5use contextdb_parser::ast::{
6    AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
7    SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
8};
9use contextdb_planner::{
10    DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
11};
12use roaring::RoaringTreemap;
13use std::cmp::Ordering;
14use std::collections::{BTreeSet, HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use time::OffsetDateTime;
18use time::format_description::well_known::Rfc3339;
19
20pub(crate) fn execute_plan(
21    db: &Database,
22    plan: &PhysicalPlan,
23    params: &HashMap<String, Value>,
24    tx: Option<TxId>,
25) -> Result<QueryResult> {
26    match plan {
27        PhysicalPlan::CreateTable(p) => {
28            db.check_disk_budget("CREATE TABLE")?;
29            let expires_column = expires_column_name(&p.columns)?;
30            // Auto-generate implicit indexes for PK / UNIQUE columns and
31            // composite UNIQUE constraints, so constraint probes run at
32            // O(log n) instead of O(n) per insert.
33            let mut auto_indexes: Vec<contextdb_core::IndexDecl> = Vec::new();
34            for c in &p.columns {
35                if c.primary_key
36                    && !matches!(
37                        map_column_type(&c.data_type),
38                        ColumnType::Json | ColumnType::Vector(_)
39                    )
40                {
41                    auto_indexes.push(contextdb_core::IndexDecl {
42                        name: format!("__pk_{}", c.name),
43                        columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
44                        kind: contextdb_core::IndexKind::Auto,
45                    });
46                }
47                if c.unique
48                    && !c.primary_key
49                    && !matches!(
50                        map_column_type(&c.data_type),
51                        ColumnType::Json | ColumnType::Vector(_)
52                    )
53                {
54                    auto_indexes.push(contextdb_core::IndexDecl {
55                        name: format!("__unique_{}", c.name),
56                        columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
57                        kind: contextdb_core::IndexKind::Auto,
58                    });
59                }
60            }
61            for uc in &p.unique_constraints {
62                // Only index composite UNIQUE constraints whose columns are
63                // all B-tree indexable.
64                let all_indexable = uc.iter().all(|col_name| {
65                    p.columns
66                        .iter()
67                        .find(|c| c.name == *col_name)
68                        .map(|c| {
69                            !matches!(
70                                map_column_type(&c.data_type),
71                                ColumnType::Json | ColumnType::Vector(_)
72                            )
73                        })
74                        .unwrap_or(false)
75                });
76                if !all_indexable || uc.is_empty() {
77                    continue;
78                }
79                let name = format!("__unique_{}", uc.join("_"));
80                let cols: Vec<(String, contextdb_core::SortDirection)> = uc
81                    .iter()
82                    .map(|c| (c.clone(), contextdb_core::SortDirection::Asc))
83                    .collect();
84                auto_indexes.push(contextdb_core::IndexDecl {
85                    name,
86                    columns: cols,
87                    kind: contextdb_core::IndexKind::Auto,
88                });
89            }
90            let mut resolved_policies = HashMap::<String, ResolvedRankPolicy>::new();
91            for column in &p.columns {
92                if let Some(resolved) =
93                    validate_rank_policy_for_column(db, &p.name, column, &p.columns)?
94                {
95                    resolved_policies.insert(column.name.clone(), resolved);
96                }
97            }
98            let meta = TableMeta {
99                columns: p
100                    .columns
101                    .iter()
102                    .map(|c| {
103                        core_column_from_ast(
104                            c,
105                            resolved_policies
106                                .get(&c.name)
107                                .map(|resolved| resolved.policy.clone()),
108                        )
109                    })
110                    .collect(),
111                immutable: p.immutable,
112                state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
113                    column: sm.column.clone(),
114                    transitions: sm
115                        .transitions
116                        .iter()
117                        .map(|(from, tos)| (from.clone(), tos.clone()))
118                        .collect(),
119                }),
120                dag_edge_types: p.dag_edge_types.clone(),
121                unique_constraints: p.unique_constraints.clone(),
122                natural_key_column: None,
123                propagation_rules: p.propagation_rules.clone(),
124                default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
125                sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
126                expires_column,
127                // Auto-indexes land in `TableMeta.indexes` at CREATE TABLE
128                // time with `kind == IndexKind::Auto`. The planner sees them
129                // (so PK / UNIQUE point probes pick IndexScan); the user-
130                // visible `.schema` render suppresses them by default but
131                // keeps them in `EXPLAIN <query>` so agents can assert
132                // routing programmatically.
133                indexes: auto_indexes.clone(),
134            };
135            let metadata_bytes = meta.estimated_bytes();
136            db.accountant().try_allocate_for(
137                metadata_bytes,
138                "ddl",
139                "create_table",
140                "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
141            )?;
142            db.relational_store().create_table(&p.name, meta);
143            for idx in &auto_indexes {
144                db.relational_store()
145                    .create_index_storage(&p.name, &idx.name, idx.columns.clone());
146            }
147            if let Some(table_meta) = db.table_meta(&p.name) {
148                for column in &table_meta.columns {
149                    db.register_vector_index_for_column(&p.name, column);
150                }
151            }
152            for (column, resolved) in resolved_policies {
153                db.register_rank_formula(&p.name, &column, resolved.formula);
154            }
155            if let Some(table_meta) = db.table_meta(&p.name) {
156                db.persist_table_meta(&p.name, &table_meta)?;
157                db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn))?;
158            }
159            db.clear_statement_cache();
160            Ok(QueryResult::empty_with_affected(0))
161        }
162        PhysicalPlan::DropTable(name) => {
163            if let Some(block) = rank_policy_drop_table_blocker(db, name) {
164                return Err(block);
165            }
166            let bytes_to_release = estimate_drop_table_bytes(db, name);
167            db.drop_table_aux_state(name);
168            db.remove_rank_formulas_for_table(name);
169            db.vector_store_deregister_table(name);
170            db.relational_store().drop_table(name);
171            db.remove_persisted_table(name)?;
172            db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn))?;
173            db.accountant().release(bytes_to_release);
174            db.clear_statement_cache();
175            Ok(QueryResult::empty_with_affected(0))
176        }
177        PhysicalPlan::AlterTable(p) => {
178            db.check_disk_budget("ALTER TABLE")?;
179            let store = db.relational_store();
180            let mut rewrite_vectors_after_alter = false;
181            match &p.action {
182                AlterAction::AddColumn(col) => {
183                    if col.primary_key {
184                        return Err(Error::Other(
185                            "adding a primary key column via ALTER TABLE is not supported"
186                                .to_string(),
187                        ));
188                    }
189                    validate_expires_column(col)?;
190                    // If the new column is flagged IMMUTABLE, refuse to add it if any
191                    // existing propagation rule would write into a column of that name.
192                    // This closes the DROP-then-ADD-as-flagged loophole (Gotcha 13).
193                    if col.immutable
194                        && let Some(existing_meta) = db.table_meta(&p.table)
195                    {
196                        let targets_col =
197                            existing_meta
198                                .propagation_rules
199                                .iter()
200                                .any(|rule| {
201                                    match rule {
202                                contextdb_core::table_meta::PropagationRule::ForeignKey {
203                                    target_state,
204                                    ..
205                                } => *target_state == col.name,
206                                contextdb_core::table_meta::PropagationRule::Edge {
207                                    target_state,
208                                    ..
209                                } => *target_state == col.name,
210                                contextdb_core::table_meta::PropagationRule::VectorExclusion {
211                                    ..
212                                } => false,
213                            }
214                                });
215                        if targets_col {
216                            return Err(Error::ImmutableColumn {
217                                table: p.table.clone(),
218                                column: col.name.clone(),
219                            });
220                        }
221                    }
222                    let mut all_columns = db
223                        .table_meta(&p.table)
224                        .map(|meta| {
225                            meta.columns
226                                .into_iter()
227                                .map(ast_column_from_core)
228                                .collect::<Vec<_>>()
229                        })
230                        .unwrap_or_default();
231                    all_columns.push(col.clone());
232                    let resolved_policy =
233                        validate_rank_policy_for_column(db, &p.table, col, &all_columns)?;
234                    let core_col = core_column_from_ast(
235                        col,
236                        resolved_policy
237                            .as_ref()
238                            .map(|resolved| resolved.policy.clone()),
239                    );
240                    store
241                        .alter_table_add_column(&p.table, core_col)
242                        .map_err(Error::Other)?;
243                    if let Some(table_meta) = db.table_meta(&p.table)
244                        && let Some(column) = table_meta
245                            .columns
246                            .iter()
247                            .find(|column| column.name == col.name)
248                    {
249                        db.register_vector_index_for_column(&p.table, column);
250                    }
251                    if col.expires {
252                        let mut meta = store.table_meta.write();
253                        let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
254                            Error::Other(format!("table '{}' not found", p.table))
255                        })?;
256                        table_meta.expires_column = Some(col.name.clone());
257                    }
258                    if let Some(resolved) = resolved_policy {
259                        db.register_rank_formula(&p.table, &col.name, resolved.formula);
260                    }
261                }
262                AlterAction::DropColumn {
263                    column: name,
264                    cascade,
265                } => {
266                    if let Some(block) = rank_policy_drop_column_blocker(db, &p.table, name) {
267                        return Err(block);
268                    }
269                    let dropped_vector_column = db
270                        .table_meta(&p.table)
271                        .and_then(|meta| {
272                            meta.columns.into_iter().find(|column| column.name == *name)
273                        })
274                        .is_some_and(|column| {
275                            matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
276                        });
277                    if let Some(existing_meta) = db.table_meta(&p.table)
278                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
279                        && col.immutable
280                    {
281                        return Err(Error::ImmutableColumn {
282                            table: p.table.clone(),
283                            column: name.clone(),
284                        });
285                    }
286                    // PK check precedes index-dependency reporting: a column
287                    // flagged PRIMARY KEY cannot be dropped (the auto-index
288                    // would also show as a dependency, but the actionable
289                    // error is that the PK cannot be removed).
290                    if let Some(existing_meta) = db.table_meta(&p.table)
291                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
292                        && col.primary_key
293                    {
294                        return Err(Error::Other(format!(
295                            "cannot drop primary key column {}.{}",
296                            p.table, name
297                        )));
298                    }
299                    // RESTRICT / CASCADE on indexed columns. Only user-declared
300                    // indexes gate the RESTRICT path — auto-indexes dissolve
301                    // naturally when their defining column leaves.
302                    let dependent_user_indexes: Vec<String> = db
303                        .table_meta(&p.table)
304                        .map(|m| {
305                            m.indexes
306                                .iter()
307                                .filter(|i| {
308                                    i.kind == contextdb_core::IndexKind::UserDeclared
309                                        && i.columns.iter().any(|(c, _)| c == name)
310                                })
311                                .map(|i| i.name.clone())
312                                .collect()
313                        })
314                        .unwrap_or_default();
315                    let dependent_indexes: Vec<String> = db
316                        .table_meta(&p.table)
317                        .map(|m| {
318                            m.indexes
319                                .iter()
320                                .filter(|i| i.columns.iter().any(|(c, _)| c == name))
321                                .map(|i| i.name.clone())
322                                .collect()
323                        })
324                        .unwrap_or_default();
325                    if !*cascade && !dependent_user_indexes.is_empty() {
326                        return Err(Error::ColumnInIndex {
327                            table: p.table.clone(),
328                            column: name.clone(),
329                            index: dependent_user_indexes[0].clone(),
330                        });
331                    }
332                    store
333                        .alter_table_drop_column(&p.table, name)
334                        .map_err(Error::Other)?;
335                    db.remove_rank_formula(&p.table, name);
336                    db.deregister_vector_index(&p.table, name);
337                    rewrite_vectors_after_alter = dropped_vector_column;
338                    if *cascade {
339                        // Remove IndexDecls referencing `name`, release storage.
340                        {
341                            let mut metas = store.table_meta.write();
342                            if let Some(m) = metas.get_mut(&p.table) {
343                                m.indexes
344                                    .retain(|i| !i.columns.iter().any(|(c, _)| c == name));
345                            }
346                        }
347                        for idx in &dependent_indexes {
348                            store.drop_index_storage(&p.table, idx);
349                            db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&p.table, idx, lsn))?;
350                        }
351                    }
352                    let mut meta = store.table_meta.write();
353                    if let Some(table_meta) = meta.get_mut(&p.table)
354                        && table_meta.expires_column.as_deref() == Some(name.as_str())
355                    {
356                        table_meta.expires_column = None;
357                    }
358                    drop(meta);
359                    if let Some(table_meta) = db.table_meta(&p.table) {
360                        db.persist_table_meta(&p.table, &table_meta)?;
361                        db.persist_table_rows(&p.table)?;
362                        if rewrite_vectors_after_alter {
363                            db.persist_vectors()?;
364                        }
365                        db.allocate_ddl_lsn(|lsn| {
366                            db.log_alter_table_ddl(&p.table, &table_meta, lsn)
367                        })?;
368                    }
369                    db.clear_statement_cache();
370                    return Ok(QueryResult {
371                        columns: vec![],
372                        rows: vec![],
373                        rows_affected: 0,
374                        trace: crate::database::QueryTrace::scan(),
375                        cascade: if *cascade {
376                            Some(crate::database::CascadeReport {
377                                dropped_indexes: dependent_indexes,
378                            })
379                        } else {
380                            None
381                        },
382                    });
383                }
384                AlterAction::RenameColumn { from, to } => {
385                    if let Some(block) = rank_policy_drop_column_blocker(db, &p.table, from) {
386                        return Err(block);
387                    }
388                    let renamed_vector_column = db
389                        .table_meta(&p.table)
390                        .and_then(|meta| {
391                            meta.columns.into_iter().find(|column| column.name == *from)
392                        })
393                        .is_some_and(|column| {
394                            matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
395                        });
396                    if let Some(existing_meta) = db.table_meta(&p.table)
397                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *from)
398                        && col.immutable
399                    {
400                        return Err(Error::ImmutableColumn {
401                            table: p.table.clone(),
402                            column: from.clone(),
403                        });
404                    }
405                    store
406                        .alter_table_rename_column(&p.table, from, to)
407                        .map_err(Error::Other)?;
408                    if renamed_vector_column {
409                        db.rename_vector_index(&p.table, from, to)?;
410                        rewrite_vectors_after_alter = true;
411                    }
412                    let mut meta = store.table_meta.write();
413                    if let Some(table_meta) = meta.get_mut(&p.table)
414                        && table_meta.expires_column.as_deref() == Some(from.as_str())
415                    {
416                        table_meta.expires_column = Some(to.clone());
417                    }
418                }
419                AlterAction::SetRetain {
420                    duration_seconds,
421                    sync_safe,
422                } => {
423                    let mut meta = store.table_meta.write();
424                    let table_meta = meta
425                        .get_mut(&p.table)
426                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
427                    if table_meta.immutable {
428                        return Err(Error::Other(
429                            "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
430                        ));
431                    }
432                    table_meta.default_ttl_seconds = Some(*duration_seconds);
433                    table_meta.sync_safe = *sync_safe;
434                }
435                AlterAction::DropRetain => {
436                    let mut meta = store.table_meta.write();
437                    let table_meta = meta
438                        .get_mut(&p.table)
439                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
440                    table_meta.default_ttl_seconds = None;
441                    table_meta.sync_safe = false;
442                }
443                AlterAction::SetSyncConflictPolicy(policy) => {
444                    let cp = parse_conflict_policy(policy)?;
445                    db.set_table_conflict_policy(&p.table, cp);
446                }
447                AlterAction::DropSyncConflictPolicy => {
448                    db.drop_table_conflict_policy(&p.table);
449                }
450            }
451            if let Some(table_meta) = db.table_meta(&p.table) {
452                db.persist_table_meta(&p.table, &table_meta)?;
453                if !matches!(
454                    p.action,
455                    AlterAction::AddColumn(_)
456                        | AlterAction::SetRetain { .. }
457                        | AlterAction::DropRetain
458                        | AlterAction::SetSyncConflictPolicy(_)
459                        | AlterAction::DropSyncConflictPolicy
460                ) {
461                    db.persist_table_rows(&p.table)?;
462                }
463                if rewrite_vectors_after_alter {
464                    db.persist_vectors()?;
465                }
466                db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn))?;
467            }
468            db.clear_statement_cache();
469            Ok(QueryResult::empty_with_affected(0))
470        }
471        PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
472        PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
473        PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
474        PhysicalPlan::Scan { table, filter, .. } => {
475            if table == "dual" {
476                return Ok(QueryResult {
477                    columns: vec![],
478                    rows: vec![vec![]],
479                    rows_affected: 0,
480                    trace: crate::database::QueryTrace::scan(),
481                    cascade: None,
482                });
483            }
484            let snapshot = db.snapshot_for_read();
485            let schema_columns = db.table_meta(table).map(|meta| {
486                meta.columns
487                    .into_iter()
488                    .map(|column| column.name)
489                    .collect::<Vec<_>>()
490            });
491            let resolved_filter = filter
492                .as_ref()
493                .map(|expr| resolve_in_subqueries(db, expr, params, tx))
494                .transpose()?;
495
496            // Try to route through an IndexScan if the filter is index-eligible.
497            // Analyze the PRE-resolve filter so `a IN (SELECT …)` disqualifies
498            // the outer IndexScan even after the subquery has been executed.
499            let meta_for_indexes = db.table_meta(table);
500            let indexes: Vec<contextdb_core::IndexDecl> = meta_for_indexes
501                .as_ref()
502                .map(|m| m.indexes.clone())
503                .unwrap_or_default();
504            let analysis = filter
505                .as_ref()
506                .filter(|_| !indexes.is_empty())
507                .map(|f| analyze_filter_for_index(f, &indexes, params));
508
509            if let Some(a) = analysis {
510                if let Some(pick) = a.pick {
511                    // IndexScan path. Fetch by BTree range; apply residual filter.
512                    let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
513                    db.__bump_rows_examined(examined);
514                    let mut result = materialize_rows(
515                        rows,
516                        resolved_filter.as_ref(),
517                        params,
518                        schema_columns.as_deref(),
519                    )?;
520                    let mut pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]> =
521                        smallvec::SmallVec::new();
522                    pushed.push(std::borrow::Cow::Owned(pick.pushed_column.clone()));
523                    let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> = a
524                        .considered
525                        .iter()
526                        .filter(|c| c.name != pick.name)
527                        .cloned()
528                        .collect();
529                    result.trace = crate::database::QueryTrace {
530                        physical_plan: "IndexScan",
531                        index_used: Some(pick.name.clone()),
532                        predicates_pushed: pushed,
533                        indexes_considered: considered,
534                        sort_elided: false,
535                    };
536                    return Ok(result);
537                } else {
538                    // Scan with rejection trace.
539                    let rows = db.scan(table, snapshot)?;
540                    db.__bump_rows_examined(rows.len() as u64);
541                    let mut result = materialize_rows(
542                        rows,
543                        resolved_filter.as_ref(),
544                        params,
545                        schema_columns.as_deref(),
546                    )?;
547                    let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> =
548                        a.considered.into_iter().collect();
549                    result.trace = crate::database::QueryTrace {
550                        physical_plan: "Scan",
551                        index_used: None,
552                        predicates_pushed: Default::default(),
553                        indexes_considered: considered,
554                        sort_elided: false,
555                    };
556                    return Ok(result);
557                }
558            }
559
560            let rows = db.scan(table, snapshot)?;
561            db.__bump_rows_examined(rows.len() as u64);
562            let mut result = materialize_rows(
563                rows,
564                resolved_filter.as_ref(),
565                params,
566                schema_columns.as_deref(),
567            )?;
568            result.trace = crate::database::QueryTrace::scan();
569            Ok(result)
570        }
571        PhysicalPlan::GraphBfs {
572            start_alias,
573            start_expr,
574            start_candidates,
575            steps,
576            filter,
577        } => {
578            let start_uuids = match resolve_uuid(start_expr, params) {
579                Ok(start) => vec![start],
580                Err(Error::PlanError(_))
581                    if matches!(
582                        start_expr,
583                        Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
584                    ) =>
585                {
586                    // Start node not directly specified — check if a subquery or filter can help
587                    if let Some(candidate_plan) = start_candidates {
588                        resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
589                    } else if let Some(filter_expr) = filter {
590                        let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
591                        resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
592                    } else {
593                        vec![]
594                    }
595                }
596                Err(err) => return Err(err),
597            };
598            if start_uuids.is_empty() {
599                return Ok(QueryResult {
600                    columns: vec!["id".to_string(), "depth".to_string()],
601                    rows: vec![],
602                    rows_affected: 0,
603                    trace: crate::database::QueryTrace::scan(),
604                    cascade: None,
605                });
606            }
607            let snapshot = db.snapshot();
608            let mut frontier = start_uuids
609                .into_iter()
610                .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
611                .collect::<Vec<_>>();
612            let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
613            db.accountant().try_allocate_for(
614                bfs_bytes,
615                "bfs_frontier",
616                "graph_bfs",
617                "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
618            )?;
619
620            let result = (|| {
621                for step in steps {
622                    let edge_types_ref = if step.edge_types.is_empty() {
623                        None
624                    } else {
625                        Some(step.edge_types.as_slice())
626                    };
627                    let mut next = Vec::new();
628
629                    for (bindings, start, base_depth) in &frontier {
630                        let res = db.graph().bfs(
631                            *start,
632                            edge_types_ref,
633                            step.direction,
634                            step.min_depth,
635                            step.max_depth,
636                            snapshot,
637                        )?;
638                        for node in res.nodes {
639                            let total_depth = base_depth.saturating_add(node.depth);
640                            let mut next_bindings = bindings.clone();
641                            next_bindings.insert(step.target_alias.clone(), node.id);
642                            next.push((next_bindings, node.id, total_depth));
643                        }
644                    }
645
646                    frontier = dedupe_graph_frontier(next, steps);
647                    if frontier.is_empty() {
648                        break;
649                    }
650                }
651
652                let mut columns =
653                    steps
654                        .iter()
655                        .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
656                            cols.push(format!("{}.id", step.target_alias));
657                            cols
658                        });
659                columns.push("id".to_string());
660                columns.push("depth".to_string());
661
662                Ok(QueryResult {
663                    columns,
664                    rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
665                    rows_affected: 0,
666                    trace: crate::database::QueryTrace::scan(),
667                    cascade: None,
668                })
669            })();
670            db.accountant().release(bfs_bytes);
671
672            result
673        }
674        PhysicalPlan::VectorSearch {
675            table,
676            column,
677            query_expr,
678            k,
679            candidates,
680            sort_key,
681            ..
682        }
683        | PhysicalPlan::HnswSearch {
684            table,
685            column,
686            query_expr,
687            k,
688            candidates,
689            sort_key,
690            ..
691        } => {
692            let query_vec = resolve_vector_from_expr(query_expr, params)?;
693            let snapshot = db.snapshot();
694            let mut candidate_trace = None;
695            let candidate_bitmap = if let Some(cands_plan) = candidates {
696                let qr = execute_plan(db, cands_plan, params, tx)?;
697                candidate_trace = Some(qr.trace.clone());
698                let mut bm = RoaringTreemap::new();
699                let row_id_idx = qr.columns.iter().position(|column| {
700                    column == "row_id" || column.rsplit('.').next() == Some("row_id")
701                });
702                let id_idx = qr
703                    .columns
704                    .iter()
705                    .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
706
707                if let Some(idx) = row_id_idx {
708                    for row in qr.rows {
709                        if let Some(Value::Int64(id)) = row.get(idx) {
710                            bm.insert(*id as u64);
711                        }
712                    }
713                } else if let Some(idx) = id_idx {
714                    let uuid_to_row_id = uuid_to_row_id_map(db, table, snapshot)?;
715                    for row in qr.rows {
716                        if let Some(Value::Uuid(uuid)) = row.get(idx)
717                            && let Some(row_id) = uuid_to_row_id.get(uuid)
718                        {
719                            bm.insert(row_id.0);
720                        }
721                    }
722                }
723                Some(bm)
724            } else {
725                None
726            };
727
728            let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
729            db.accountant().try_allocate_for(
730                vector_bytes,
731                "vector_search",
732                "search",
733                "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
734            )?;
735            if let Some(sort_key) = sort_key {
736                let mut semantic_query = crate::database::SemanticQuery::new(
737                    table.clone(),
738                    column.clone(),
739                    query_vec,
740                    *k as usize,
741                );
742                semantic_query.sort_key = Some(sort_key.clone());
743                let res = db.semantic_search_with_candidates(semantic_query, candidate_bitmap);
744                db.accountant().release(vector_bytes);
745                let results = res?;
746                let schema_columns = db.table_meta(table).map(|meta| {
747                    meta.columns
748                        .into_iter()
749                        .map(|column| column.name)
750                        .collect::<Vec<_>>()
751                });
752                let keys = schema_columns.unwrap_or_else(|| {
753                    let mut ks = BTreeSet::new();
754                    for result in &results {
755                        for key in result.values.keys() {
756                            ks.insert(key.clone());
757                        }
758                    }
759                    ks.into_iter().collect()
760                });
761                let mut columns = vec!["row_id".to_string()];
762                columns.extend(keys.iter().cloned());
763                columns.push("score".to_string());
764                let rows = results
765                    .into_iter()
766                    .map(|result| {
767                        let mut out = vec![Value::Int64(result.row_id.0 as i64)];
768                        for key in &keys {
769                            out.push(result.values.get(key).cloned().unwrap_or(Value::Null));
770                        }
771                        out.push(Value::Float64(result.rank as f64));
772                        out
773                    })
774                    .collect();
775                return Ok(QueryResult {
776                    columns,
777                    rows,
778                    rows_affected: 0,
779                    trace: vector_search_trace("VectorSearch", candidate_trace),
780                    cascade: None,
781                });
782            }
783            let res = db.query_vector_strict(
784                contextdb_core::VectorIndexRef::new(table.clone(), column.clone()),
785                &query_vec,
786                *k as usize,
787                candidate_bitmap.as_ref(),
788                db.snapshot(),
789            );
790            db.accountant().release(vector_bytes);
791            let res = res?;
792
793            // Re-materialize: look up actual rows by row_id so SELECT * returns user columns
794            let result_row_ids = res.iter().map(|(rid, _)| *rid).collect::<Vec<_>>();
795            let result_rows = rows_by_row_id(db, table, &result_row_ids, snapshot)?;
796            let schema_columns = db.table_meta(table).map(|meta| {
797                meta.columns
798                    .into_iter()
799                    .map(|column| column.name)
800                    .collect::<Vec<_>>()
801            });
802            let keys = if let Some(ref sc) = schema_columns {
803                sc.clone()
804            } else {
805                let mut ks = BTreeSet::new();
806                for r in &result_rows {
807                    for k in r.values.keys() {
808                        ks.insert(k.clone());
809                    }
810                }
811                ks.into_iter().collect::<Vec<_>>()
812            };
813
814            let row_map: HashMap<RowId, &VersionedRow> =
815                result_rows.iter().map(|r| (r.row_id, r)).collect();
816
817            let mut columns = vec!["row_id".to_string()];
818            columns.extend(keys.iter().cloned());
819            columns.push("score".to_string());
820
821            let rows = res
822                .into_iter()
823                .filter_map(|(rid, score)| {
824                    row_map.get(&rid).map(|row| {
825                        let mut out = vec![Value::Int64(rid.0 as i64)];
826                        for k in &keys {
827                            out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
828                        }
829                        out.push(Value::Float64(score as f64));
830                        out
831                    })
832                })
833                .collect();
834
835            Ok(QueryResult {
836                columns,
837                rows,
838                rows_affected: 0,
839                trace: vector_search_trace(
840                    if db
841                        .__debug_vector_hnsw_len(contextdb_core::VectorIndexRef::new(
842                            table.clone(),
843                            column.clone(),
844                        ))
845                        .is_some()
846                    {
847                        "HNSWSearch"
848                    } else {
849                        "VectorSearch"
850                    },
851                    candidate_trace,
852                ),
853                cascade: None,
854            })
855        }
856        PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
857        PhysicalPlan::Project { input, columns } => {
858            let input_result = execute_plan(db, input, params, tx)?;
859            let has_aggregate = columns.iter().any(|column| {
860                matches!(
861                    &column.expr,
862                    Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
863                )
864            });
865            if has_aggregate {
866                if columns.iter().any(|column| {
867                    !matches!(
868                        &column.expr,
869                        Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
870                    )
871                }) {
872                    return Err(Error::PlanError(
873                        "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
874                    ));
875                }
876
877                let output_columns = columns
878                    .iter()
879                    .map(|column| {
880                        column.alias.clone().unwrap_or_else(|| match &column.expr {
881                            Expr::FunctionCall { name, .. } => name.clone(),
882                            _ => "expr".to_string(),
883                        })
884                    })
885                    .collect::<Vec<_>>();
886
887                let aggregate_row = columns
888                    .iter()
889                    .map(|column| match &column.expr {
890                        Expr::FunctionCall { name: _, args } => {
891                            let count = if matches!(
892                                args.as_slice(),
893                                [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
894                                if column == "*"
895                            ) {
896                                input_result.rows.len() as i64
897                            } else {
898                                input_result
899                                    .rows
900                                    .iter()
901                                    .filter_map(|row| {
902                                        args.first().map(|arg| {
903                                            eval_query_result_expr(
904                                                arg,
905                                                row,
906                                                &input_result.columns,
907                                                params,
908                                            )
909                                        })
910                                    })
911                                    .collect::<Result<Vec<_>>>()?
912                                    .into_iter()
913                                    .filter(|value| *value != Value::Null)
914                                    .count() as i64
915                            };
916                            Ok(Value::Int64(count))
917                        }
918                        _ => Err(Error::PlanError(
919                            "mixed aggregate and non-aggregate columns without GROUP BY"
920                                .to_string(),
921                        )),
922                    })
923                    .collect::<Result<Vec<_>>>()?;
924
925                return Ok(QueryResult {
926                    columns: output_columns,
927                    rows: vec![aggregate_row],
928                    rows_affected: 0,
929                    trace: input_result.trace.clone(),
930                    cascade: None,
931                });
932            }
933
934            let output_columns = columns
935                .iter()
936                .map(|c| {
937                    c.alias.clone().unwrap_or_else(|| match &c.expr {
938                        Expr::Column(col) => col.column.clone(),
939                        _ => "expr".to_string(),
940                    })
941                })
942                .collect::<Vec<_>>();
943
944            let mut output_rows = Vec::with_capacity(input_result.rows.len());
945            for row in &input_result.rows {
946                let mut projected = Vec::with_capacity(columns.len());
947                for col in columns {
948                    projected.push(eval_project_expr(
949                        &col.expr,
950                        row,
951                        &input_result.columns,
952                        params,
953                    )?);
954                }
955                output_rows.push(projected);
956            }
957
958            Ok(QueryResult {
959                columns: output_columns,
960                rows: output_rows,
961                rows_affected: 0,
962                trace: input_result.trace.clone(),
963                cascade: None,
964            })
965        }
966        PhysicalPlan::Sort { input, keys } => {
967            // Sort elision path A: input is a Scan and an index's direction
968            // prefix matches `keys`. We rewrite the input into an IndexScan
969            // and skip the re-sort.
970            let elided = try_elide_sort(db, input, keys, params, tx)?;
971            if let Some(mut result) = elided {
972                result.trace.sort_elided = true;
973                return Ok(result);
974            }
975            let mut input_result = execute_plan(db, input, params, tx)?;
976
977            // Sort elision path B: the input already used an IndexScan whose
978            // column list + directions prefix-match the ORDER BY keys. The
979            // IndexScan already delivers rows in the requested order, so the
980            // Sort is a no-op; skip it and mark `sort_elided`.
981            if input_result.trace.physical_plan == "IndexScan"
982                && let Some(idx_name) = &input_result.trace.index_used
983                && sort_keys_match_index_prefix(db, input, idx_name, keys)
984            {
985                input_result.trace.sort_elided = true;
986                return Ok(input_result);
987            }
988            input_result.rows.sort_by(|left, right| {
989                for key in keys {
990                    let Expr::Column(column_ref) = &key.expr else {
991                        return Ordering::Equal;
992                    };
993                    let left_value =
994                        match lookup_query_result_column(left, &input_result.columns, column_ref) {
995                            Ok(value) => value,
996                            Err(_) => return Ordering::Equal,
997                        };
998                    let right_value = match lookup_query_result_column(
999                        right,
1000                        &input_result.columns,
1001                        column_ref,
1002                    ) {
1003                        Ok(value) => value,
1004                        Err(_) => return Ordering::Equal,
1005                    };
1006                    let ordering = compare_sort_values(&left_value, &right_value, key.direction);
1007                    if ordering != Ordering::Equal {
1008                        return ordering;
1009                    }
1010                }
1011                Ordering::Equal
1012            });
1013            // Preserve the child's physical_plan when it was an IndexScan:
1014            // the trace reports the data-source strategy, and Sort is
1015            // represented by `sort_elided = false` rather than overriding the
1016            // plan label. A plain `Scan` child gets relabeled to `Sort` to
1017            // match the plan's ORDER BY-without-index expectations.
1018            if input_result.trace.physical_plan != "IndexScan" {
1019                input_result.trace.physical_plan = "Sort";
1020            }
1021            input_result.trace.sort_elided = false;
1022            Ok(input_result)
1023        }
1024        PhysicalPlan::Limit { input, count } => {
1025            let mut input_result = execute_plan(db, input, params, tx)?;
1026            input_result.rows.truncate(*count as usize);
1027            Ok(input_result)
1028        }
1029        PhysicalPlan::Filter { input, predicate } => {
1030            let mut input_result = execute_plan(db, input, params, tx)?;
1031            input_result.rows.retain(|row| {
1032                query_result_row_matches(row, &input_result.columns, predicate, params)
1033                    .unwrap_or(false)
1034            });
1035            Ok(input_result)
1036        }
1037        PhysicalPlan::Distinct { input } => {
1038            let input_result = execute_plan(db, input, params, tx)?;
1039            let mut seen = HashSet::<Vec<u8>>::new();
1040            let rows = input_result
1041                .rows
1042                .into_iter()
1043                .filter(|row| seen.insert(distinct_row_key(row)))
1044                .collect();
1045            Ok(QueryResult {
1046                columns: input_result.columns,
1047                rows,
1048                rows_affected: input_result.rows_affected,
1049                trace: input_result.trace,
1050                cascade: None,
1051            })
1052        }
1053        PhysicalPlan::Join {
1054            left,
1055            right,
1056            condition,
1057            join_type,
1058            left_alias,
1059            right_alias,
1060        } => {
1061            let left_result = execute_plan(db, left, params, tx)?;
1062            let right_result = execute_plan(db, right, params, tx)?;
1063            let right_duplicate_names =
1064                duplicate_column_names(&left_result.columns, &right_result.columns);
1065            let right_prefix = right_alias
1066                .clone()
1067                .unwrap_or_else(|| right_table_name(right));
1068            let right_columns = right_result
1069                .columns
1070                .iter()
1071                .map(|column| {
1072                    if right_duplicate_names.contains(column) {
1073                        format!("{right_prefix}.{column}")
1074                    } else {
1075                        column.clone()
1076                    }
1077                })
1078                .collect::<Vec<_>>();
1079
1080            let mut columns = left_result.columns.clone();
1081            columns.extend(right_columns);
1082
1083            let mut rows = Vec::new();
1084            for left_row in &left_result.rows {
1085                let mut matched = false;
1086                for right_row in &right_result.rows {
1087                    let combined = concatenate_rows(left_row, right_row);
1088                    if query_result_row_matches(&combined, &columns, condition, params)? {
1089                        matched = true;
1090                        rows.push(combined);
1091                    }
1092                }
1093
1094                if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
1095                    let mut combined = left_row.clone();
1096                    combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
1097                    rows.push(combined);
1098                }
1099            }
1100
1101            let output_columns = qualify_join_columns(
1102                &columns,
1103                &left_result.columns,
1104                &right_result.columns,
1105                left_alias,
1106                &right_prefix,
1107            );
1108
1109            Ok(QueryResult {
1110                columns: output_columns,
1111                rows,
1112                rows_affected: 0,
1113                trace: crate::database::QueryTrace::scan(),
1114                cascade: None,
1115            })
1116        }
1117        PhysicalPlan::CreateIndex(p) => exec_create_index(db, p),
1118        PhysicalPlan::DropIndex(p) => exec_drop_index(db, p),
1119        PhysicalPlan::IndexScan {
1120            table,
1121            index,
1122            range: _,
1123        } => {
1124            // Stub: always return empty rows + an IndexScan trace marker. Impl
1125            // must walk the BTreeMap at the named index, apply visibility,
1126            // materialize rows, and populate the trace fully.
1127            let _ = (table, index);
1128            Ok(QueryResult {
1129                columns: vec![],
1130                rows: vec![],
1131                rows_affected: 0,
1132                trace: crate::database::QueryTrace {
1133                    physical_plan: "IndexScan",
1134                    index_used: None,
1135                    ..crate::database::QueryTrace::default()
1136                },
1137                cascade: None,
1138            })
1139        }
1140        PhysicalPlan::SetMemoryLimit(val) => {
1141            let limit = match val {
1142                SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
1143                SetMemoryLimitValue::None => None,
1144            };
1145            db.accountant().set_budget(limit)?;
1146            db.persist_memory_limit(limit)?;
1147            Ok(QueryResult::empty())
1148        }
1149        PhysicalPlan::ShowMemoryLimit => {
1150            let usage = db.accountant().usage();
1151            Ok(QueryResult {
1152                columns: vec![
1153                    "limit".to_string(),
1154                    "used".to_string(),
1155                    "available".to_string(),
1156                    "startup_ceiling".to_string(),
1157                ],
1158                rows: vec![vec![
1159                    usage
1160                        .limit
1161                        .map(|value| Value::Int64(value as i64))
1162                        .unwrap_or_else(|| Value::Text("none".to_string())),
1163                    Value::Int64(usage.used as i64),
1164                    usage
1165                        .available
1166                        .map(|value| Value::Int64(value as i64))
1167                        .unwrap_or_else(|| Value::Text("none".to_string())),
1168                    usage
1169                        .startup_ceiling
1170                        .map(|value| Value::Int64(value as i64))
1171                        .unwrap_or_else(|| Value::Text("none".to_string())),
1172                ]],
1173                rows_affected: 0,
1174                trace: crate::database::QueryTrace::scan(),
1175                cascade: None,
1176            })
1177        }
1178        PhysicalPlan::SetDiskLimit(val) => {
1179            let limit = match val {
1180                SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
1181                SetDiskLimitValue::None => None,
1182            };
1183            db.set_disk_limit(limit)?;
1184            db.persist_disk_limit(limit)?;
1185            Ok(QueryResult::empty())
1186        }
1187        PhysicalPlan::ShowDiskLimit => {
1188            let limit = db.disk_limit();
1189            let used = db.disk_file_size();
1190            let startup_ceiling = db.disk_limit_startup_ceiling();
1191            Ok(QueryResult {
1192                columns: vec![
1193                    "limit".to_string(),
1194                    "used".to_string(),
1195                    "available".to_string(),
1196                    "startup_ceiling".to_string(),
1197                ],
1198                rows: vec![vec![
1199                    limit
1200                        .map(|value| Value::Int64(value as i64))
1201                        .unwrap_or_else(|| Value::Text("none".to_string())),
1202                    used.map(|value| Value::Int64(value as i64))
1203                        .unwrap_or(Value::Null),
1204                    match (limit, used) {
1205                        (Some(limit), Some(used)) => {
1206                            Value::Int64(limit.saturating_sub(used) as i64)
1207                        }
1208                        _ => Value::Null,
1209                    },
1210                    startup_ceiling
1211                        .map(|value| Value::Int64(value as i64))
1212                        .unwrap_or_else(|| Value::Text("none".to_string())),
1213                ]],
1214                rows_affected: 0,
1215                trace: crate::database::QueryTrace::scan(),
1216                cascade: None,
1217            })
1218        }
1219        PhysicalPlan::SetSyncConflictPolicy(policy) => {
1220            let cp = parse_conflict_policy(policy)?;
1221            db.set_default_conflict_policy(cp);
1222            Ok(QueryResult::empty())
1223        }
1224        PhysicalPlan::ShowSyncConflictPolicy => {
1225            let policies = db.conflict_policies();
1226            let default_str = conflict_policy_to_string(policies.default);
1227            let mut rows = vec![vec![Value::Text(default_str)]];
1228            for (table, policy) in &policies.per_table {
1229                rows.push(vec![Value::Text(format!(
1230                    "{}={}",
1231                    table,
1232                    conflict_policy_to_string(*policy)
1233                ))]);
1234            }
1235            Ok(QueryResult {
1236                columns: vec!["policy".to_string()],
1237                rows,
1238                rows_affected: 0,
1239                trace: crate::database::QueryTrace::scan(),
1240                cascade: None,
1241            })
1242        }
1243        PhysicalPlan::ShowVectorIndexes => {
1244            let rows = db
1245                .vector_index_infos()
1246                .into_iter()
1247                .map(|info| {
1248                    vec![
1249                        Value::Text(info.index.table),
1250                        Value::Text(info.index.column),
1251                        Value::Int64(info.dimension as i64),
1252                        Value::Text(info.quantization.as_str().to_string()),
1253                        Value::Int64(info.vector_count as i64),
1254                        Value::Int64(info.bytes as i64),
1255                    ]
1256                })
1257                .collect();
1258            Ok(QueryResult {
1259                columns: vec![
1260                    "table".to_string(),
1261                    "column".to_string(),
1262                    "dimension".to_string(),
1263                    "quantization".to_string(),
1264                    "vector_count".to_string(),
1265                    "bytes".to_string(),
1266                ],
1267                rows,
1268                rows_affected: 0,
1269                trace: crate::database::QueryTrace::scan(),
1270                cascade: None,
1271            })
1272        }
1273        PhysicalPlan::Pipeline(plans) => {
1274            let mut last = QueryResult::empty();
1275            for p in plans {
1276                last = execute_plan(db, p, params, tx)?;
1277            }
1278            Ok(last)
1279        }
1280        _ => Err(Error::PlanError(
1281            "unsupported plan node in executor".to_string(),
1282        )),
1283    }
1284}
1285
1286fn eval_project_expr(
1287    expr: &Expr,
1288    row: &[Value],
1289    input_columns: &[String],
1290    params: &HashMap<String, Value>,
1291) -> Result<Value> {
1292    match expr {
1293        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1294        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1295        Expr::Parameter(name) => params
1296            .get(name)
1297            .cloned()
1298            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1299        Expr::BinaryOp { left, op, right } => {
1300            let left = eval_query_result_expr(left, row, input_columns, params)?;
1301            let right = eval_query_result_expr(right, row, input_columns, params)?;
1302            eval_binary_op(op, &left, &right)
1303        }
1304        Expr::UnaryOp { op, operand } => {
1305            let value = eval_query_result_expr(operand, row, input_columns, params)?;
1306            match op {
1307                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1308                UnaryOp::Neg => match value {
1309                    Value::Int64(v) => Ok(Value::Int64(-v)),
1310                    Value::Float64(v) => Ok(Value::Float64(-v)),
1311                    _ => Err(Error::PlanError(
1312                        "cannot negate non-numeric value".to_string(),
1313                    )),
1314                },
1315            }
1316        }
1317        Expr::FunctionCall { name, args } => {
1318            let values = args
1319                .iter()
1320                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1321                .collect::<Result<Vec<_>>>()?;
1322            eval_function(name, &values)
1323        }
1324        Expr::IsNull { expr, negated } => {
1325            let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
1326            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1327        }
1328        Expr::InList {
1329            expr,
1330            list,
1331            negated,
1332        } => {
1333            let needle = eval_query_result_expr(expr, row, input_columns, params)?;
1334            let matched = list.iter().try_fold(false, |found, item| {
1335                if found {
1336                    Ok(true)
1337                } else {
1338                    let candidate = eval_query_result_expr(item, row, input_columns, params)?;
1339                    Ok(
1340                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1341                            || (needle != Value::Null
1342                                && candidate != Value::Null
1343                                && needle == candidate),
1344                    )
1345                }
1346            })?;
1347            Ok(Value::Bool(if *negated { !matched } else { matched }))
1348        }
1349        Expr::Like {
1350            expr,
1351            pattern,
1352            negated,
1353        } => {
1354            let matches = match (
1355                eval_query_result_expr(expr, row, input_columns, params)?,
1356                eval_query_result_expr(pattern, row, input_columns, params)?,
1357            ) {
1358                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1359                _ => false,
1360            };
1361            Ok(Value::Bool(if *negated { !matches } else { matches }))
1362        }
1363        _ => resolve_expr(expr, params),
1364    }
1365}
1366
1367fn eval_query_result_expr(
1368    expr: &Expr,
1369    row: &[Value],
1370    input_columns: &[String],
1371    params: &HashMap<String, Value>,
1372) -> Result<Value> {
1373    match expr {
1374        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1375        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1376        Expr::Parameter(name) => params
1377            .get(name)
1378            .cloned()
1379            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1380        Expr::FunctionCall { name, args } => {
1381            let values = args
1382                .iter()
1383                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1384                .collect::<Result<Vec<_>>>()?;
1385            eval_function(name, &values)
1386        }
1387        _ => resolve_expr(expr, params),
1388    }
1389}
1390
1391fn exec_insert(
1392    db: &Database,
1393    p: &InsertPlan,
1394    params: &HashMap<String, Value>,
1395    tx: Option<TxId>,
1396) -> Result<QueryResult> {
1397    db.check_disk_budget("INSERT")?;
1398    let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
1399
1400    let insert_meta = db
1401        .table_meta(&p.table)
1402        .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
1403    // When no column list is provided (INSERT INTO t VALUES (...)),
1404    // infer column names from table metadata in declaration order.
1405    let columns: Vec<String> = if p.columns.is_empty() {
1406        insert_meta.columns.iter().map(|c| c.name.clone()).collect()
1407    } else {
1408        p.columns.clone()
1409    };
1410
1411    // Statement-scoped snapshot of the committed TxId watermark for TXID bound checks.
1412    let current_tx_max = Some(db.committed_watermark());
1413    let route_inserts_to_graph =
1414        p.table.eq_ignore_ascii_case("edges") || !insert_meta.dag_edge_types.is_empty();
1415    let vector_columns = vector_columns_for_meta(&insert_meta);
1416    let has_column_defaults = insert_meta
1417        .columns
1418        .iter()
1419        .any(|column| column.default.is_some());
1420
1421    if !vector_columns.is_empty() {
1422        for row in &p.values {
1423            let mut values = HashMap::new();
1424            for (idx, expr) in row.iter().enumerate() {
1425                let col = columns
1426                    .get(idx)
1427                    .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1428                let v = resolve_expr(expr, params)?;
1429                values.insert(
1430                    col.clone(),
1431                    coerce_value_for_column_with_meta(
1432                        &p.table,
1433                        &insert_meta,
1434                        col,
1435                        v,
1436                        current_tx_max,
1437                        Some(txid),
1438                    )?,
1439                );
1440            }
1441            if has_column_defaults {
1442                apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1443            }
1444            validate_vector_columns(db, &p.table, &values)?;
1445        }
1446    }
1447
1448    let mut rows_affected = 0;
1449    for row in &p.values {
1450        let mut values = HashMap::new();
1451        for (idx, expr) in row.iter().enumerate() {
1452            let col = columns
1453                .get(idx)
1454                .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1455            let v = resolve_expr(expr, params)?;
1456            values.insert(
1457                col.clone(),
1458                coerce_value_for_column_with_meta(
1459                    &p.table,
1460                    &insert_meta,
1461                    col,
1462                    v,
1463                    current_tx_max,
1464                    Some(txid),
1465                )?,
1466            );
1467        }
1468
1469        if has_column_defaults {
1470            apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1471        }
1472
1473        if !vector_columns.is_empty() {
1474            validate_vector_columns(db, &p.table, &values)?;
1475        }
1476        let row_bytes = estimate_row_bytes_for_meta(&values, &insert_meta, false);
1477        db.accountant().try_allocate_for(
1478            row_bytes,
1479            "insert",
1480            "row_insert",
1481            "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
1482        )?;
1483        let checkpoint = db.write_set_checkpoint(txid)?;
1484        let mut vector_allocations = Vec::new();
1485        let graph_edge = if route_inserts_to_graph {
1486            match (
1487                values.get("source_id"),
1488                values.get("target_id"),
1489                values.get("edge_type"),
1490            ) {
1491                (
1492                    Some(Value::Uuid(source)),
1493                    Some(Value::Uuid(target)),
1494                    Some(Value::Text(edge_type)),
1495                ) => Some((*source, *target, edge_type.clone())),
1496                _ => None,
1497            }
1498        } else {
1499            None
1500        };
1501        let vector_values = vector_values_for_table(db, &p.table, &values);
1502
1503        let row_id = if let Some(on_conflict) = &p.on_conflict {
1504            let conflict_col = &on_conflict.columns[0];
1505            let conflict_value = values
1506                .get(conflict_col)
1507                .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
1508            let existing =
1509                db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
1510            let existing_row_id = existing.as_ref().map(|row| row.row_id);
1511            let existing_has_vector = existing
1512                .as_ref()
1513                .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
1514            let upsert_values = if let Some(existing_row) = existing.as_ref() {
1515                match apply_on_conflict_updates(
1516                    db,
1517                    &p.table,
1518                    values.clone(),
1519                    existing_row,
1520                    on_conflict,
1521                    params,
1522                    Some(txid),
1523                ) {
1524                    Ok(v) => v,
1525                    Err(err) => {
1526                        db.accountant().release(row_bytes);
1527                        let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1528                        return Err(err);
1529                    }
1530                }
1531            } else {
1532                values.clone()
1533            };
1534
1535            match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
1536                Ok(UpsertResult::Inserted) => {
1537                    db.point_lookup_in_tx(
1538                        txid,
1539                        &p.table,
1540                        conflict_col,
1541                        conflict_value,
1542                        db.snapshot(),
1543                    )?
1544                    .ok_or_else(|| {
1545                        Error::Other("inserted upsert row not visible in tx".to_string())
1546                    })?
1547                    .row_id
1548                }
1549                Ok(UpsertResult::Updated) => {
1550                    if existing_has_vector && let Some(existing_row_id) = existing_row_id {
1551                        for index in vector_indexes_for_table(db, &p.table) {
1552                            if db
1553                                .vector_store_live_entry_for_row(
1554                                    &index,
1555                                    existing_row_id,
1556                                    db.snapshot(),
1557                                )
1558                                .is_some()
1559                            {
1560                                db.delete_vector(txid, index, existing_row_id)?;
1561                            }
1562                        }
1563                    }
1564                    db.point_lookup_in_tx(
1565                        txid,
1566                        &p.table,
1567                        conflict_col,
1568                        conflict_value,
1569                        db.snapshot(),
1570                    )?
1571                    .ok_or_else(|| {
1572                        Error::Other("updated upsert row not visible in tx".to_string())
1573                    })?
1574                    .row_id
1575                }
1576                Ok(UpsertResult::NoOp) => {
1577                    db.accountant().release(row_bytes);
1578                    RowId(0)
1579                }
1580                Err(err) => {
1581                    db.accountant().release(row_bytes);
1582                    return Err(err);
1583                }
1584            }
1585        } else {
1586            match db.insert_row_with_unique_noop(txid, &p.table, values) {
1587                Ok(InsertRowResult::Inserted(row_id)) => row_id,
1588                Ok(InsertRowResult::NoOp) => {
1589                    db.accountant().release(row_bytes);
1590                    continue;
1591                }
1592                Err(err) => {
1593                    db.accountant().release(row_bytes);
1594                    return Err(err);
1595                }
1596            }
1597        };
1598
1599        if let Some((source, target, edge_type)) = graph_edge {
1600            match db.insert_edge(txid, source, target, edge_type, HashMap::new()) {
1601                Ok(true) => {}
1602                Ok(false) => {
1603                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1604                    db.accountant().release(row_bytes);
1605                    continue;
1606                }
1607                Err(err) => {
1608                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1609                    db.accountant().release(row_bytes);
1610                    return Err(err);
1611                }
1612            }
1613        }
1614
1615        if row_id != RowId(0) {
1616            for (column, v) in &vector_values {
1617                let index = contextdb_core::VectorIndexRef::new(&p.table, column.clone());
1618                let vector_bytes = db.vector_insert_accounted_bytes(&index, v.len());
1619                if let Err(err) = db.insert_vector_strict(txid, index, row_id, v.clone()) {
1620                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1621                    db.accountant().release(row_bytes);
1622                    release_accounted_bytes(db, &vector_allocations);
1623                    return Err(err);
1624                }
1625                vector_allocations.push(vector_bytes);
1626            }
1627        }
1628
1629        rows_affected += 1;
1630    }
1631
1632    Ok(QueryResult::empty_with_affected(rows_affected))
1633}
1634
1635fn exec_delete(
1636    db: &Database,
1637    p: &DeletePlan,
1638    params: &HashMap<String, Value>,
1639    tx: Option<TxId>,
1640) -> Result<QueryResult> {
1641    let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1642    let snapshot = db.snapshot();
1643    let rows = db.scan(&p.table, snapshot)?;
1644    let resolved_where = p
1645        .where_clause
1646        .as_ref()
1647        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1648        .transpose()?;
1649    let matched: Vec<_> = rows
1650        .into_iter()
1651        .filter(|r| {
1652            resolved_where
1653                .as_ref()
1654                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1655        })
1656        .collect();
1657
1658    for row in &matched {
1659        for index in vector_indexes_for_table(db, &p.table) {
1660            if db
1661                .vector_store_live_entry_for_row(&index, row.row_id, snapshot)
1662                .is_some()
1663            {
1664                db.delete_vector(txid, index, row.row_id)?;
1665            }
1666        }
1667        db.delete_row(txid, &p.table, row.row_id)?;
1668    }
1669
1670    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1671}
1672
1673fn exec_update(
1674    db: &Database,
1675    p: &UpdatePlan,
1676    params: &HashMap<String, Value>,
1677    tx: Option<TxId>,
1678) -> Result<QueryResult> {
1679    db.check_disk_budget("UPDATE")?;
1680    let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1681    let snapshot = db.snapshot();
1682    // Use in-tx scan so prior statements in a BEGIN/COMMIT block are visible:
1683    // the old row must not shadow a previously-updated in-tx row.
1684    let rows = db.scan_in_tx(txid, &p.table, snapshot)?;
1685    let resolved_where = p
1686        .where_clause
1687        .as_ref()
1688        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1689        .transpose()?;
1690    let matched: Vec<_> = rows
1691        .into_iter()
1692        .filter(|r| {
1693            resolved_where
1694                .as_ref()
1695                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1696        })
1697        .collect();
1698
1699    let current_tx_max = Some(db.committed_watermark());
1700
1701    for row in &matched {
1702        let mut values = row.values.clone();
1703        for (k, vexpr) in &p.assignments {
1704            let value = eval_assignment_expr(vexpr, &row.values, params)?;
1705            values.insert(
1706                k.clone(),
1707                coerce_value_for_column(db, &p.table, k, value, current_tx_max, Some(txid))?,
1708            );
1709        }
1710        validate_update_state_transition(db, &p.table, row, &values)?;
1711        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1712        let new_state = db
1713            .table_meta(&p.table)
1714            .as_ref()
1715            .and_then(|meta| meta.state_machine.as_ref())
1716            .and_then(|sm| values.get(&sm.column))
1717            .and_then(Value::as_text)
1718            .map(std::borrow::ToOwned::to_owned);
1719
1720        validate_vector_columns(db, &p.table, &values)?;
1721        let assigned_vector_values: Vec<(String, Vec<f32>)> = p
1722            .assignments
1723            .iter()
1724            .filter_map(|(column, _)| match values.get(column) {
1725                Some(Value::Vector(vector)) => Some((column.clone(), vector.clone())),
1726                _ => None,
1727            })
1728            .collect();
1729        let assigned_vector_columns: HashSet<String> = assigned_vector_values
1730            .iter()
1731            .map(|(column, _)| column.clone())
1732            .collect();
1733        let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1734        db.accountant().try_allocate_for(
1735            new_row_bytes,
1736            "update",
1737            "row_replace",
1738            "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1739        )?;
1740        let checkpoint = db.write_set_checkpoint(txid)?;
1741        let mut vector_allocations = Vec::new();
1742
1743        if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1744            db.accountant().release(new_row_bytes);
1745            return Err(err);
1746        }
1747        for column in &assigned_vector_columns {
1748            if let Err(err) = db.delete_vector(
1749                txid,
1750                contextdb_core::VectorIndexRef::new(&p.table, column.clone()),
1751                row.row_id,
1752            ) {
1753                db.accountant().release(new_row_bytes);
1754                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1755                return Err(err);
1756            }
1757        }
1758
1759        let new_row_id = match db.insert_row_replacing(txid, &p.table, values, row.row_id) {
1760            Ok(row_id) => row_id,
1761            Err(err) => {
1762                db.accountant().release(new_row_bytes);
1763                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1764                return Err(err);
1765            }
1766        };
1767        for index in vector_indexes_for_table(db, &p.table) {
1768            if assigned_vector_columns.contains(&index.column) {
1769                continue;
1770            }
1771            if let Some(old_entry) =
1772                db.vector_store_live_entry_for_row(&index, row.row_id, snapshot)
1773                && old_entry.deleted_tx.is_none()
1774                && let Err(err) = db.move_vector(txid, index, row.row_id, new_row_id)
1775            {
1776                db.accountant().release(new_row_bytes);
1777                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1778                return Err(err);
1779            }
1780        }
1781        for (column, vector) in assigned_vector_values {
1782            let index = contextdb_core::VectorIndexRef::new(&p.table, column);
1783            let vector_bytes = db.vector_insert_accounted_bytes(&index, vector.len());
1784            if let Err(err) = db.insert_vector_strict(txid, index, new_row_id, vector) {
1785                db.accountant().release(new_row_bytes);
1786                release_accounted_bytes(db, &vector_allocations);
1787                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1788                return Err(err);
1789            }
1790            vector_allocations.push(vector_bytes);
1791        }
1792        if let Err(err) =
1793            db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1794        {
1795            db.accountant().release(new_row_bytes);
1796            release_accounted_bytes(db, &vector_allocations);
1797            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1798            return Err(err);
1799        }
1800    }
1801
1802    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1803}
1804
1805fn exec_create_index(
1806    db: &Database,
1807    plan: &contextdb_planner::CreateIndexPlan,
1808) -> Result<QueryResult> {
1809    // Reserved-prefix guard: user-declared indexes must not collide with the
1810    // auto-index namespace used for PRIMARY KEY / UNIQUE backing indexes.
1811    for prefix in ["__pk_", "__unique_"] {
1812        if plan.name.starts_with(prefix) {
1813            return Err(Error::ReservedIndexName {
1814                table: plan.table.clone(),
1815                name: plan.name.clone(),
1816                prefix: prefix.to_string(),
1817            });
1818        }
1819    }
1820
1821    // Error precedence (plan §Error precedence): TableNotFound > ColumnNotFound
1822    // > ColumnNotIndexable > DuplicateIndex. Check in that exact order so
1823    // "structural" bugs surface before "naming" bugs.
1824    let meta = db
1825        .table_meta(&plan.table)
1826        .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1827
1828    // 2. Check every column exists.
1829    for (col_name, _) in &plan.columns {
1830        if !meta.columns.iter().any(|c| c.name == *col_name) {
1831            return Err(Error::ColumnNotFound {
1832                table: plan.table.clone(),
1833                column: col_name.clone(),
1834            });
1835        }
1836    }
1837
1838    // 3. Check every column type is B-tree indexable.
1839    for (col_name, _) in &plan.columns {
1840        let col = meta
1841            .columns
1842            .iter()
1843            .find(|c| c.name == *col_name)
1844            .expect("column existence verified above");
1845        if matches!(col.column_type, ColumnType::Json | ColumnType::Vector(_)) {
1846            return Err(Error::ColumnNotIndexable {
1847                table: plan.table.clone(),
1848                column: col_name.clone(),
1849                column_type: col.column_type.clone(),
1850            });
1851        }
1852    }
1853
1854    // 4. Duplicate-name check (last).
1855    if meta.indexes.iter().any(|i| i.name == plan.name) {
1856        return Err(Error::DuplicateIndex {
1857            table: plan.table.clone(),
1858            index: plan.name.clone(),
1859        });
1860    }
1861
1862    // All validations passed. Persist the IndexDecl into TableMeta.indexes,
1863    // register the IndexStorage, rebuild it over existing rows, flush meta.
1864    {
1865        let store = db.relational_store();
1866        let mut metas = store.table_meta.write();
1867        let m = metas
1868            .get_mut(&plan.table)
1869            .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1870        m.indexes.push(contextdb_core::IndexDecl {
1871            name: plan.name.clone(),
1872            columns: plan.columns.clone(),
1873            kind: contextdb_core::IndexKind::UserDeclared,
1874        });
1875    }
1876    db.relational_store()
1877        .create_index_storage(&plan.table, &plan.name, plan.columns.clone());
1878    db.relational_store().rebuild_index(&plan.table, &plan.name);
1879
1880    if let Some(table_meta) = db.table_meta(&plan.table) {
1881        db.persist_table_meta(&plan.table, &table_meta)?;
1882    }
1883
1884    db.allocate_ddl_lsn(|lsn| {
1885        db.log_create_index_ddl(&plan.table, &plan.name, &plan.columns, lsn)
1886    })?;
1887
1888    db.clear_statement_cache();
1889    Ok(QueryResult::empty_with_affected(0))
1890}
1891
1892fn exec_drop_index(db: &Database, plan: &contextdb_planner::DropIndexPlan) -> Result<QueryResult> {
1893    let meta = db
1894        .table_meta(&plan.table)
1895        .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1896    let exists = meta.indexes.iter().any(|i| i.name == plan.name);
1897    if !exists {
1898        if plan.if_exists {
1899            return Ok(QueryResult::empty_with_affected(0));
1900        }
1901        return Err(Error::IndexNotFound {
1902            table: plan.table.clone(),
1903            index: plan.name.clone(),
1904        });
1905    }
1906    if let Some(block) = rank_policy_drop_index_blocker(db, &plan.table, &plan.name) {
1907        return Err(block);
1908    }
1909    {
1910        let store = db.relational_store();
1911        let mut metas = store.table_meta.write();
1912        if let Some(m) = metas.get_mut(&plan.table) {
1913            m.indexes.retain(|i| i.name != plan.name);
1914        }
1915    }
1916    db.relational_store()
1917        .drop_index_storage(&plan.table, &plan.name);
1918    if let Some(table_meta) = db.table_meta(&plan.table) {
1919        db.persist_table_meta(&plan.table, &table_meta)?;
1920    }
1921    db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&plan.table, &plan.name, lsn))?;
1922    db.clear_statement_cache();
1923    Ok(QueryResult::empty_with_affected(0))
1924}
1925
1926fn estimate_table_row_bytes(
1927    db: &Database,
1928    table: &str,
1929    values: &HashMap<String, Value>,
1930) -> Result<usize> {
1931    let meta = db
1932        .table_meta(table)
1933        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1934    Ok(estimate_row_bytes_for_meta(values, &meta, false))
1935}
1936
1937// ========================= Index scan planning + execution =========================
1938
/// Shape of a predicate on the first indexed column. Drives IndexScan
/// eligibility: equality narrows to a point, range to a range walk, IN-list
/// to multiple point lookups, IS NULL to the NULL partition.
///
/// When several indexes qualify, `selectivity_tier` ranks the shapes (lower
/// is preferred).
#[derive(Debug, Clone)]
pub(crate) enum IndexPredicateShape {
    /// Equality against a single value.
    Equality(Value),
    /// Inequality against a single value.
    NotEqual(Value),
    /// Lower/upper bounds on the column. `combine_shapes` intersects several
    /// range conjuncts (e.g. the two halves of a BETWEEN) into one.
    Range {
        lower: std::ops::Bound<Value>,
        upper: std::ops::Bound<Value>,
    },
    /// Membership in a list of values.
    InList(Vec<Value>),
    /// The column must be NULL.
    IsNull,
    /// The column must be non-NULL.
    IsNotNull,
}
1954
1955impl IndexPredicateShape {
1956    /// Selectivity tier — lower is more selective.
1957    fn selectivity_tier(&self) -> u8 {
1958        match self {
1959            IndexPredicateShape::Equality(_) | IndexPredicateShape::InList(_) => 0,
1960            IndexPredicateShape::Range { .. } | IndexPredicateShape::NotEqual(_) => 1,
1961            IndexPredicateShape::IsNull | IndexPredicateShape::IsNotNull => 2,
1962        }
1963    }
1964}
1965
/// The index selected to drive a scan, together with the predicate it serves.
#[derive(Debug, Clone)]
struct IndexPick {
    /// Name of the selected index, copied from its `IndexDecl`.
    name: String,
    /// Key columns of the selected index, copied from its `IndexDecl`.
    columns: Vec<(String, contextdb_core::SortDirection)>,
    /// Shape on the FIRST indexed column. Only one shape drives the scan.
    shape: IndexPredicateShape,
    /// Pushed column name (engine column name) for trace. It is also the
    /// column whose declared type `coerce_pick_shape_to_column_type` coerces
    /// the shape's literals to.
    pushed_column: String,
}

/// Outcome of `analyze_filter_for_index`: did we rewrite to IndexScan?
/// `pick` is `Some` when an index's leading column has a usable predicate.
/// `considered` lists, for the trace, only the indexes that were rejected,
/// each with a reason. No residual filter is carried here.
struct IndexAnalysis {
    pick: Option<IndexPick>,
    considered: Vec<crate::database::IndexCandidate>,
}
1982
1983/// Coerce every literal value inside `pick.shape` to `pick.pushed_column`'s
1984/// declared type. B-tree walks use variant-exact comparisons by design, so a
1985/// SELECT `WHERE uuid_col = 'text-literal'` must arrive at the executor with
1986/// the text already converted to Uuid. Coercion failure propagates so callers
1987/// can fall back to zero-rows — matching the semantics a predicate-evaluating
1988/// scan would produce on an un-coercible literal.
1989fn coerce_pick_shape_to_column_type(
1990    db: &Database,
1991    table: &str,
1992    pick: &IndexPick,
1993) -> Result<IndexPick> {
1994    use std::ops::Bound;
1995    let col = &pick.pushed_column;
1996    let coerce = |v: Value| coerce_value_for_column(db, table, col, v, None, None);
1997    let new_shape = match &pick.shape {
1998        IndexPredicateShape::Equality(v) => IndexPredicateShape::Equality(coerce(v.clone())?),
1999        IndexPredicateShape::InList(vs) => IndexPredicateShape::InList(
2000            vs.iter()
2001                .cloned()
2002                .map(&coerce)
2003                .collect::<Result<Vec<_>>>()?,
2004        ),
2005        IndexPredicateShape::Range { lower, upper } => {
2006            let lower = match lower {
2007                Bound::Included(v) => Bound::Included(coerce(v.clone())?),
2008                Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
2009                Bound::Unbounded => Bound::Unbounded,
2010            };
2011            let upper = match upper {
2012                Bound::Included(v) => Bound::Included(coerce(v.clone())?),
2013                Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
2014                Bound::Unbounded => Bound::Unbounded,
2015            };
2016            IndexPredicateShape::Range { lower, upper }
2017        }
2018        IndexPredicateShape::NotEqual(v) => IndexPredicateShape::NotEqual(coerce(v.clone())?),
2019        IndexPredicateShape::IsNull => IndexPredicateShape::IsNull,
2020        IndexPredicateShape::IsNotNull => IndexPredicateShape::IsNotNull,
2021    };
2022    Ok(IndexPick {
2023        name: pick.name.clone(),
2024        columns: pick.columns.clone(),
2025        shape: new_shape,
2026        pushed_column: pick.pushed_column.clone(),
2027    })
2028}
2029
2030/// Inspect `filter` looking for an eligible predicate on the first column of
2031/// any declared index. Returns the chosen pick + list of considered/rejected.
2032fn analyze_filter_for_index(
2033    filter: &Expr,
2034    indexes: &[contextdb_core::IndexDecl],
2035    params: &HashMap<String, Value>,
2036) -> IndexAnalysis {
2037    use std::borrow::Cow;
2038    let mut considered: Vec<crate::database::IndexCandidate> = Vec::new();
2039
2040    // Find each conjunct (split on AND) and map to (column, shape).
2041    let conjuncts = split_conjuncts(filter);
2042    let mut conjunct_shapes: Vec<(String, IndexPredicateShape)> = Vec::new();
2043    for conjunct in &conjuncts {
2044        if let Some((col, shape)) = classify_index_predicate(conjunct, params) {
2045            conjunct_shapes.push((col, shape));
2046        }
2047    }
2048
2049    // Annotate rejections on indexes that can't apply, for the trace.
2050    let mut candidates: Vec<(IndexPick, u8, usize)> = Vec::new();
2051    for (i_idx, decl) in indexes.iter().enumerate() {
2052        let first_col = match decl.columns.first() {
2053            Some((c, _)) => c.clone(),
2054            None => continue,
2055        };
2056        // Find the most-selective matching conjunct on the first column.
2057        let matching: Vec<&(String, IndexPredicateShape)> = conjunct_shapes
2058            .iter()
2059            .filter(|(c, _)| c == &first_col)
2060            .collect();
2061        if matching.is_empty() {
2062            // Check whether the filter mentions first_col in an un-usable way
2063            // (function call / arithmetic / col-to-col / subquery) to produce
2064            // a useful rejection reason.
2065            let reason = classify_rejection_reason(filter, &first_col);
2066            considered.push(crate::database::IndexCandidate {
2067                name: decl.name.clone(),
2068                rejected_reason: Cow::Borrowed(reason),
2069            });
2070            continue;
2071        }
2072        // Combine range conjuncts on the same column (BETWEEN = `>= X AND <= Y`).
2073        let shape = combine_shapes(matching.iter().map(|(_, s)| s.clone()).collect());
2074        let tier = shape.selectivity_tier();
2075        candidates.push((
2076            IndexPick {
2077                name: decl.name.clone(),
2078                columns: decl.columns.clone(),
2079                shape,
2080                pushed_column: first_col.clone(),
2081            },
2082            tier,
2083            i_idx,
2084        ));
2085    }
2086
2087    // Selection: most-selective tier wins; tie-break by creation order (index i).
2088    let pick = candidates
2089        .into_iter()
2090        .min_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)))
2091        .map(|(p, _, _)| p);
2092
2093    IndexAnalysis { pick, considered }
2094}
2095
2096/// Combine multiple index-shapes on the same column into the most-selective
2097/// composite form. Used by BETWEEN (which becomes `col >= X AND col <= Y`).
2098fn combine_shapes(mut shapes: Vec<IndexPredicateShape>) -> IndexPredicateShape {
2099    // Find the single best (most selective) shape.
2100    shapes.sort_by_key(|s| s.selectivity_tier());
2101    let head = shapes.remove(0);
2102    // If the head is a Range, try to merge subsequent Range conjuncts into it.
2103    if let IndexPredicateShape::Range {
2104        mut lower,
2105        mut upper,
2106    } = head.clone()
2107    {
2108        for s in shapes {
2109            if let IndexPredicateShape::Range { lower: l, upper: u } = s {
2110                // Merge lower: more restrictive is higher.
2111                lower = tighter_lower(&lower, &l);
2112                upper = tighter_upper(&upper, &u);
2113            }
2114        }
2115        return IndexPredicateShape::Range { lower, upper };
2116    }
2117    head
2118}
2119
2120fn tighter_lower(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
2121    use std::ops::Bound;
2122    match (a, b) {
2123        (Bound::Unbounded, _) => b.clone(),
2124        (_, Bound::Unbounded) => a.clone(),
2125        (Bound::Included(va), Bound::Included(vb)) => {
2126            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2127                a.clone()
2128            } else {
2129                b.clone()
2130            }
2131        }
2132        (Bound::Excluded(va), Bound::Excluded(vb)) => {
2133            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2134                a.clone()
2135            } else {
2136                b.clone()
2137            }
2138        }
2139        (Bound::Included(va), Bound::Excluded(vb)) => {
2140            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2141                a.clone()
2142            } else {
2143                b.clone()
2144            }
2145        }
2146        (Bound::Excluded(va), Bound::Included(vb)) => {
2147            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2148                b.clone()
2149            } else {
2150                a.clone()
2151            }
2152        }
2153    }
2154}
2155
2156fn tighter_upper(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
2157    use std::ops::Bound;
2158    match (a, b) {
2159        (Bound::Unbounded, _) => b.clone(),
2160        (_, Bound::Unbounded) => a.clone(),
2161        (Bound::Included(va), Bound::Included(vb)) => {
2162            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2163                a.clone()
2164            } else {
2165                b.clone()
2166            }
2167        }
2168        (Bound::Excluded(va), Bound::Excluded(vb)) => {
2169            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2170                a.clone()
2171            } else {
2172                b.clone()
2173            }
2174        }
2175        (Bound::Included(va), Bound::Excluded(vb)) => {
2176            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2177                a.clone()
2178            } else {
2179                b.clone()
2180            }
2181        }
2182        (Bound::Excluded(va), Bound::Included(vb)) => {
2183            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2184                b.clone()
2185            } else {
2186                a.clone()
2187            }
2188        }
2189    }
2190}
2191
2192/// Split a boolean expression on top-level AND.
2193fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
2194    match expr {
2195        Expr::BinaryOp {
2196            left,
2197            op: BinOp::And,
2198            right,
2199        } => {
2200            let mut out = split_conjuncts(left);
2201            out.extend(split_conjuncts(right));
2202            out
2203        }
2204        other => vec![other.clone()],
2205    }
2206}
2207
2208/// Look at a predicate of the form `<col-ref> <op> <rhs>` where both sides are
2209/// simple. Return Some((column, shape)) if it's index-eligible, None otherwise.
2210fn classify_index_predicate(
2211    expr: &Expr,
2212    params: &HashMap<String, Value>,
2213) -> Option<(String, IndexPredicateShape)> {
2214    match expr {
2215        Expr::BinaryOp { left, op, right } => {
2216            let col = extract_simple_col_ref(left)?;
2217            // Reject function / arithmetic / column-ref RHS / subquery.
2218            if !is_literal_or_param(right) {
2219                return None;
2220            }
2221            let rhs = resolve_simple_rhs(right, params)?;
2222            let shape = match op {
2223                BinOp::Eq => IndexPredicateShape::Equality(rhs),
2224                BinOp::Neq => IndexPredicateShape::NotEqual(rhs),
2225                BinOp::Lt => IndexPredicateShape::Range {
2226                    lower: std::ops::Bound::Unbounded,
2227                    upper: std::ops::Bound::Excluded(rhs),
2228                },
2229                BinOp::Lte => IndexPredicateShape::Range {
2230                    lower: std::ops::Bound::Unbounded,
2231                    upper: std::ops::Bound::Included(rhs),
2232                },
2233                BinOp::Gt => IndexPredicateShape::Range {
2234                    lower: std::ops::Bound::Excluded(rhs),
2235                    upper: std::ops::Bound::Unbounded,
2236                },
2237                BinOp::Gte => IndexPredicateShape::Range {
2238                    lower: std::ops::Bound::Included(rhs),
2239                    upper: std::ops::Bound::Unbounded,
2240                },
2241                _ => return None,
2242            };
2243            Some((col, shape))
2244        }
2245        Expr::InList {
2246            expr: e,
2247            list,
2248            negated: false,
2249        } => {
2250            let col = extract_simple_col_ref(e)?;
2251            let mut values = Vec::with_capacity(list.len());
2252            for v in list {
2253                if !is_literal_or_param(v) {
2254                    return None;
2255                }
2256                values.push(resolve_simple_rhs(v, params)?);
2257            }
2258            Some((col, IndexPredicateShape::InList(values)))
2259        }
2260        Expr::IsNull { expr: e, negated } => {
2261            let col = extract_simple_col_ref(e)?;
2262            Some((
2263                col,
2264                if *negated {
2265                    IndexPredicateShape::IsNotNull
2266                } else {
2267                    IndexPredicateShape::IsNull
2268                },
2269            ))
2270        }
2271        _ => None,
2272    }
2273}
2274
2275/// Classify why `filter` rejected `column` for IndexScan. Returns a static
2276/// reason string matching the plan's trace-reason vocabulary.
2277fn classify_rejection_reason(filter: &Expr, column: &str) -> &'static str {
2278    // Walk the expression tree and find a predicate mentioning `column`; report
2279    // the first structural reason we detect.
2280    fn walk(expr: &Expr, column: &str) -> Option<&'static str> {
2281        match expr {
2282            Expr::BinaryOp {
2283                left,
2284                op: BinOp::And | BinOp::Or,
2285                right,
2286            } => walk(left, column).or_else(|| walk(right, column)),
2287            Expr::BinaryOp { left, op, right } => {
2288                // Detect arithmetic-on-column specifically (parser lowers
2289                // `a + 1` to FunctionCall { name: "__add", .. }).
2290                if expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column) {
2291                    return Some("arithmetic in predicate");
2292                }
2293                // Generic function call (UPPER(col) etc.)
2294                if expr_uses_function_on(left, column) || expr_uses_function_on(right, column) {
2295                    return Some("function call in predicate");
2296                }
2297                // Column-ref RHS?
2298                if mentions_column_ref(left, column) || mentions_column_ref(right, column) {
2299                    let left_is_col = extract_simple_col_ref(left).as_deref() == Some(column);
2300                    let right_is_col_ref = matches!(right.as_ref(), Expr::Column(_));
2301                    if left_is_col && right_is_col_ref {
2302                        return Some("non-literal rhs");
2303                    }
2304                }
2305                let _ = op;
2306                None
2307            }
2308            Expr::Like { expr: e, .. } => {
2309                if mentions_column_ref(e, column) {
2310                    Some("LIKE is residual-only")
2311                } else {
2312                    None
2313                }
2314            }
2315            Expr::InSubquery { expr: e, .. } => {
2316                if mentions_column_ref(e, column) {
2317                    Some("non-literal rhs")
2318                } else {
2319                    None
2320                }
2321            }
2322            _ => None,
2323        }
2324    }
2325    walk(filter, column).unwrap_or("first column not in WHERE")
2326}
2327
2328fn extract_simple_col_ref(expr: &Expr) -> Option<String> {
2329    match expr {
2330        Expr::Column(r) => Some(r.column.clone()),
2331        _ => None,
2332    }
2333}
2334
2335fn is_literal_or_param(expr: &Expr) -> bool {
2336    match expr {
2337        Expr::Literal(_) | Expr::Parameter(_) => true,
2338        Expr::FunctionCall { name, args } => {
2339            // Arithmetic-of-literals (e.g., `0.0 / 0.0`, `1 + 2`) counts as
2340            // a const RHS for planning purposes; we evaluate at execute time.
2341            matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2342                && args.iter().all(is_literal_or_param)
2343        }
2344        _ => false,
2345    }
2346}
2347
2348fn resolve_simple_rhs(expr: &Expr, params: &HashMap<String, Value>) -> Option<Value> {
2349    match expr {
2350        Expr::Literal(lit) => Some(match lit {
2351            Literal::Null => Value::Null,
2352            Literal::Bool(b) => Value::Bool(*b),
2353            Literal::Integer(i) => Value::Int64(*i),
2354            Literal::Real(f) => Value::Float64(*f),
2355            Literal::Text(s) => Value::Text(s.clone()),
2356            Literal::Vector(_) => return None,
2357        }),
2358        Expr::Parameter(name) => params.get(name).cloned(),
2359        Expr::FunctionCall { name, args }
2360            if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") =>
2361        {
2362            if args.len() != 2 {
2363                return None;
2364            }
2365            let a = resolve_simple_rhs(&args[0], params)?;
2366            let b = resolve_simple_rhs(&args[1], params)?;
2367            match (a, b, name.as_str()) {
2368                (Value::Int64(x), Value::Int64(y), "__add") => {
2369                    Some(Value::Int64(x.wrapping_add(y)))
2370                }
2371                (Value::Int64(x), Value::Int64(y), "__sub") => {
2372                    Some(Value::Int64(x.wrapping_sub(y)))
2373                }
2374                (Value::Int64(x), Value::Int64(y), "__mul") => {
2375                    Some(Value::Int64(x.wrapping_mul(y)))
2376                }
2377                (Value::Int64(x), Value::Int64(y), "__div") if y != 0 => Some(Value::Int64(x / y)),
2378                (Value::Float64(x), Value::Float64(y), "__add") => Some(Value::Float64(x + y)),
2379                (Value::Float64(x), Value::Float64(y), "__sub") => Some(Value::Float64(x - y)),
2380                (Value::Float64(x), Value::Float64(y), "__mul") => Some(Value::Float64(x * y)),
2381                (Value::Float64(x), Value::Float64(y), "__div") => Some(Value::Float64(x / y)),
2382                _ => None,
2383            }
2384        }
2385        _ => None,
2386    }
2387}
2388
2389fn expr_uses_function_on(expr: &Expr, column: &str) -> bool {
2390    match expr {
2391        Expr::FunctionCall { name, args } => {
2392            // Skip known arithmetic-lowering function-call names; those are
2393            // classified separately as "arithmetic in predicate".
2394            if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") {
2395                return false;
2396            }
2397            args.iter().any(|a| mentions_column_ref(a, column))
2398        }
2399        Expr::BinaryOp { left, right, .. } => {
2400            expr_uses_function_on(left, column) || expr_uses_function_on(right, column)
2401        }
2402        _ => false,
2403    }
2404}
2405
2406fn expr_uses_arithmetic_on(expr: &Expr, column: &str) -> bool {
2407    // The parser lowers `a + 1`, `a - 1`, etc. into FunctionCall with reserved
2408    // names `__add` / `__sub` / `__mul` / `__div`. We detect that shape here.
2409    match expr {
2410        Expr::FunctionCall { name, args } => {
2411            matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2412                && args.iter().any(|a| mentions_column_ref(a, column))
2413        }
2414        Expr::BinaryOp { left, right, .. } => {
2415            expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column)
2416        }
2417        _ => false,
2418    }
2419}
2420
2421fn mentions_column_ref(expr: &Expr, column: &str) -> bool {
2422    match expr {
2423        Expr::Column(r) => r.column == column,
2424        Expr::FunctionCall { args, .. } => args.iter().any(|a| mentions_column_ref(a, column)),
2425        Expr::BinaryOp { left, right, .. } => {
2426            mentions_column_ref(left, column) || mentions_column_ref(right, column)
2427        }
2428        Expr::UnaryOp { operand, .. } => mentions_column_ref(operand, column),
2429        Expr::IsNull { expr: e, .. } => mentions_column_ref(e, column),
2430        Expr::Like { expr: e, .. } => mentions_column_ref(e, column),
2431        Expr::InList { expr: e, .. } => mentions_column_ref(e, column),
2432        Expr::InSubquery { expr: e, .. } => mentions_column_ref(e, column),
2433        _ => false,
2434    }
2435}
2436
2437/// Walk the index's B-tree per the picked shape, fetch matching rows by
2438/// row_id, apply residual filter, return VersionedRow list.
2439#[allow(clippy::too_many_arguments)]
2440fn execute_index_scan(
2441    db: &Database,
2442    table: &str,
2443    pick: &IndexPick,
2444    snapshot: contextdb_core::SnapshotId,
2445    tx: Option<TxId>,
2446) -> Result<(Vec<VersionedRow>, u64)> {
2447    use contextdb_core::{DirectedValue, SortDirection, TotalOrdAsc, TotalOrdDesc};
2448    use std::ops::Bound;
2449
2450    // NaN equality short-circuit (I19): `col = NaN` or bound param NaN → empty.
2451    if let IndexPredicateShape::Equality(rhs) = &pick.shape
2452        && let Value::Float64(f) = rhs
2453        && f.is_nan()
2454    {
2455        return Ok((Vec::new(), 0));
2456    }
2457    // NULL equality short-circuit: `col = $p` with $p = NULL → empty (NULL
2458    // comparisons are UNKNOWN in SQL).
2459    if let IndexPredicateShape::Equality(Value::Null) = &pick.shape {
2460        return Ok((Vec::new(), 0));
2461    }
2462
2463    // Coerce pick.shape's literal values to the pushed column's declared type.
2464    // A SELECT WHERE uuid_col = 'uuid-string' arrives here with Text(..) even
2465    // though the indexed column stores Uuid(..). B-tree walks use variant-exact
2466    // comparisons (value_total_cmp panics on mismatched variants by design),
2467    // so we must match the stored key-type before walking. Coercion failure
2468    // (e.g. Text that is not a valid UUID) is treated as zero rows matched —
2469    // same semantics a full-scan predicate would produce.
2470    let pick = match coerce_pick_shape_to_column_type(db, table, pick) {
2471        Ok(coerced) => coerced,
2472        Err(_) => return Ok((Vec::new(), 0)),
2473    };
2474    let pick = &pick;
2475
2476    let indexes = db.relational_store().indexes.read();
2477    let storage = match indexes.get(&(table.to_string(), pick.name.clone())) {
2478        Some(s) => s,
2479        None => return Ok((Vec::new(), 0)),
2480    };
2481
2482    // Build bound keys as single-column IndexKey prefixes. Composite indexes:
2483    // we walk the entire range for the first-col match and rely on residual
2484    // filter for subsequent columns.
2485    let first_dir = pick
2486        .columns
2487        .first()
2488        .map(|(_, d)| *d)
2489        .unwrap_or(SortDirection::Asc);
2490
2491    let wrap = |v: Value| -> DirectedValue {
2492        match first_dir {
2493            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
2494            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
2495        }
2496    };
2497
2498    // Collect matching postings then filter by MVCC visibility.
2499    let mut postings: Vec<contextdb_relational::IndexEntry> = Vec::new();
2500    let mut rows_examined: u64 = 0;
2501
2502    let collect_range = |postings: &mut Vec<contextdb_relational::IndexEntry>,
2503                         examined: &mut u64,
2504                         lower: Bound<Vec<DirectedValue>>,
2505                         upper: Bound<Vec<DirectedValue>>| {
2506        for (_k, entries) in storage.tree.range((lower, upper)) {
2507            for e in entries {
2508                *examined += 1;
2509                if e.visible_at(snapshot) {
2510                    postings.push(e.clone());
2511                }
2512            }
2513        }
2514    };
2515
2516    // For composite indexes, a first-column equality must walk ALL keys
2517    // whose first component equals the target (i.e. the prefix range). We
2518    // iterate the whole tree and filter by first-component match to cover
2519    // both single-column and composite shapes uniformly.
2520    let is_composite = pick.columns.len() > 1;
2521
2522    match &pick.shape {
2523        IndexPredicateShape::Equality(v) => {
2524            if is_composite {
2525                // Walk every posting whose first component equals `v`.
2526                let want = wrap(v.clone());
2527                for (key, entries) in storage.tree.iter() {
2528                    if key.first() != Some(&want) {
2529                        continue;
2530                    }
2531                    for e in entries {
2532                        rows_examined += 1;
2533                        if e.visible_at(snapshot) {
2534                            postings.push(e.clone());
2535                        }
2536                    }
2537                }
2538            } else {
2539                let lower = vec![wrap(v.clone())];
2540                let upper = lower.clone();
2541                collect_range(
2542                    &mut postings,
2543                    &mut rows_examined,
2544                    Bound::Included(lower),
2545                    Bound::Included(upper),
2546                );
2547            }
2548        }
2549        IndexPredicateShape::InList(vs) => {
2550            for v in vs {
2551                if is_composite {
2552                    let want = wrap(v.clone());
2553                    for (key, entries) in storage.tree.iter() {
2554                        if key.first() != Some(&want) {
2555                            continue;
2556                        }
2557                        for e in entries {
2558                            rows_examined += 1;
2559                            if e.visible_at(snapshot) {
2560                                postings.push(e.clone());
2561                            }
2562                        }
2563                    }
2564                } else {
2565                    let k = vec![wrap(v.clone())];
2566                    collect_range(
2567                        &mut postings,
2568                        &mut rows_examined,
2569                        Bound::Included(k.clone()),
2570                        Bound::Included(k),
2571                    );
2572                }
2573            }
2574        }
2575        IndexPredicateShape::Range { lower, upper } => {
2576            if is_composite {
2577                // Composite + range on first column: walk entries whose first
2578                // component falls in the range.
2579                for (key, entries) in storage.tree.iter() {
2580                    let Some(first) = key.first() else { continue };
2581                    let in_lower = match lower {
2582                        Bound::Unbounded => true,
2583                        Bound::Included(v) => first >= &wrap(v.clone()),
2584                        Bound::Excluded(v) => first > &wrap(v.clone()),
2585                    };
2586                    let in_upper = match upper {
2587                        Bound::Unbounded => true,
2588                        Bound::Included(v) => first <= &wrap(v.clone()),
2589                        Bound::Excluded(v) => first < &wrap(v.clone()),
2590                    };
2591                    if !(in_lower && in_upper) {
2592                        continue;
2593                    }
2594                    for e in entries {
2595                        rows_examined += 1;
2596                        if e.visible_at(snapshot) {
2597                            postings.push(e.clone());
2598                        }
2599                    }
2600                }
2601            } else {
2602                let l = match lower {
2603                    Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2604                    Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2605                    Bound::Unbounded => Bound::Unbounded,
2606                };
2607                let u = match upper {
2608                    Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2609                    Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2610                    Bound::Unbounded => Bound::Unbounded,
2611                };
2612                collect_range(&mut postings, &mut rows_examined, l, u);
2613            }
2614        }
2615        IndexPredicateShape::NotEqual(v) => {
2616            // Full walk; skip exact key. For IndexScan-trace we still attribute
2617            // all postings touched to __rows_examined (trace counts postings).
2618            let except_key = vec![wrap(v.clone())];
2619            for (k, entries) in storage.tree.iter() {
2620                if *k == except_key {
2621                    continue;
2622                }
2623                for e in entries {
2624                    rows_examined += 1;
2625                    if e.visible_at(snapshot) {
2626                        postings.push(e.clone());
2627                    }
2628                }
2629            }
2630        }
2631        IndexPredicateShape::IsNull => {
2632            let k = vec![wrap(Value::Null)];
2633            collect_range(
2634                &mut postings,
2635                &mut rows_examined,
2636                Bound::Included(k.clone()),
2637                Bound::Included(k),
2638            );
2639        }
2640        IndexPredicateShape::IsNotNull => {
2641            // Everything except NULL partition.
2642            let null_key = vec![wrap(Value::Null)];
2643            for (k, entries) in storage.tree.iter() {
2644                if *k == null_key {
2645                    continue;
2646                }
2647                for e in entries {
2648                    rows_examined += 1;
2649                    if e.visible_at(snapshot) {
2650                        postings.push(e.clone());
2651                    }
2652                }
2653            }
2654        }
2655    }
2656
2657    // Now fetch base rows by row_id while preserving index-order. The index
2658    // already enumerates postings in index sort order; rows[] preserve it.
2659    drop(indexes);
2660    let row_ids: Vec<RowId> = postings.iter().map(|p| p.row_id).collect();
2661    if row_ids.is_empty() {
2662        return Ok((Vec::new(), rows_examined));
2663    }
2664    let tables = db.relational_store().tables.read();
2665    let rows_slice = tables.get(table);
2666    let mut out: Vec<VersionedRow> = Vec::with_capacity(row_ids.len());
2667    if let Some(rows) = rows_slice {
2668        let by_id: HashMap<RowId, &VersionedRow> = rows.iter().map(|r| (r.row_id, r)).collect();
2669        for rid in &row_ids {
2670            if let Some(r) = by_id.get(rid) {
2671                // Layered-visibility check: apply same rule as scan_with_tx so
2672                // deletes/inserts from the active tx are honored.
2673                if (*r).visible_at(snapshot) {
2674                    out.push((**r).clone());
2675                }
2676            }
2677        }
2678    }
2679    // Layer tx-scoped inserts / deletes on top, matching the semantics of
2680    // scan_with_tx.
2681    drop(tables);
2682    if let Some(tx_id) = tx {
2683        let overlay = db.index_scan_tx_overlay(tx_id, table, &pick.pushed_column, &pick.shape)?;
2684        let deleted_row_ids = overlay.deleted_row_ids;
2685        out.retain(|row| !deleted_row_ids.contains(&row.row_id));
2686        out.extend(overlay.matching_inserts);
2687    }
2688    Ok((out, rows_examined))
2689}
2690
2691pub(crate) fn range_includes(
2692    v: &Value,
2693    lower: &std::ops::Bound<Value>,
2694    upper: &std::ops::Bound<Value>,
2695) -> bool {
2696    use std::ops::Bound;
2697    let ok_lower = match lower {
2698        Bound::Unbounded => true,
2699        Bound::Included(b) => compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Less),
2700        Bound::Excluded(b) => {
2701            compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Greater)
2702        }
2703    };
2704    let ok_upper = match upper {
2705        Bound::Unbounded => true,
2706        Bound::Included(b) => {
2707            compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Greater)
2708        }
2709        Bound::Excluded(b) => compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Less),
2710    };
2711    ok_lower && ok_upper
2712}
2713
2714/// Try to elide the `Sort` node when the child's Scan can be rewritten as an
2715/// IndexScan whose ordering matches `keys`. The common case with no WHERE
2716/// filter uses a full-range index walk. If the Scan has a WHERE filter that
2717/// does NOT match this specific index's first column, we refuse to elide
2718/// (the Scan arm will still pick the best-matching index for the filter and
2719/// the Sort arm's path-B check handles the elision).
2720fn try_elide_sort(
2721    db: &Database,
2722    input: &PhysicalPlan,
2723    keys: &[contextdb_planner::SortKey],
2724    params: &HashMap<String, Value>,
2725    tx: Option<TxId>,
2726) -> Result<Option<QueryResult>> {
2727    fn find_scan(plan: &PhysicalPlan) -> Option<(&String, &Option<String>, &Option<Expr>)> {
2728        match plan {
2729            PhysicalPlan::Scan {
2730                table,
2731                alias,
2732                filter,
2733            } => Some((table, alias, filter)),
2734            PhysicalPlan::Project { input, .. }
2735            | PhysicalPlan::Filter { input, .. }
2736            | PhysicalPlan::Distinct { input }
2737            | PhysicalPlan::Limit { input, .. } => find_scan(input),
2738            _ => None,
2739        }
2740    }
2741    let Some((table, _alias, filter)) = find_scan(input) else {
2742        return Ok(None);
2743    };
2744    // If the underlying Scan has a WHERE, route through the Scan executor
2745    // path so it gets the narrow range / correct rows_examined accounting.
2746    // Path B on the Sort arm will detect the IndexScan trace + matching
2747    // prefix and flip `sort_elided` on the result.
2748    if filter.is_some() {
2749        return Ok(None);
2750    }
2751    // Keys must all be simple column references.
2752    let key_cols: Option<Vec<(&str, &contextdb_parser::ast::SortDirection)>> = keys
2753        .iter()
2754        .map(|k| match &k.expr {
2755            Expr::Column(r) => Some((r.column.as_str(), &k.direction)),
2756            _ => None,
2757        })
2758        .collect();
2759    let Some(key_cols) = key_cols else {
2760        return Ok(None);
2761    };
2762    let meta = match db.table_meta(table) {
2763        Some(m) => m,
2764        None => return Ok(None),
2765    };
2766    let matching_index = meta.indexes.iter().find(|decl| {
2767        if decl.columns.len() < key_cols.len() {
2768            return false;
2769        }
2770        decl.columns
2771            .iter()
2772            .zip(key_cols.iter())
2773            .all(|((col, dir), (kcol, kdir))| col == kcol && core_dir_matches_ast(*dir, **kdir))
2774    });
2775    let Some(matching) = matching_index else {
2776        return Ok(None);
2777    };
2778    run_index_scan_with_order(db, table, matching, filter.as_ref(), params, tx)
2779}
2780
2781/// Execute an IndexScan over `table` with `index`, applying the optional
2782/// residual filter. Constructs predicates_pushed / indexes_considered the
2783/// same way the Scan arm does.
2784fn run_index_scan_with_order(
2785    db: &Database,
2786    table: &str,
2787    decl: &contextdb_core::IndexDecl,
2788    filter: Option<&Expr>,
2789    params: &HashMap<String, Value>,
2790    tx: Option<TxId>,
2791) -> Result<Option<QueryResult>> {
2792    use std::borrow::Cow;
2793    let snapshot = db.snapshot_for_read();
2794    let schema_columns = db.table_meta(table).map(|meta| {
2795        meta.columns
2796            .into_iter()
2797            .map(|column| column.name)
2798            .collect::<Vec<_>>()
2799    });
2800    let resolved_filter = filter
2801        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
2802        .transpose()?;
2803
2804    // Pick: full-range walk for ORDER BY elision. Shape is "unbounded range"
2805    // so we walk every posting. Residual filter applies.
2806    let pick = IndexPick {
2807        name: decl.name.clone(),
2808        columns: decl.columns.clone(),
2809        shape: IndexPredicateShape::Range {
2810            lower: std::ops::Bound::Unbounded,
2811            upper: std::ops::Bound::Unbounded,
2812        },
2813        pushed_column: decl.columns[0].0.clone(),
2814    };
2815    let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
2816    db.__bump_rows_examined(examined);
2817    let mut result = materialize_rows(
2818        rows,
2819        resolved_filter.as_ref(),
2820        params,
2821        schema_columns.as_deref(),
2822    )?;
2823    let mut pushed: smallvec::SmallVec<[Cow<'static, str>; 4]> = smallvec::SmallVec::new();
2824    pushed.push(Cow::Owned(decl.columns[0].0.clone()));
2825    result.trace = crate::database::QueryTrace {
2826        physical_plan: "IndexScan",
2827        index_used: Some(decl.name.clone()),
2828        predicates_pushed: pushed,
2829        indexes_considered: Default::default(),
2830        sort_elided: true,
2831    };
2832    Ok(Some(result))
2833}
2834
2835fn sort_keys_match_index_prefix(
2836    db: &Database,
2837    input: &PhysicalPlan,
2838    index_name: &str,
2839    keys: &[contextdb_planner::SortKey],
2840) -> bool {
2841    fn find_scan_and_filter(plan: &PhysicalPlan) -> Option<(&String, &Option<Expr>)> {
2842        match plan {
2843            PhysicalPlan::Scan { table, filter, .. } => Some((table, filter)),
2844            PhysicalPlan::Project { input, .. }
2845            | PhysicalPlan::Filter { input, .. }
2846            | PhysicalPlan::Distinct { input }
2847            | PhysicalPlan::Limit { input, .. } => find_scan_and_filter(input),
2848            _ => None,
2849        }
2850    }
2851    let (table, filter) = match find_scan_and_filter(input) {
2852        Some(t) => t,
2853        None => return false,
2854    };
2855    let meta = match db.table_meta(table) {
2856        Some(m) => m,
2857        None => return false,
2858    };
2859    let decl = meta.indexes.iter().find(|i| i.name == index_name);
2860    let Some(decl) = decl else {
2861        return false;
2862    };
2863    // Shape guard: IndexScan with InList or NotEqual shape on the leading
2864    // indexed column walks fragmented posting-list ranges, so rows are
2865    // emitted per-value, not globally sorted. Refuse sort elision for those
2866    // shapes — the Sort node must run.
2867    if let Some(filter_expr) = filter.as_ref()
2868        && let Some(leading_col) = decl.columns.first().map(|(c, _)| c.as_str())
2869    {
2870        let conjuncts = split_conjuncts(filter_expr);
2871        let empty_params = HashMap::new();
2872        for conjunct in &conjuncts {
2873            if let Some((col, shape)) = classify_index_predicate(conjunct, &empty_params)
2874                && col == leading_col
2875                && matches!(
2876                    shape,
2877                    IndexPredicateShape::InList(_) | IndexPredicateShape::NotEqual(_)
2878                )
2879            {
2880                return false;
2881            }
2882        }
2883    }
2884    // Determine how many leading index columns the WHERE filter pins to a
2885    // single equality. Those columns are effectively "used up" by the
2886    // IndexScan's range; subsequent ORDER BY keys matching the remaining
2887    // index columns still elide the Sort.
2888    let pinned_prefix_len = count_equality_prefix(filter.as_ref(), &decl.columns);
2889    let remaining_index_cols = &decl.columns[pinned_prefix_len..];
2890    if remaining_index_cols.len() < keys.len() {
2891        return false;
2892    }
2893    remaining_index_cols
2894        .iter()
2895        .zip(keys.iter())
2896        .all(|((col, dir), k)| match &k.expr {
2897            Expr::Column(r) => r.column == *col && core_dir_matches_ast(*dir, k.direction),
2898            _ => false,
2899        })
2900}
2901
2902fn count_equality_prefix(
2903    filter: Option<&Expr>,
2904    columns: &[(String, contextdb_core::SortDirection)],
2905) -> usize {
2906    let Some(filter) = filter else {
2907        return 0;
2908    };
2909    let conjuncts = split_conjuncts(filter);
2910    let mut pinned = 0usize;
2911    for (col, _) in columns {
2912        let has_eq = conjuncts.iter().any(|c| match c {
2913            Expr::BinaryOp {
2914                left,
2915                op: BinOp::Eq,
2916                right,
2917            } => {
2918                let left_is_col = matches!(left.as_ref(), Expr::Column(r) if r.column == *col);
2919                let right_is_simple =
2920                    matches!(right.as_ref(), Expr::Literal(_) | Expr::Parameter(_));
2921                left_is_col && right_is_simple
2922            }
2923            _ => false,
2924        });
2925        if has_eq {
2926            pinned += 1;
2927        } else {
2928            break;
2929        }
2930    }
2931    pinned
2932}
2933
2934fn core_dir_matches_ast(
2935    core: contextdb_core::SortDirection,
2936    ast: contextdb_parser::ast::SortDirection,
2937) -> bool {
2938    matches!(
2939        (core, ast),
2940        (
2941            contextdb_core::SortDirection::Asc,
2942            contextdb_parser::ast::SortDirection::Asc
2943        ) | (
2944            contextdb_core::SortDirection::Desc,
2945            contextdb_parser::ast::SortDirection::Desc
2946        )
2947    )
2948}
2949
2950// ========================= End of index scan planning =========================
2951
2952fn validate_update_state_transition(
2953    db: &Database,
2954    table: &str,
2955    existing: &VersionedRow,
2956    next_values: &HashMap<String, Value>,
2957) -> Result<()> {
2958    let Some(meta) = db.table_meta(table) else {
2959        return Ok(());
2960    };
2961    let Some(state_machine) = meta.state_machine else {
2962        return Ok(());
2963    };
2964
2965    let old_state = existing
2966        .values
2967        .get(&state_machine.column)
2968        .and_then(Value::as_text);
2969    let new_state = next_values
2970        .get(&state_machine.column)
2971        .and_then(Value::as_text);
2972
2973    let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
2974        return Ok(());
2975    };
2976
2977    if old_state == new_state
2978        || db.relational_store().validate_state_transition(
2979            table,
2980            &state_machine.column,
2981            old_state,
2982            new_state,
2983        )
2984    {
2985        return Ok(());
2986    }
2987
2988    Err(Error::InvalidStateTransition(format!(
2989        "{old_state} -> {new_state}"
2990    )))
2991}
2992
2993fn estimate_row_bytes_for_meta(
2994    values: &HashMap<String, Value>,
2995    meta: &TableMeta,
2996    include_vectors: bool,
2997) -> usize {
2998    let mut bytes = 96usize;
2999    for column in &meta.columns {
3000        let Some(value) = values.get(&column.name) else {
3001            continue;
3002        };
3003        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3004            continue;
3005        }
3006        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3007    }
3008    bytes
3009}
3010
/// Estimates the working memory of a vector search returning `k` results.
///
/// The budget covers `3 * k` vectors of `dimension` `f32` components. Each
/// step saturates at `usize::MAX` instead of overflowing.
fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
    [3, dimension, std::mem::size_of::<f32>()]
        .into_iter()
        .fold(k, usize::saturating_mul)
}
3016
3017fn estimate_bfs_working_bytes<T>(
3018    frontier: &[T],
3019    steps: &[contextdb_planner::GraphStepPlan],
3020) -> usize {
3021    let max_hops = steps.iter().fold(0usize, |acc, step| {
3022        acc.saturating_add(step.max_depth as usize)
3023    });
3024    frontier
3025        .len()
3026        .saturating_mul(2048)
3027        .saturating_mul(max_hops.max(1))
3028}
3029
3030fn dedupe_graph_frontier(
3031    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
3032    steps: &[contextdb_planner::GraphStepPlan],
3033) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
3034    let mut best =
3035        HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
3036
3037    for (bindings, current_id, depth) in frontier {
3038        let mut key = Vec::with_capacity(steps.len());
3039        for step in steps {
3040            if let Some(id) = bindings.get(&step.target_alias) {
3041                key.push(*id);
3042            }
3043        }
3044
3045        best.entry(key)
3046            .and_modify(|existing| {
3047                if depth < existing.2 {
3048                    *existing = (bindings.clone(), current_id, depth);
3049                }
3050            })
3051            .or_insert((bindings, current_id, depth));
3052    }
3053
3054    best.into_values().collect()
3055}
3056
3057fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
3058    let meta = db.table_meta(table);
3059    let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
3060    let snapshot = db.snapshot();
3061    let rows = db.scan(table, snapshot).unwrap_or_default();
3062    let row_bytes = rows.iter().fold(0usize, |acc, row| {
3063        acc.saturating_add(meta.as_ref().map_or_else(
3064            || row.estimated_bytes(),
3065            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
3066        ))
3067    });
3068    let vector_bytes = rows
3069        .iter()
3070        .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
3071        .fold(0usize, |acc, entry| {
3072            acc.saturating_add(entry.estimated_bytes())
3073        });
3074    let edge_bytes = rows.iter().fold(0usize, |acc, row| {
3075        match (
3076            row.values.get("source_id").and_then(Value::as_uuid),
3077            row.values.get("target_id").and_then(Value::as_uuid),
3078            row.values.get("edge_type").and_then(Value::as_text),
3079        ) {
3080            (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
3081                96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
3082            ),
3083            _ => acc,
3084        }
3085    });
3086    metadata_bytes
3087        .saturating_add(row_bytes)
3088        .saturating_add(vector_bytes)
3089        .saturating_add(edge_bytes)
3090}
3091
3092fn materialize_rows(
3093    rows: Vec<VersionedRow>,
3094    filter: Option<&Expr>,
3095    params: &HashMap<String, Value>,
3096    schema_columns: Option<&[String]>,
3097) -> Result<QueryResult> {
3098    let filtered: Vec<VersionedRow> = rows
3099        .into_iter()
3100        .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
3101        .collect();
3102
3103    let keys = if let Some(schema_columns) = schema_columns {
3104        schema_columns.to_vec()
3105    } else {
3106        let mut keys = BTreeSet::new();
3107        for r in &filtered {
3108            for k in r.values.keys() {
3109                keys.insert(k.clone());
3110            }
3111        }
3112        keys.into_iter().collect::<Vec<_>>()
3113    };
3114
3115    let mut columns = vec!["row_id".to_string()];
3116    columns.extend(keys.iter().cloned());
3117
3118    let rows = filtered
3119        .into_iter()
3120        .map(|r| {
3121            let mut out = vec![Value::Int64(r.row_id.0 as i64)];
3122            for k in &keys {
3123                out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
3124            }
3125            out
3126        })
3127        .collect();
3128
3129    Ok(QueryResult {
3130        columns,
3131        rows,
3132        rows_affected: 0,
3133        trace: crate::database::QueryTrace::scan(),
3134        cascade: None,
3135    })
3136}
3137
3138fn rows_by_row_id(
3139    db: &Database,
3140    table: &str,
3141    row_ids: &[RowId],
3142    snapshot: SnapshotId,
3143) -> Result<Vec<VersionedRow>> {
3144    if row_ids.is_empty() {
3145        return Ok(Vec::new());
3146    }
3147
3148    let wanted = row_ids.iter().copied().collect::<HashSet<_>>();
3149    let tables = db.relational_store().tables.read();
3150    let rows = tables
3151        .get(table)
3152        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
3153    let mut found = HashMap::with_capacity(wanted.len());
3154    for row in rows {
3155        if wanted.contains(&row.row_id) && row.visible_at(snapshot) {
3156            found.insert(row.row_id, row.clone());
3157            if found.len() == wanted.len() {
3158                break;
3159            }
3160        }
3161    }
3162
3163    Ok(row_ids
3164        .iter()
3165        .filter_map(|row_id| found.remove(row_id))
3166        .collect())
3167}
3168
3169fn uuid_to_row_id_map(
3170    db: &Database,
3171    table: &str,
3172    snapshot: SnapshotId,
3173) -> Result<HashMap<uuid::Uuid, RowId>> {
3174    let tables = db.relational_store().tables.read();
3175    let rows = tables
3176        .get(table)
3177        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
3178    Ok(rows
3179        .iter()
3180        .filter(|row| row.visible_at(snapshot))
3181        .filter_map(|row| match row.values.get("id") {
3182            Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
3183            _ => None,
3184        })
3185        .collect())
3186}
3187
3188fn vector_search_trace(operator: &'static str, candidate_trace: Option<QueryTrace>) -> QueryTrace {
3189    let Some(mut trace) = candidate_trace else {
3190        return QueryTrace {
3191            physical_plan: operator,
3192            ..Default::default()
3193        };
3194    };
3195
3196    trace.physical_plan = match (trace.physical_plan, operator) {
3197        ("IndexScan", "HNSWSearch") => "IndexScan -> HNSWSearch",
3198        ("IndexScan", _) => "IndexScan -> VectorSearch",
3199        ("Scan", "HNSWSearch") => "Scan -> HNSWSearch",
3200        ("Scan", _) => "Scan -> VectorSearch",
3201        (_, "HNSWSearch") => "HNSWSearch",
3202        _ => "VectorSearch",
3203    };
3204    trace.sort_elided = false;
3205    trace
3206}
3207
3208pub(crate) fn row_matches(
3209    row: &VersionedRow,
3210    expr: &Expr,
3211    params: &HashMap<String, Value>,
3212) -> Result<bool> {
3213    Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
3214}
3215
3216fn eval_expr_value(
3217    row: &VersionedRow,
3218    expr: &Expr,
3219    params: &HashMap<String, Value>,
3220) -> Result<Value> {
3221    match expr {
3222        Expr::Column(c) => {
3223            if c.column == "row_id" {
3224                Ok(Value::Int64(row.row_id.0 as i64))
3225            } else {
3226                Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
3227            }
3228        }
3229        Expr::BinaryOp { left, op, right } => {
3230            let left = eval_expr_value(row, left, params)?;
3231            let right = eval_expr_value(row, right, params)?;
3232            eval_binary_op(op, &left, &right)
3233        }
3234        Expr::UnaryOp { op, operand } => {
3235            let value = eval_expr_value(row, operand, params)?;
3236            match op {
3237                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
3238                UnaryOp::Neg => match value {
3239                    Value::Int64(v) => Ok(Value::Int64(-v)),
3240                    Value::Float64(v) => Ok(Value::Float64(-v)),
3241                    _ => Err(Error::PlanError(
3242                        "cannot negate non-numeric value".to_string(),
3243                    )),
3244                },
3245            }
3246        }
3247        Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
3248        Expr::IsNull { expr, negated } => {
3249            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
3250            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
3251        }
3252        Expr::InList {
3253            expr,
3254            list,
3255            negated,
3256        } => {
3257            let needle = eval_expr_value(row, expr, params)?;
3258            let matched = list.iter().try_fold(false, |found, item| {
3259                if found {
3260                    Ok(true)
3261                } else {
3262                    let candidate = eval_expr_value(row, item, params)?;
3263                    Ok(
3264                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3265                            || (needle != Value::Null
3266                                && candidate != Value::Null
3267                                && needle == candidate),
3268                    )
3269                }
3270            })?;
3271            Ok(Value::Bool(if *negated { !matched } else { matched }))
3272        }
3273        Expr::Like {
3274            expr,
3275            pattern,
3276            negated,
3277        } => {
3278            let matches = match (
3279                eval_expr_value(row, expr, params)?,
3280                eval_expr_value(row, pattern, params)?,
3281            ) {
3282                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3283                _ => false,
3284            };
3285            Ok(Value::Bool(if *negated { !matches } else { matches }))
3286        }
3287        _ => resolve_expr(expr, params),
3288    }
3289}
3290
3291pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
3292    match expr {
3293        Expr::Literal(l) => Ok(match l {
3294            Literal::Null => Value::Null,
3295            Literal::Bool(v) => Value::Bool(*v),
3296            Literal::Integer(v) => Value::Int64(*v),
3297            Literal::Real(v) => Value::Float64(*v),
3298            Literal::Text(v) => Value::Text(v.clone()),
3299            Literal::Vector(v) => Value::Vector(v.clone()),
3300        }),
3301        Expr::Parameter(p) => params
3302            .get(p)
3303            .cloned()
3304            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
3305        Expr::Column(c) => Ok(Value::Text(c.column.clone())),
3306        Expr::UnaryOp { op, operand } => match op {
3307            UnaryOp::Neg => match resolve_expr(operand, params)? {
3308                Value::Int64(v) => Ok(Value::Int64(-v)),
3309                Value::Float64(v) => Ok(Value::Float64(-v)),
3310                _ => Err(Error::PlanError(
3311                    "cannot negate non-numeric value".to_string(),
3312                )),
3313            },
3314            UnaryOp::Not => Err(Error::PlanError(
3315                "boolean NOT requires row context".to_string(),
3316            )),
3317        },
3318        Expr::FunctionCall { name, args } => {
3319            let values = args
3320                .iter()
3321                .map(|arg| resolve_expr(arg, params))
3322                .collect::<Result<Vec<_>>>()?;
3323            eval_function(name, &values)
3324        }
3325        Expr::CosineDistance { right, .. } => resolve_expr(right, params),
3326        _ => Err(Error::PlanError("unsupported expression".to_string())),
3327    }
3328}
3329
3330fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
3331    match (a, b) {
3332        (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
3333        (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
3334        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
3335        (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3336        (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
3337        (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
3338        (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
3339        (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3340        (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
3341        (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
3342        (Value::Uuid(u), Value::Text(t)) => {
3343            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3344                Some(u.cmp(&parsed))
3345            } else {
3346                None
3347            }
3348        }
3349        (Value::Text(t), Value::Uuid(u)) => {
3350            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3351                Some(parsed.cmp(u))
3352            } else {
3353                None
3354            }
3355        }
3356        (Value::TxId(a), Value::TxId(b)) => Some(a.0.cmp(&b.0)),
3357        (Value::TxId(a), Value::Int64(b)) => {
3358            if *b < 0 {
3359                Some(Ordering::Greater)
3360            } else {
3361                Some(a.0.cmp(&(*b as u64)))
3362            }
3363        }
3364        (Value::Int64(a), Value::TxId(b)) => {
3365            if *a < 0 {
3366                Some(Ordering::Less)
3367            } else {
3368                Some((*a as u64).cmp(&b.0))
3369            }
3370        }
3371        (Value::TxId(_), Value::Timestamp(_)) | (Value::Timestamp(_), Value::TxId(_)) => None,
3372        (Value::Null, _) | (_, Value::Null) => None,
3373        _ => None,
3374    }
3375}
3376
/// Evaluates a WHERE-clause predicate against `row` using three-valued logic.
///
/// Returns `Some(true)` or `Some(false)` for a definite answer, and `None`
/// for UNKNOWN (e.g. a comparison with a NULL operand).
///
/// # Errors
/// Returns `Error::PlanError` for:
/// - an unresolved `IN (subquery)`;
/// - a function call that evaluates to a value other than `Bool` or NULL;
/// - any other expression kind that cannot act as a predicate.
///
/// Errors from evaluating operands are propagated.
fn eval_bool_expr(
    row: &VersionedRow,
    expr: &Expr,
    params: &HashMap<String, Value>,
) -> Result<Option<bool>> {
    match expr {
        Expr::BinaryOp { left, op, right } => match op {
            // Comparisons: a NULL operand makes the result UNKNOWN.
            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
                let left = eval_expr_value(row, left, params)?;
                let right = eval_expr_value(row, right, params)?;
                if left == Value::Null || right == Value::Null {
                    return Ok(None);
                }

                // Equality also accepts structural equality (`==`) for pairs
                // that `compare_values` cannot order. Ordering comparisons
                // between incomparable values are simply false.
                let result = match op {
                    BinOp::Eq => {
                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
                    }
                    BinOp::Neq => {
                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
                    }
                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
                    BinOp::Lte => matches!(
                        compare_values(&left, &right),
                        Some(Ordering::Less | Ordering::Equal)
                    ),
                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
                    BinOp::Gte => matches!(
                        compare_values(&left, &right),
                        Some(Ordering::Greater | Ordering::Equal)
                    ),
                    // The enclosing arm only admits comparison operators.
                    BinOp::And | BinOp::Or => unreachable!(),
                };
                Ok(Some(result))
            }
            // Kleene AND: FALSE dominates, then UNKNOWN, then TRUE.
            BinOp::And => {
                let left = eval_bool_expr(row, left, params)?;
                // Short-circuit: once the left side is FALSE, the right side is
                // not evaluated, so its errors are not raised.
                if left == Some(false) {
                    return Ok(Some(false));
                }
                let right = eval_bool_expr(row, right, params)?;
                Ok(match (left, right) {
                    (Some(true), Some(true)) => Some(true),
                    (Some(true), other) => other,
                    (None, Some(false)) => Some(false),
                    (None, Some(true)) | (None, None) => None,
                    (Some(false), _) => Some(false),
                })
            }
            // Kleene OR: TRUE dominates, then UNKNOWN, then FALSE.
            BinOp::Or => {
                let left = eval_bool_expr(row, left, params)?;
                // Short-circuit: once the left side is TRUE, the right side is
                // not evaluated.
                if left == Some(true) {
                    return Ok(Some(true));
                }
                let right = eval_bool_expr(row, right, params)?;
                Ok(match (left, right) {
                    (Some(false), Some(false)) => Some(false),
                    (Some(false), other) => other,
                    (None, Some(true)) => Some(true),
                    (None, Some(false)) | (None, None) => None,
                    (Some(true), _) => Some(true),
                })
            }
        },
        // NOT keeps UNKNOWN as UNKNOWN.
        Expr::UnaryOp {
            op: UnaryOp::Not,
            operand,
        } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
        // A NULL needle is UNKNOWN. NULL list items never match.
        // NOTE(review): a non-matching list that contains NULL yields FALSE
        // here (TRUE for NOT IN), whereas standard SQL yields UNKNOWN.
        // Confirm this is intended.
        Expr::InList {
            expr,
            list,
            negated,
        } => {
            let needle = eval_expr_value(row, expr, params)?;
            if needle == Value::Null {
                return Ok(None);
            }

            // Stops evaluating list items once a match is found.
            let matched = list.iter().try_fold(false, |found, item| {
                if found {
                    Ok(true)
                } else {
                    let candidate = eval_expr_value(row, item, params)?;
                    Ok(
                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
                            || (candidate != Value::Null && needle == candidate),
                    )
                }
            })?;
            Ok(Some(if *negated { !matched } else { matched }))
        }
        // Subqueries are expected to be rewritten before rows are evaluated.
        Expr::InSubquery { .. } => Err(Error::PlanError(
            "IN (subquery) must be resolved before execution".to_string(),
        )),
        // Non-text operands (including NULL) never match.
        // NOTE(review): NOT LIKE with a NULL operand therefore yields TRUE
        // rather than SQL's UNKNOWN. Confirm this is intended.
        Expr::Like {
            expr,
            pattern,
            negated,
        } => {
            let left = eval_expr_value(row, expr, params)?;
            let right = eval_expr_value(row, pattern, params)?;
            let matched = match (left, right) {
                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
                _ => false,
            };
            Ok(Some(if *negated { !matched } else { matched }))
        }
        // IS [NOT] NULL always gives a definite answer.
        Expr::IsNull { expr, negated } => {
            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
            Ok(Some(if *negated { !is_null } else { is_null }))
        }
        // A function used as a predicate must return a boolean or NULL.
        Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
            Value::Bool(value) => Ok(Some(value)),
            Value::Null => Ok(None),
            _ => Err(Error::PlanError(format!(
                "unsupported WHERE expression: {:?}",
                expr
            ))),
        },
        _ => Err(Error::PlanError(format!(
            "unsupported WHERE expression: {:?}",
            expr
        ))),
    }
}
3502
3503fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
3504    let bool_value = match op {
3505        BinOp::Eq => {
3506            if left == &Value::Null || right == &Value::Null {
3507                false
3508            } else {
3509                compare_values(left, right) == Some(Ordering::Equal) || left == right
3510            }
3511        }
3512        BinOp::Neq => {
3513            if left == &Value::Null || right == &Value::Null {
3514                false
3515            } else {
3516                !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
3517            }
3518        }
3519        BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
3520        BinOp::Lte => matches!(
3521            compare_values(left, right),
3522            Some(Ordering::Less | Ordering::Equal)
3523        ),
3524        BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
3525        BinOp::Gte => matches!(
3526            compare_values(left, right),
3527            Some(Ordering::Greater | Ordering::Equal)
3528        ),
3529        BinOp::And => value_to_bool(left) && value_to_bool(right),
3530        BinOp::Or => value_to_bool(left) || value_to_bool(right),
3531    };
3532    Ok(Value::Bool(bool_value))
3533}
3534
3535fn value_to_bool(value: &Value) -> bool {
3536    matches!(value, Value::Bool(true))
3537}
3538
3539fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
3540    match (left, right) {
3541        (Value::Null, Value::Null) => Ordering::Equal,
3542        (Value::Null, _) => match direction {
3543            SortDirection::Asc => Ordering::Greater,
3544            SortDirection::Desc => Ordering::Less,
3545            SortDirection::CosineDistance => Ordering::Equal,
3546        },
3547        (_, Value::Null) => match direction {
3548            SortDirection::Asc => Ordering::Less,
3549            SortDirection::Desc => Ordering::Greater,
3550            SortDirection::CosineDistance => Ordering::Equal,
3551        },
3552        _ => {
3553            let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
3554            match direction {
3555                SortDirection::Asc => ordering,
3556                SortDirection::Desc => ordering.reverse(),
3557                SortDirection::CosineDistance => ordering,
3558            }
3559        }
3560    }
3561}
3562
3563fn eval_assignment_expr(
3564    expr: &Expr,
3565    row_values: &HashMap<String, Value>,
3566    params: &HashMap<String, Value>,
3567) -> Result<Value> {
3568    match expr {
3569        Expr::Literal(lit) => literal_to_value(lit),
3570        Expr::Parameter(name) => params
3571            .get(name)
3572            .cloned()
3573            .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
3574        Expr::Column(col_ref) => row_values
3575            .get(&col_ref.column)
3576            .cloned()
3577            .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
3578        Expr::BinaryOp { left, op, right } => {
3579            let left = eval_assignment_expr(left, row_values, params)?;
3580            let right = eval_assignment_expr(right, row_values, params)?;
3581            eval_binary_op(op, &left, &right)
3582        }
3583        Expr::UnaryOp { op, operand } => match op {
3584            UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
3585                Value::Int64(value) => Ok(Value::Int64(-value)),
3586                Value::Float64(value) => Ok(Value::Float64(-value)),
3587                _ => Err(Error::Other(format!(
3588                    "unsupported expression in UPDATE SET: {:?}",
3589                    expr
3590                ))),
3591            },
3592            UnaryOp::Not => Err(Error::Other(format!(
3593                "unsupported expression in UPDATE SET: {:?}",
3594                expr
3595            ))),
3596        },
3597        Expr::FunctionCall { name, args } => {
3598            let evaluated = args
3599                .iter()
3600                .map(|arg| eval_assignment_expr(arg, row_values, params))
3601                .collect::<Result<Vec<_>>>()?;
3602            eval_function(name, &evaluated)
3603        }
3604        _ => Err(Error::Other(format!(
3605            "unsupported expression in UPDATE SET: {:?}",
3606            expr
3607        ))),
3608    }
3609}
3610
3611fn apply_on_conflict_updates(
3612    db: &Database,
3613    table: &str,
3614    mut insert_values: HashMap<String, Value>,
3615    existing_row: &VersionedRow,
3616    on_conflict: &OnConflictPlan,
3617    params: &HashMap<String, Value>,
3618    active_tx: Option<TxId>,
3619) -> Result<HashMap<String, Value>> {
3620    if on_conflict.update_columns.is_empty() {
3621        return Ok(insert_values);
3622    }
3623
3624    // Reject column-level IMMUTABLE updates at the ON CONFLICT DO UPDATE merge
3625    // point. First flagged column in update-list order wins. Rejection returns
3626    // Err here; the caller (exec_insert) is responsible for releasing any
3627    // allocator bytes and restoring the write-set checkpoint.
3628    if let Some(meta) = db.table_meta(table) {
3629        for (column, _) in &on_conflict.update_columns {
3630            if let Some(col_def) = meta.columns.iter().find(|c| c.name == *column)
3631                && col_def.immutable
3632            {
3633                return Err(Error::ImmutableColumn {
3634                    table: table.to_string(),
3635                    column: column.clone(),
3636                });
3637            }
3638        }
3639    }
3640
3641    let current_tx_max = Some(db.committed_watermark());
3642
3643    let mut merged = existing_row.values.clone();
3644    for (column, expr) in &on_conflict.update_columns {
3645        let value = eval_assignment_expr(expr, &existing_row.values, params)?;
3646        merged.insert(
3647            column.clone(),
3648            coerce_value_for_column(db, table, column, value, current_tx_max, active_tx)?,
3649        );
3650    }
3651
3652    for (column, value) in insert_values.drain() {
3653        merged.entry(column).or_insert(value);
3654    }
3655
3656    Ok(merged)
3657}
3658
3659fn literal_to_value(lit: &Literal) -> Result<Value> {
3660    Ok(match lit {
3661        Literal::Null => Value::Null,
3662        Literal::Bool(v) => Value::Bool(*v),
3663        Literal::Integer(v) => Value::Int64(*v),
3664        Literal::Real(v) => Value::Float64(*v),
3665        Literal::Text(v) => Value::Text(v.clone()),
3666        Literal::Vector(v) => Value::Vector(v.clone()),
3667    })
3668}
3669
3670fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
3671    let [left, right] = args else {
3672        return Err(Error::PlanError(format!(
3673            "function {} expects 2 arguments",
3674            name
3675        )));
3676    };
3677
3678    match (left, right) {
3679        (Value::Int64(left), Value::Int64(right)) => match name {
3680            "__add" => Ok(Value::Int64(left + right)),
3681            "__sub" => Ok(Value::Int64(left - right)),
3682            "__mul" => Ok(Value::Int64(left * right)),
3683            "__div" => Ok(Value::Int64(left / right)),
3684            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3685        },
3686        (Value::Float64(left), Value::Float64(right)) => match name {
3687            "__add" => Ok(Value::Float64(left + right)),
3688            "__sub" => Ok(Value::Float64(left - right)),
3689            "__mul" => Ok(Value::Float64(left * right)),
3690            "__div" => Ok(Value::Float64(left / right)),
3691            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3692        },
3693        (Value::Int64(left), Value::Float64(right)) => match name {
3694            "__add" => Ok(Value::Float64(*left as f64 + right)),
3695            "__sub" => Ok(Value::Float64(*left as f64 - right)),
3696            "__mul" => Ok(Value::Float64(*left as f64 * right)),
3697            "__div" => Ok(Value::Float64(*left as f64 / right)),
3698            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3699        },
3700        (Value::Float64(left), Value::Int64(right)) => match name {
3701            "__add" => Ok(Value::Float64(left + *right as f64)),
3702            "__sub" => Ok(Value::Float64(left - *right as f64)),
3703            "__mul" => Ok(Value::Float64(left * *right as f64)),
3704            "__div" => Ok(Value::Float64(left / *right as f64)),
3705            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3706        },
3707        _ => Err(Error::PlanError(format!(
3708            "function {} expects numeric arguments",
3709            name
3710        ))),
3711    }
3712}
3713
3714fn eval_function_in_row_context(
3715    row: &VersionedRow,
3716    name: &str,
3717    args: &[Expr],
3718    params: &HashMap<String, Value>,
3719) -> Result<Value> {
3720    let values = args
3721        .iter()
3722        .map(|arg| eval_expr_value(row, arg, params))
3723        .collect::<Result<Vec<_>>>()?;
3724    eval_function(name, &values)
3725}
3726
3727fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
3728    match name.to_ascii_lowercase().as_str() {
3729        "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
3730        "coalesce" => Ok(args
3731            .iter()
3732            .find(|value| **value != Value::Null)
3733            .cloned()
3734            .unwrap_or(Value::Null)),
3735        "now" => Ok(Value::Timestamp(
3736            SystemTime::now()
3737                .duration_since(UNIX_EPOCH)
3738                .map_err(|err| Error::PlanError(err.to_string()))?
3739                .as_secs() as i64,
3740        )),
3741        _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3742    }
3743}
3744
/// Matches `value` against a SQL `LIKE` pattern.
///
/// `%` matches any run of characters (including none) and `_` matches exactly
/// one character. Every other pattern character must match literally. Matching
/// is case-sensitive, has no `ESCAPE` support, and works per Unicode scalar
/// value (`char`), not per byte.
///
/// Uses the iterative backtracking wildcard algorithm. It remembers the most
/// recent `%` and, on a mismatch, retries with that `%` absorbing one more
/// character. Worst case O(|value| * |pattern|), no recursion.
fn like_matches(value: &str, pattern: &str) -> bool {
    let value_chars = value.chars().collect::<Vec<_>>();
    let pattern_chars = pattern.chars().collect::<Vec<_>>();
    let (mut vi, mut pi) = (0usize, 0usize);
    // Pattern index of the last `%` seen, and the value index up to which that
    // `%` is currently assumed to have consumed input.
    let (mut star_idx, mut match_idx) = (None, 0usize);

    while vi < value_chars.len() {
        match pattern_chars.get(pi) {
            // `%` must be recognised before the literal comparison. Otherwise a
            // literal `%` in the value is consumed as an ordinary character and
            // the wildcard is lost (e.g. `'%ab' LIKE '%b'` evaluated to false).
            Some(&'%') => {
                star_idx = Some(pi);
                match_idx = vi;
                pi += 1;
            }
            Some(&c) if c == '_' || c == value_chars[vi] => {
                vi += 1;
                pi += 1;
            }
            _ => {
                let Some(star) = star_idx else {
                    return false;
                };
                // Backtrack: let the last `%` absorb one more value character.
                pi = star + 1;
                match_idx += 1;
                vi = match_idx;
            }
        }
    }

    // Trailing `%`s can match the empty remainder.
    while pi < pattern_chars.len() && pattern_chars[pi] == '%' {
        pi += 1;
    }

    pi == pattern_chars.len()
}
3776
3777fn resolve_in_subqueries(
3778    db: &Database,
3779    expr: &Expr,
3780    params: &HashMap<String, Value>,
3781    tx: Option<TxId>,
3782) -> Result<Expr> {
3783    resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
3784}
3785
/// Rewrites every `IN (subquery)` inside `expr` into an `IN (literal, ...)`
/// list by planning and executing each subquery eagerly, once, at rewrite time.
///
/// `ctes` are prepended to each subquery's plan, and their names count as
/// in-scope table references. The subquery runs with the caller's `params`
/// and `tx`.
///
/// The rewrite recurses through `BinaryOp`, `UnaryOp`, `InList`, `Like`,
/// `IsNull` and `FunctionCall`. Every other expression kind is returned as a
/// clone, unchanged.
///
/// # Errors
/// * `Error::Other("correlated subqueries are not supported")` if the
///   subquery's WHERE references a table qualifier that is neither one of its
///   FROM tables nor a CTE name.
/// * `Error::PlanError` if the subquery selects no columns, or if a result
///   value cannot be turned into a literal (see `value_to_literal`).
/// * Any planning or execution error from the subquery.
pub(crate) fn resolve_in_subqueries_with_ctes(
    db: &Database,
    expr: &Expr,
    params: &HashMap<String, Value>,
    tx: Option<TxId>,
    ctes: &[Cte],
) -> Result<Expr> {
    match expr {
        Expr::InSubquery {
            expr,
            subquery,
            negated,
        } => {
            // Detect correlated subqueries: WHERE references to outer tables
            // NOTE(review): only the `name` of each `FromItem::Table` is
            // collected. If FROM items can carry an alias, an alias-qualified
            // column (e.g. `x.col` for `FROM t x`) would be misreported as
            // correlated. Confirm against the FromItem definition.
            let mut subquery_tables: std::collections::HashSet<String> = subquery
                .from
                .iter()
                .filter_map(|item| match item {
                    contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
                    _ => None,
                })
                .collect();
            // CTE names are valid table references within the subquery
            for cte in ctes {
                match cte {
                    Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
                        subquery_tables.insert(name.clone());
                    }
                }
            }
            if let Some(where_clause) = &subquery.where_clause
                && has_outer_table_ref(where_clause, &subquery_tables)
            {
                return Err(Error::Other(
                    "correlated subqueries are not supported".to_string(),
                ));
            }

            // Plan and run the (uncorrelated) subquery once, with the enclosing CTEs.
            let query_plan = plan(&Statement::Select(SelectStatement {
                ctes: ctes.to_vec(),
                body: (**subquery).clone(),
            }))?;
            let result = execute_plan(db, &query_plan, params, tx)?;
            // Only the first SELECT column is used; any further columns are
            // ignored rather than rejected.
            let select_expr = subquery
                .columns
                .first()
                .map(|column| column.expr.clone())
                .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
            // Re-evaluate that column expression against each result row and
            // turn each value into a literal for the IN list.
            let list = result
                .rows
                .iter()
                .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .map(value_to_literal)
                .collect::<Result<Vec<_>>>()?;
            Ok(Expr::InList {
                // The left-hand side may itself contain subqueries.
                expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
                list,
                negated: *negated,
            })
        }
        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
            left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
            op: *op,
            right: Box::new(resolve_in_subqueries_with_ctes(
                db, right, params, tx, ctes,
            )?),
        }),
        Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
            op: *op,
            operand: Box::new(resolve_in_subqueries_with_ctes(
                db, operand, params, tx, ctes,
            )?),
        }),
        Expr::InList {
            expr,
            list,
            negated,
        } => Ok(Expr::InList {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            list: list
                .iter()
                .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
                .collect::<Result<Vec<_>>>()?,
            negated: *negated,
        }),
        Expr::Like {
            expr,
            pattern,
            negated,
        } => Ok(Expr::Like {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            pattern: Box::new(resolve_in_subqueries_with_ctes(
                db, pattern, params, tx, ctes,
            )?),
            negated: *negated,
        }),
        Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            negated: *negated,
        }),
        Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
            name: name.clone(),
            args: args
                .iter()
                .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
                .collect::<Result<Vec<_>>>()?,
        }),
        // Leaves (columns, literals, parameters, ...) contain no subqueries.
        _ => Ok(expr.clone()),
    }
}
3898
3899fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
3900    match expr {
3901        Expr::Column(ColumnRef {
3902            table: Some(table), ..
3903        }) => !subquery_tables.contains(table),
3904        Expr::BinaryOp { left, right, .. } => {
3905            has_outer_table_ref(left, subquery_tables)
3906                || has_outer_table_ref(right, subquery_tables)
3907        }
3908        Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
3909        Expr::InList { expr, list, .. } => {
3910            has_outer_table_ref(expr, subquery_tables)
3911                || list
3912                    .iter()
3913                    .any(|item| has_outer_table_ref(item, subquery_tables))
3914        }
3915        Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
3916        Expr::Like { expr, pattern, .. } => {
3917            has_outer_table_ref(expr, subquery_tables)
3918                || has_outer_table_ref(pattern, subquery_tables)
3919        }
3920        Expr::FunctionCall { args, .. } => args
3921            .iter()
3922            .any(|arg| has_outer_table_ref(arg, subquery_tables)),
3923        _ => false,
3924    }
3925}
3926
3927fn value_to_literal(value: Value) -> Result<Expr> {
3928    Ok(Expr::Literal(match value {
3929        Value::Null => Literal::Null,
3930        Value::Bool(v) => Literal::Bool(v),
3931        Value::Int64(v) => Literal::Integer(v),
3932        Value::Float64(v) => Literal::Real(v),
3933        Value::Text(v) => Literal::Text(v),
3934        Value::Uuid(v) => Literal::Text(v.to_string()),
3935        Value::Timestamp(v) => Literal::Integer(v),
3936        other => {
3937            return Err(Error::PlanError(format!(
3938                "unsupported subquery result value: {:?}",
3939                other
3940            )));
3941        }
3942    }))
3943}
3944
3945fn query_result_row_matches(
3946    row: &[Value],
3947    columns: &[String],
3948    expr: &Expr,
3949    params: &HashMap<String, Value>,
3950) -> Result<bool> {
3951    Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
3952}
3953
3954fn eval_query_result_bool_expr(
3955    row: &[Value],
3956    columns: &[String],
3957    expr: &Expr,
3958    params: &HashMap<String, Value>,
3959) -> Result<Option<bool>> {
3960    match expr {
3961        Expr::BinaryOp { left, op, right } => match op {
3962            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3963                let left = eval_query_result_expr(left, row, columns, params)?;
3964                let right = eval_query_result_expr(right, row, columns, params)?;
3965                if left == Value::Null || right == Value::Null {
3966                    return Ok(None);
3967                }
3968
3969                let result = match op {
3970                    BinOp::Eq => {
3971                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3972                    }
3973                    BinOp::Neq => {
3974                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3975                    }
3976                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3977                    BinOp::Lte => matches!(
3978                        compare_values(&left, &right),
3979                        Some(Ordering::Less | Ordering::Equal)
3980                    ),
3981                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3982                    BinOp::Gte => matches!(
3983                        compare_values(&left, &right),
3984                        Some(Ordering::Greater | Ordering::Equal)
3985                    ),
3986                    BinOp::And | BinOp::Or => unreachable!(),
3987                };
3988                Ok(Some(result))
3989            }
3990            BinOp::And => {
3991                let left = eval_query_result_bool_expr(row, columns, left, params)?;
3992                if left == Some(false) {
3993                    return Ok(Some(false));
3994                }
3995                let right = eval_query_result_bool_expr(row, columns, right, params)?;
3996                Ok(match (left, right) {
3997                    (Some(true), Some(true)) => Some(true),
3998                    (Some(true), other) => other,
3999                    (None, Some(false)) => Some(false),
4000                    (None, Some(true)) | (None, None) => None,
4001                    (Some(false), _) => Some(false),
4002                })
4003            }
4004            BinOp::Or => {
4005                let left = eval_query_result_bool_expr(row, columns, left, params)?;
4006                if left == Some(true) {
4007                    return Ok(Some(true));
4008                }
4009                let right = eval_query_result_bool_expr(row, columns, right, params)?;
4010                Ok(match (left, right) {
4011                    (Some(false), Some(false)) => Some(false),
4012                    (Some(false), other) => other,
4013                    (None, Some(true)) => Some(true),
4014                    (None, Some(false)) | (None, None) => None,
4015                    (Some(true), _) => Some(true),
4016                })
4017            }
4018        },
4019        Expr::UnaryOp {
4020            op: UnaryOp::Not,
4021            operand,
4022        } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
4023        Expr::InList {
4024            expr,
4025            list,
4026            negated,
4027        } => {
4028            let needle = eval_query_result_expr(expr, row, columns, params)?;
4029            if needle == Value::Null {
4030                return Ok(None);
4031            }
4032
4033            let matched = list.iter().try_fold(false, |found, item| {
4034                if found {
4035                    Ok(true)
4036                } else {
4037                    let candidate = eval_query_result_expr(item, row, columns, params)?;
4038                    Ok(
4039                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
4040                            || (candidate != Value::Null && needle == candidate),
4041                    )
4042                }
4043            })?;
4044            Ok(Some(if *negated { !matched } else { matched }))
4045        }
4046        Expr::InSubquery { .. } => Err(Error::PlanError(
4047            "IN (subquery) must be resolved before execution".to_string(),
4048        )),
4049        Expr::Like {
4050            expr,
4051            pattern,
4052            negated,
4053        } => {
4054            let left = eval_query_result_expr(expr, row, columns, params)?;
4055            let right = eval_query_result_expr(pattern, row, columns, params)?;
4056            let matched = match (left, right) {
4057                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
4058                _ => false,
4059            };
4060            Ok(Some(if *negated { !matched } else { matched }))
4061        }
4062        Expr::IsNull { expr, negated } => {
4063            let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
4064            Ok(Some(if *negated { !is_null } else { is_null }))
4065        }
4066        Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
4067            Value::Bool(value) => Ok(Some(value)),
4068            Value::Null => Ok(None),
4069            _ => Err(Error::PlanError(format!(
4070                "unsupported WHERE expression: {:?}",
4071                expr
4072            ))),
4073        },
4074        _ => Err(Error::PlanError(format!(
4075            "unsupported WHERE expression: {:?}",
4076            expr
4077        ))),
4078    }
4079}
4080
4081fn lookup_query_result_column(
4082    row: &[Value],
4083    input_columns: &[String],
4084    column_ref: &ColumnRef,
4085) -> Result<Value> {
4086    if let Some(table) = &column_ref.table {
4087        let qualified = format!("{table}.{}", column_ref.column);
4088        // Prioritize qualified match (e.g., "e.id") over unqualified (e.g., "id")
4089        // to avoid picking the wrong table's column in JOINs.
4090        let idx = input_columns
4091            .iter()
4092            .position(|name| name == &qualified)
4093            .or_else(|| {
4094                input_columns
4095                    .iter()
4096                    .position(|name| name == &column_ref.column)
4097            })
4098            .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
4099        return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
4100    }
4101
4102    let matches = input_columns
4103        .iter()
4104        .enumerate()
4105        .filter_map(|(idx, name)| {
4106            (name == &column_ref.column
4107                || name.rsplit('.').next() == Some(column_ref.column.as_str()))
4108            .then_some(idx)
4109        })
4110        .collect::<Vec<_>>();
4111
4112    match matches.as_slice() {
4113        [] => Err(Error::PlanError(format!(
4114            "project column not found: {}",
4115            column_ref.column
4116        ))),
4117        [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
4118        _ => Err(Error::PlanError(format!(
4119            "ambiguous column reference: {}",
4120            column_ref.column
4121        ))),
4122    }
4123}
4124
4125fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
4126    let mut combined = Vec::with_capacity(left.len() + right.len());
4127    combined.extend_from_slice(left);
4128    combined.extend_from_slice(right);
4129    combined
4130}
4131
/// Returns the bare (unqualified) column names that occur on both sides of a
/// join.
///
/// A qualified name such as `t.id` is reduced to the text after its last `.`
/// before comparing. The result is sorted and de-duplicated.
fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
    fn bare(column: &str) -> &str {
        column.rsplit('.').next().unwrap_or(column)
    }

    let on_left: BTreeSet<&str> = left.iter().map(|column| bare(column)).collect();
    let mut shared = BTreeSet::new();
    for column in right {
        let name = bare(column);
        if on_left.contains(name) {
            shared.insert(name.to_string());
        }
    }
    shared
}
4145
/// Produces the output column names of a join, one per entry of `columns`.
///
/// The first `left_columns.len()` entries belong to the left input. With a
/// `left_alias` they are rewritten to `alias.<bare name>`; without one, the
/// left input's own names are used as-is. The remaining entries belong to the
/// right input. An entry equal to its right-side bare name gets
/// `right_prefix.` prepended; anything else (already qualified or renamed) is
/// kept verbatim.
///
/// Panics if `columns` has more entries than `left_columns` and
/// `right_columns` combined.
fn qualify_join_columns(
    columns: &[String],
    left_columns: &[String],
    right_columns: &[String],
    left_alias: &Option<String>,
    right_prefix: &str,
) -> Vec<String> {
    let split_at = left_columns.len();
    let mut qualified = Vec::with_capacity(columns.len());

    for (idx, column) in columns.iter().enumerate() {
        let name = if idx < split_at {
            match left_alias.as_deref() {
                Some(prefix) => {
                    let bare = left_columns[idx].rsplit('.').next().unwrap_or(column);
                    format!("{prefix}.{bare}")
                }
                None => left_columns[idx].clone(),
            }
        } else {
            let source = &right_columns[idx - split_at];
            let bare = source.rsplit('.').next().unwrap_or(source.as_str());
            if column == bare {
                format!("{right_prefix}.{bare}")
            } else {
                column.clone()
            }
        };
        qualified.push(name);
    }

    qualified
}
4182
4183fn right_table_name(plan: &PhysicalPlan) -> String {
4184    match plan {
4185        PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
4186        _ => "right".to_string(),
4187    }
4188}
4189
4190fn distinct_row_key(row: &[Value]) -> Vec<u8> {
4191    bincode::serde::encode_to_vec(row, bincode::config::standard())
4192        .expect("query rows should serialize for DISTINCT")
4193}
4194
4195fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
4196    match resolve_expr(expr, params)? {
4197        Value::Uuid(u) => Ok(u),
4198        Value::Text(t) => uuid::Uuid::parse_str(&t)
4199            .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
4200        _ => Err(Error::PlanError(
4201            "graph start node must be UUID".to_string(),
4202        )),
4203    }
4204}
4205
4206/// Resolve start nodes for a graph BFS from a WHERE filter like `a.name = 'entity-0'`.
4207/// Scans all relational tables for rows matching the filter condition.
4208fn resolve_graph_start_nodes_from_filter(
4209    db: &Database,
4210    filter: &Expr,
4211    params: &HashMap<String, Value>,
4212) -> Result<Vec<uuid::Uuid>> {
4213    if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
4214        return Ok(ids);
4215    }
4216
4217    // Extract column name and expected value from the filter (e.g., a.name = 'entity-0')
4218    let (col_name, expected_value) = match filter {
4219        Expr::BinaryOp {
4220            left,
4221            op: BinOp::Eq,
4222            right,
4223        } => {
4224            if let Some(col) = extract_column_name(left) {
4225                (col, resolve_expr(right, params)?)
4226            } else if let Some(col) = extract_column_name(right) {
4227                (col, resolve_expr(left, params)?)
4228            } else {
4229                return Ok(vec![]);
4230            }
4231        }
4232        _ => return Ok(vec![]),
4233    };
4234
4235    let snapshot = db.snapshot();
4236    let mut uuids = Vec::new();
4237    for table_name in db.table_names() {
4238        let meta = match db.table_meta(&table_name) {
4239            Some(m) => m,
4240            None => continue,
4241        };
4242        // Only scan tables that have the referenced column and an id column
4243        let has_col = meta.columns.iter().any(|c| c.name == col_name);
4244        let has_id = meta.columns.iter().any(|c| c.name == "id");
4245        if !has_col || !has_id {
4246            continue;
4247        }
4248        let rows = db.scan_filter(&table_name, snapshot, &|row| {
4249            row.values.get(&col_name) == Some(&expected_value)
4250        })?;
4251        for row in rows {
4252            if let Some(Value::Uuid(id)) = row.values.get("id") {
4253                uuids.push(*id);
4254            }
4255        }
4256    }
4257    Ok(uuids)
4258}
4259
4260fn resolve_graph_start_nodes_from_plan(
4261    db: &Database,
4262    plan: &PhysicalPlan,
4263    params: &HashMap<String, Value>,
4264    tx: Option<TxId>,
4265) -> Result<Vec<uuid::Uuid>> {
4266    let result = execute_plan(db, plan, params, tx)?;
4267    result
4268        .rows
4269        .into_iter()
4270        .filter_map(|row| row.into_iter().next())
4271        .map(|value| match value {
4272            Value::Uuid(id) => Ok(id),
4273            Value::Text(text) => uuid::Uuid::parse_str(&text)
4274                .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
4275            other => Err(Error::PlanError(format!(
4276                "invalid graph start identifier from plan: {other:?}"
4277            ))),
4278        })
4279        .collect()
4280}
4281
4282fn resolve_graph_start_ids_from_filter(
4283    filter: &Expr,
4284    params: &HashMap<String, Value>,
4285) -> Result<Option<Vec<uuid::Uuid>>> {
4286    match filter {
4287        Expr::BinaryOp {
4288            left,
4289            op: BinOp::Eq,
4290            right,
4291        } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
4292            let value = if is_graph_id_ref(left) {
4293                resolve_expr(right, params)?
4294            } else {
4295                resolve_expr(left, params)?
4296            };
4297            let id = match value {
4298                Value::Uuid(id) => id,
4299                Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4300                    Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4301                })?,
4302                other => {
4303                    return Err(Error::PlanError(format!(
4304                        "invalid graph start identifier in filter: {other:?}"
4305                    )));
4306                }
4307            };
4308            Ok(Some(vec![id]))
4309        }
4310        Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
4311            let ids = list
4312                .iter()
4313                .map(|item| resolve_expr(item, params))
4314                .map(|value| match value? {
4315                    Value::Uuid(id) => Ok(id),
4316                    Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4317                        Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4318                    }),
4319                    other => Err(Error::PlanError(format!(
4320                        "invalid graph start identifier in filter: {other:?}"
4321                    ))),
4322                })
4323                .collect::<Result<Vec<_>>>()?;
4324            Ok(Some(ids))
4325        }
4326        Expr::BinaryOp { left, right, .. } => {
4327            if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
4328                return Ok(Some(ids));
4329            }
4330            resolve_graph_start_ids_from_filter(right, params)
4331        }
4332        Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
4333        _ => Ok(None),
4334    }
4335}
4336
4337fn is_graph_id_ref(expr: &Expr) -> bool {
4338    matches!(
4339        expr,
4340        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
4341    )
4342}
4343
4344/// Extract a bare column name from an Expr::Column, ignoring table alias.
4345fn extract_column_name(expr: &Expr) -> Option<String> {
4346    match expr {
4347        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
4348        _ => None,
4349    }
4350}
4351
4352fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
4353    match resolve_expr(expr, params)? {
4354        Value::Vector(v) => Ok(v),
4355        Value::Text(text) if text.trim_start().starts_with('[') => parse_text_vector_literal(&text),
4356        Value::Text(name) => match params.get(&name) {
4357            Some(Value::Vector(v)) => Ok(v.clone()),
4358            _ => Err(Error::PlanError("vector parameter missing".to_string())),
4359        },
4360        _ => Err(Error::PlanError(
4361            "invalid vector query expression".to_string(),
4362        )),
4363    }
4364}
4365
4366fn validate_vector_columns(
4367    db: &Database,
4368    table: &str,
4369    values: &HashMap<String, Value>,
4370) -> Result<()> {
4371    let Some(meta) = db.table_meta(table) else {
4372        return Ok(());
4373    };
4374
4375    for column in &meta.columns {
4376        if let contextdb_core::ColumnType::Vector(expected) = column.column_type
4377            && let Some(Value::Vector(vector)) = values.get(&column.name)
4378        {
4379            let got = vector.len();
4380            if got != expected {
4381                return Err(vector_dimension_error(table, &column.name, expected, got));
4382            }
4383        }
4384    }
4385
4386    Ok(())
4387}
4388
4389fn vector_columns_for_meta(meta: &TableMeta) -> Vec<String> {
4390    meta.columns
4391        .iter()
4392        .filter(|column| matches!(column.column_type, contextdb_core::ColumnType::Vector(_)))
4393        .map(|column| column.name.clone())
4394        .collect()
4395}
4396
4397fn vector_values_for_table(
4398    db: &Database,
4399    table: &str,
4400    values: &HashMap<String, Value>,
4401) -> Vec<(String, Vec<f32>)> {
4402    db.table_meta(table)
4403        .map(|meta| {
4404            meta.columns
4405                .iter()
4406                .filter_map(|column| match column.column_type {
4407                    contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
4408                        Some(Value::Vector(vector)) => Some((column.name.clone(), vector.clone())),
4409                        _ => None,
4410                    },
4411                    _ => None,
4412                })
4413                .collect()
4414        })
4415        .unwrap_or_default()
4416}
4417
4418fn vector_indexes_for_table(db: &Database, table: &str) -> Vec<contextdb_core::VectorIndexRef> {
4419    db.table_meta(table)
4420        .map(|meta| {
4421            meta.columns
4422                .iter()
4423                .filter(|column| {
4424                    matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
4425                })
4426                .map(|column| contextdb_core::VectorIndexRef::new(table, column.name.clone()))
4427                .collect()
4428        })
4429        .unwrap_or_default()
4430}
4431
4432pub(crate) fn coerce_into_column(
4433    db: &Database,
4434    table: &str,
4435    col: &str,
4436    v: Value,
4437    current_tx_max: Option<TxId>,
4438    active_tx: Option<TxId>,
4439) -> Result<Value> {
4440    coerce_value_for_column(db, table, col, v, current_tx_max, active_tx)
4441}
4442
4443fn coerce_value_for_column(
4444    db: &Database,
4445    table: &str,
4446    col_name: &str,
4447    v: Value,
4448    current_tx_max: Option<TxId>,
4449    active_tx: Option<TxId>,
4450) -> Result<Value> {
4451    let Some(meta) = db.table_meta(table) else {
4452        // Non-TxId variant: pass through with lenient id-name coercion.
4453        if let Value::TxId(_) = &v {
4454            return Err(Error::ColumnTypeMismatch {
4455                table: table.to_string(),
4456                column: col_name.to_string(),
4457                expected: "UNKNOWN",
4458                actual: "TxId",
4459            });
4460        }
4461        return Ok(coerce_uuid_if_needed(col_name, v));
4462    };
4463    coerce_value_for_column_with_meta(table, &meta, col_name, v, current_tx_max, active_tx)
4464}
4465
4466fn coerce_value_for_column_with_meta(
4467    table: &str,
4468    meta: &TableMeta,
4469    col_name: &str,
4470    v: Value,
4471    current_tx_max: Option<TxId>,
4472    active_tx: Option<TxId>,
4473) -> Result<Value> {
4474    let Some(col) = meta.columns.iter().find(|c| c.name == col_name) else {
4475        if let Value::TxId(_) = &v {
4476            return Err(Error::ColumnTypeMismatch {
4477                table: table.to_string(),
4478                column: col_name.to_string(),
4479                expected: "UNKNOWN",
4480                actual: "TxId",
4481            });
4482        }
4483        return Ok(coerce_uuid_if_needed(col_name, v));
4484    };
4485
4486    match col.column_type {
4487        contextdb_core::ColumnType::Uuid => match v {
4488            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4489                table: table.to_string(),
4490                column: col_name.to_string(),
4491                expected: "UUID",
4492                actual: "TxId",
4493            }),
4494            other => coerce_uuid_value(other),
4495        },
4496        contextdb_core::ColumnType::Timestamp => match v {
4497            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4498                table: table.to_string(),
4499                column: col_name.to_string(),
4500                expected: "TIMESTAMP",
4501                actual: "TxId",
4502            }),
4503            other => coerce_timestamp_value(other),
4504        },
4505        contextdb_core::ColumnType::Vector(dim) => match v {
4506            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4507                table: table.to_string(),
4508                column: col_name.to_string(),
4509                expected: format_vector_type(dim),
4510                actual: "TxId",
4511            }),
4512            other => coerce_vector_value(table, col_name, other, dim),
4513        },
4514        contextdb_core::ColumnType::Integer => match v {
4515            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4516                table: table.to_string(),
4517                column: col_name.to_string(),
4518                expected: "INTEGER",
4519                actual: "TxId",
4520            }),
4521            other => Ok(coerce_uuid_if_needed(col_name, other)),
4522        },
4523        contextdb_core::ColumnType::Real => match v {
4524            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4525                table: table.to_string(),
4526                column: col_name.to_string(),
4527                expected: "REAL",
4528                actual: "TxId",
4529            }),
4530            other => Ok(coerce_uuid_if_needed(col_name, other)),
4531        },
4532        contextdb_core::ColumnType::Text => match v {
4533            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4534                table: table.to_string(),
4535                column: col_name.to_string(),
4536                expected: "TEXT",
4537                actual: "TxId",
4538            }),
4539            other => Ok(coerce_uuid_if_needed(col_name, other)),
4540        },
4541        contextdb_core::ColumnType::Boolean => match v {
4542            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4543                table: table.to_string(),
4544                column: col_name.to_string(),
4545                expected: "BOOLEAN",
4546                actual: "TxId",
4547            }),
4548            other => Ok(coerce_uuid_if_needed(col_name, other)),
4549        },
4550        contextdb_core::ColumnType::Json => match v {
4551            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4552                table: table.to_string(),
4553                column: col_name.to_string(),
4554                expected: "JSON",
4555                actual: "TxId",
4556            }),
4557            other => Ok(coerce_uuid_if_needed(col_name, other)),
4558        },
4559        contextdb_core::ColumnType::TxId => {
4560            coerce_txid_value(table, col_name, v, col.nullable, current_tx_max, active_tx)
4561        }
4562    }
4563}
4564
/// Returns the SQL spelling of a `VECTOR(dim)` type for use in error values.
///
/// The error variants carry `&'static str`, so only a fixed table of common dimensions is
/// spelled exactly; every other dimension degrades to the bare `"VECTOR"`.
fn format_vector_type(dim: usize) -> &'static str {
    const KNOWN: [(usize, &str); 15] = [
        (1, "VECTOR(1)"),
        (2, "VECTOR(2)"),
        (3, "VECTOR(3)"),
        (4, "VECTOR(4)"),
        (8, "VECTOR(8)"),
        (16, "VECTOR(16)"),
        (32, "VECTOR(32)"),
        (64, "VECTOR(64)"),
        (128, "VECTOR(128)"),
        (256, "VECTOR(256)"),
        (512, "VECTOR(512)"),
        (768, "VECTOR(768)"),
        (1024, "VECTOR(1024)"),
        (1536, "VECTOR(1536)"),
        (3072, "VECTOR(3072)"),
    ];
    KNOWN
        .iter()
        .find(|(known, _)| *known == dim)
        .map_or("VECTOR", |&(_, spelled)| spelled)
}
4586
4587fn coerce_txid_value(
4588    table: &str,
4589    col: &str,
4590    v: Value,
4591    nullable: bool,
4592    current_tx_max: Option<TxId>,
4593    active_tx: Option<TxId>,
4594) -> Result<Value> {
4595    match v {
4596        Value::Null => {
4597            if nullable {
4598                Ok(Value::Null)
4599            } else {
4600                Err(Error::ColumnNotNullable {
4601                    table: table.to_string(),
4602                    column: col.to_string(),
4603                })
4604            }
4605        }
4606        Value::TxId(tx) => {
4607            // Plan B7: `Value::TxId(n)` into a TXID column requires
4608            // `n <= max(committed_watermark, active_tx)`. The watermark is the
4609            // statement-scoped `current_tx_max` snapshot from
4610            // `TxManager::current_tx_max()`; `active_tx` is the in-flight
4611            // transaction that allocated the caller's TxId, which is permitted
4612            // as a self-reference. The error reports the watermark so callers
4613            // see what their edge has committed. Non-SQL callers pass `None`
4614            // for `current_tx_max` and skip the check.
4615            if let Some(max) = current_tx_max {
4616                let ceiling = max.0.max(active_tx.map(|t| t.0).unwrap_or(0));
4617                if tx.0 > ceiling {
4618                    return Err(Error::TxIdOutOfRange {
4619                        table: table.to_string(),
4620                        column: col.to_string(),
4621                        value: tx.0,
4622                        max: max.0,
4623                    });
4624                }
4625            }
4626            Ok(Value::TxId(tx))
4627        }
4628        other => Err(Error::ColumnTypeMismatch {
4629            table: table.to_string(),
4630            column: col.to_string(),
4631            expected: "TXID",
4632            actual: value_variant_name(&other),
4633        }),
4634    }
4635}
4636
4637fn value_variant_name(v: &Value) -> &'static str {
4638    match v {
4639        Value::Null => "Null",
4640        Value::Bool(_) => "Bool",
4641        Value::Int64(_) => "Int64",
4642        Value::Float64(_) => "Float64",
4643        Value::Text(_) => "Text",
4644        Value::Uuid(_) => "Uuid",
4645        Value::Timestamp(_) => "Timestamp",
4646        Value::Json(_) => "Json",
4647        Value::Vector(_) => "Vector",
4648        Value::TxId(_) => "TxId",
4649    }
4650}
4651
4652fn coerce_uuid_value(v: Value) -> Result<Value> {
4653    match v {
4654        Value::Null => Ok(Value::Null),
4655        Value::Uuid(id) => Ok(Value::Uuid(id)),
4656        Value::Text(text) => uuid::Uuid::parse_str(&text)
4657            .map(Value::Uuid)
4658            .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
4659        other => Err(Error::Other(format!(
4660            "UUID column requires UUID or text literal, got {other:?}"
4661        ))),
4662    }
4663}
4664
4665fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
4666    if (col == "id" || col.ends_with("_id"))
4667        && let Value::Text(s) = &v
4668        && let Ok(u) = uuid::Uuid::parse_str(s)
4669    {
4670        return Value::Uuid(u);
4671    }
4672    v
4673}
4674
4675fn coerce_timestamp_value(v: Value) -> Result<Value> {
4676    match v {
4677        Value::Null => Ok(Value::Null),
4678        Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
4679            Ok(Value::Timestamp(i64::MAX))
4680        }
4681        Value::Text(text) => {
4682            let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
4683                Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
4684            })?;
4685            Ok(Value::Timestamp(
4686                parsed.unix_timestamp_nanos() as i64 / 1_000_000,
4687            ))
4688        }
4689        other => Ok(other),
4690    }
4691}
4692
4693fn coerce_vector_value(table: &str, column: &str, v: Value, expected_dim: usize) -> Result<Value> {
4694    let vector = match v {
4695        Value::Null => return Ok(Value::Null),
4696        Value::Vector(vector) => vector,
4697        Value::Text(text) => parse_text_vector_literal(&text)?,
4698        other => return Ok(other),
4699    };
4700
4701    if vector.len() != expected_dim {
4702        return Err(vector_dimension_error(
4703            table,
4704            column,
4705            expected_dim,
4706            vector.len(),
4707        ));
4708    }
4709
4710    Ok(Value::Vector(vector))
4711}
4712
4713fn vector_dimension_error(table: &str, column: &str, expected: usize, got: usize) -> Error {
4714    Error::VectorIndexDimensionMismatch {
4715        index: contextdb_core::VectorIndexRef::new(table, column),
4716        expected,
4717        actual: got,
4718    }
4719}
4720
4721fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
4722    let trimmed = text.trim();
4723    let inner = trimmed
4724        .strip_prefix('[')
4725        .and_then(|s| s.strip_suffix(']'))
4726        .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
4727
4728    if inner.trim().is_empty() {
4729        return Ok(Vec::new());
4730    }
4731
4732    inner
4733        .split(',')
4734        .map(|part| {
4735            part.trim().parse::<f32>().map_err(|err| {
4736                Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
4737            })
4738        })
4739        .collect()
4740}
4741
4742fn apply_missing_column_defaults(
4743    db: &Database,
4744    table: &str,
4745    values: &mut HashMap<String, Value>,
4746    active_tx: Option<TxId>,
4747) -> Result<()> {
4748    let Some(meta) = db.table_meta(table) else {
4749        return Ok(());
4750    };
4751
4752    let current_tx_max = Some(db.committed_watermark());
4753
4754    for column in &meta.columns {
4755        if values.contains_key(&column.name) {
4756            continue;
4757        }
4758        let Some(default) = &column.default else {
4759            continue;
4760        };
4761        let value = evaluate_stored_default_expr(default)?;
4762        values.insert(
4763            column.name.clone(),
4764            coerce_value_for_column(db, table, &column.name, value, current_tx_max, active_tx)?,
4765        );
4766    }
4767
4768    Ok(())
4769}
4770
4771fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
4772    if default.eq_ignore_ascii_case("NOW()") {
4773        return eval_function("now", &[]);
4774    }
4775    if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
4776        return eval_function("now", &[]);
4777    }
4778    if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
4779        return Ok(Value::Null);
4780    }
4781    if default.eq_ignore_ascii_case("TRUE") {
4782        return Ok(Value::Bool(true));
4783    }
4784    if default.eq_ignore_ascii_case("FALSE") {
4785        return Ok(Value::Bool(false));
4786    }
4787    if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
4788        return Ok(Value::Text(
4789            default[1..default.len() - 1].replace("''", "'"),
4790        ));
4791    }
4792    if let Some(text) = default
4793        .strip_prefix("Literal(Text(\"")
4794        .and_then(|value| value.strip_suffix("\"))"))
4795    {
4796        return Ok(Value::Text(text.to_string()));
4797    }
4798    if let Some(value) = default
4799        .strip_prefix("Literal(Integer(")
4800        .and_then(|value| value.strip_suffix("))"))
4801    {
4802        let parsed = value.parse::<i64>().map_err(|err| {
4803            Error::Other(format!("invalid stored integer default '{value}': {err}"))
4804        })?;
4805        return Ok(Value::Int64(parsed));
4806    }
4807    if let Some(value) = default
4808        .strip_prefix("Literal(Real(")
4809        .and_then(|value| value.strip_suffix("))"))
4810    {
4811        let parsed = value
4812            .parse::<f64>()
4813            .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
4814        return Ok(Value::Float64(parsed));
4815    }
4816    if let Some(value) = default
4817        .strip_prefix("Literal(Bool(")
4818        .and_then(|value| value.strip_suffix("))"))
4819    {
4820        let parsed = value
4821            .parse::<bool>()
4822            .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
4823        return Ok(Value::Bool(parsed));
4824    }
4825
4826    Err(Error::Other(format!(
4827        "unsupported stored DEFAULT expression: {default}"
4828    )))
4829}
4830
4831pub(crate) fn stored_default_expr(expr: &Expr) -> String {
4832    match expr {
4833        Expr::Literal(Literal::Null) => "NULL".to_string(),
4834        Expr::Literal(Literal::Bool(value)) => {
4835            if *value {
4836                "TRUE".to_string()
4837            } else {
4838                "FALSE".to_string()
4839            }
4840        }
4841        Expr::Literal(Literal::Integer(value)) => value.to_string(),
4842        Expr::Literal(Literal::Real(value)) => value.to_string(),
4843        Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
4844        Expr::FunctionCall { name, args }
4845            if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
4846        {
4847            "NOW()".to_string()
4848        }
4849        _ => format!("{expr:?}"),
4850    }
4851}
4852
4853fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
4854    if col.expires && !matches!(col.data_type, DataType::Timestamp) {
4855        return Err(Error::Other(
4856            "EXPIRES is only valid on TIMESTAMP columns".to_string(),
4857        ));
4858    }
4859    Ok(())
4860}
4861
4862fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
4863    let mut expires_column = None;
4864    for col in columns {
4865        validate_expires_column(col)?;
4866        if col.expires {
4867            if expires_column.is_some() {
4868                return Err(Error::Other(
4869                    "only one EXPIRES column is supported per table".to_string(),
4870                ));
4871            }
4872            expires_column = Some(col.name.clone());
4873        }
4874    }
4875    Ok(expires_column)
4876}
4877
4878pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
4879    match dtype {
4880        DataType::Uuid => contextdb_core::ColumnType::Uuid,
4881        DataType::Text => contextdb_core::ColumnType::Text,
4882        DataType::Integer => contextdb_core::ColumnType::Integer,
4883        DataType::Real => contextdb_core::ColumnType::Real,
4884        DataType::Boolean => contextdb_core::ColumnType::Boolean,
4885        DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
4886        DataType::Json => contextdb_core::ColumnType::Json,
4887        DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
4888        DataType::TxId => contextdb_core::ColumnType::TxId,
4889    }
4890}
4891
4892pub(crate) fn map_rank_policy(
4893    policy: &contextdb_parser::ast::RankPolicyAst,
4894) -> contextdb_core::RankPolicy {
4895    contextdb_core::RankPolicy {
4896        joined_table: policy.joined_table.clone(),
4897        joined_column: policy.joined_column.clone(),
4898        anchor_column: String::new(),
4899        sort_key: policy.sort_key.clone(),
4900        formula: policy.formula.clone(),
4901        protected_index: String::new(),
4902    }
4903}
4904
4905struct ResolvedRankPolicy {
4906    policy: contextdb_core::RankPolicy,
4907    formula: Arc<RankFormula>,
4908}
4909
4910fn core_column_from_ast(
4911    col: &contextdb_parser::ast::ColumnDef,
4912    rank_policy: Option<contextdb_core::RankPolicy>,
4913) -> contextdb_core::ColumnDef {
4914    contextdb_core::ColumnDef {
4915        name: col.name.clone(),
4916        column_type: map_column_type(&col.data_type),
4917        nullable: col.nullable,
4918        primary_key: col.primary_key,
4919        unique: col.unique,
4920        default: col.default.as_ref().map(stored_default_expr),
4921        references: col
4922            .references
4923            .as_ref()
4924            .map(|reference| contextdb_core::ForeignKeyReference {
4925                table: reference.table.clone(),
4926                column: reference.column.clone(),
4927            }),
4928        expires: col.expires,
4929        immutable: col.immutable,
4930        quantization: map_vector_quantization(col.quantization),
4931        rank_policy,
4932    }
4933}
4934
4935fn ast_column_from_core(col: contextdb_core::ColumnDef) -> contextdb_parser::ast::ColumnDef {
4936    contextdb_parser::ast::ColumnDef {
4937        name: col.name,
4938        data_type: match col.column_type {
4939            ColumnType::Uuid => DataType::Uuid,
4940            ColumnType::Text => DataType::Text,
4941            ColumnType::Integer => DataType::Integer,
4942            ColumnType::Real => DataType::Real,
4943            ColumnType::Boolean => DataType::Boolean,
4944            ColumnType::Timestamp => DataType::Timestamp,
4945            ColumnType::Json => DataType::Json,
4946            ColumnType::Vector(dim) => DataType::Vector(dim as u32),
4947            ColumnType::TxId => DataType::TxId,
4948        },
4949        nullable: col.nullable,
4950        primary_key: col.primary_key,
4951        unique: col.unique,
4952        default: None,
4953        references: None,
4954        expires: col.expires,
4955        immutable: col.immutable,
4956        quantization: match col.quantization {
4957            contextdb_core::VectorQuantization::F32 => {
4958                contextdb_parser::ast::VectorQuantization::F32
4959            }
4960            contextdb_core::VectorQuantization::SQ8 => {
4961                contextdb_parser::ast::VectorQuantization::SQ8
4962            }
4963            contextdb_core::VectorQuantization::SQ4 => {
4964                contextdb_parser::ast::VectorQuantization::SQ4
4965            }
4966        },
4967        rank_policy: None,
4968    }
4969}
4970
4971fn validate_rank_policy_for_column(
4972    db: &Database,
4973    table: &str,
4974    column: &contextdb_parser::ast::ColumnDef,
4975    all_columns: &[contextdb_parser::ast::ColumnDef],
4976) -> Result<Option<ResolvedRankPolicy>> {
4977    let Some(policy_ast) = column.rank_policy.as_deref() else {
4978        return Ok(None);
4979    };
4980    let index = rank_index_name(table, &column.name);
4981    if !matches!(column.data_type, DataType::Vector(_)) {
4982        return Err(Error::RankPolicyColumnType {
4983            index,
4984            column: column.name.clone(),
4985            expected: "VECTOR(N)".to_string(),
4986            actual: data_type_name(&column.data_type).to_string(),
4987        });
4988    }
4989    let joined_meta = db.table_meta(&policy_ast.joined_table).ok_or_else(|| {
4990        Error::RankPolicyJoinTableUnknown {
4991            index: index.clone(),
4992            table: policy_ast.joined_table.clone(),
4993        }
4994    })?;
4995    if !joined_meta
4996        .columns
4997        .iter()
4998        .any(|col| col.name == policy_ast.joined_column)
4999    {
5000        return Err(Error::RankPolicyJoinColumnUnknown {
5001            index: index.clone(),
5002            table: policy_ast.joined_table.clone(),
5003            column: policy_ast.joined_column.clone(),
5004        });
5005    }
5006    let protected_index = protected_rank_policy_index(&joined_meta, &policy_ast.joined_column)
5007        .ok_or_else(|| Error::RankPolicyJoinColumnUnindexed {
5008            index: index.clone(),
5009            joined_table: policy_ast.joined_table.clone(),
5010            column: policy_ast.joined_column.clone(),
5011        })?;
5012    let anchor_column =
5013        resolve_rank_policy_anchor_column(&index, policy_ast, all_columns, &joined_meta)?;
5014    let formula = Arc::new(RankFormula::compile_for_index(&index, &policy_ast.formula)?);
5015    validate_rank_formula_columns(
5016        &index,
5017        &column.name,
5018        all_columns,
5019        &joined_meta,
5020        formula.column_refs(),
5021    )?;
5022    Ok(Some(ResolvedRankPolicy {
5023        policy: contextdb_core::RankPolicy {
5024            joined_table: policy_ast.joined_table.clone(),
5025            joined_column: policy_ast.joined_column.clone(),
5026            anchor_column,
5027            sort_key: policy_ast.sort_key.clone(),
5028            formula: policy_ast.formula.clone(),
5029            protected_index,
5030        },
5031        formula,
5032    }))
5033}
5034
5035fn protected_rank_policy_index(meta: &TableMeta, joined_column: &str) -> Option<String> {
5036    meta.indexes
5037        .iter()
5038        .filter(|index| index.kind == contextdb_core::IndexKind::UserDeclared)
5039        .chain(meta.indexes.iter())
5040        .find(|index| {
5041            index
5042                .columns
5043                .first()
5044                .is_some_and(|(column, _)| column == joined_column)
5045        })
5046        .map(|index| index.name.clone())
5047}
5048
/// Chooses the column on the anchor (policy-owning) table that joins to
/// `policy.joined_table.policy.joined_column`.
///
/// Candidates are gathered in priority tiers. A later tier is consulted only when every
/// earlier tier produced nothing:
/// 1. if the joined column is a primary key: `<singular(joined_table)>_id` and
///    `<joined_table>_id`, when present on the anchor table (deduplicated);
/// 2. an anchor column with the same name as the joined column;
/// 3. the first anchor column marked `primary_key`;
/// 4. an anchor column named `id`.
///
/// # Errors
/// * `RankPolicyJoinColumnUnknown` if the joined column is missing from `joined_meta`.
/// * `RankPolicyColumnUnknown` if no candidate is found.
/// * `RankPolicyColumnAmbiguous` if more than one candidate is found (only tier 1 can yield
///   two), listing them comma-separated.
/// * `RankPolicyColumnType` if the chosen anchor column's type differs from the joined
///   column's type.
fn resolve_rank_policy_anchor_column(
    index: &str,
    policy: &contextdb_parser::ast::RankPolicyAst,
    anchor_columns: &[contextdb_parser::ast::ColumnDef],
    joined_meta: &TableMeta,
) -> Result<String> {
    let joined_column = joined_meta
        .columns
        .iter()
        .find(|col| col.name == policy.joined_column)
        .ok_or_else(|| Error::RankPolicyJoinColumnUnknown {
            index: index.to_string(),
            table: policy.joined_table.clone(),
            column: policy.joined_column.clone(),
        })?;

    let anchor_by_name = |name: &str| anchor_columns.iter().find(|col| col.name == name);
    let mut candidates = Vec::new();
    // Tier 1: conventional foreign-key names when joining on the joined table's primary key.
    // Both the singularised and the literal table name are tried; when they coincide the
    // `contains` check keeps a single entry.
    if joined_column.primary_key {
        let singular = singular_table_name(&policy.joined_table);
        for name in [
            format!("{singular}_id"),
            format!("{}_id", policy.joined_table),
        ] {
            if anchor_by_name(&name).is_some() && !candidates.contains(&name) {
                candidates.push(name);
            }
        }
    }
    // Tier 2: same column name on both sides.
    if candidates.is_empty() && anchor_by_name(&policy.joined_column).is_some() {
        candidates.push(policy.joined_column.clone());
    }
    // Tier 3: the anchor table's (first) primary-key column.
    if candidates.is_empty()
        && let Some(primary_key) = anchor_columns.iter().find(|col| col.primary_key)
    {
        candidates.push(primary_key.name.clone());
    }
    // Tier 4: a plain `id` column.
    if candidates.is_empty() && anchor_by_name("id").is_some() {
        candidates.push("id".to_string());
    }

    let anchor_column = match candidates.as_slice() {
        [single] => single.clone(),
        [] => {
            return Err(Error::RankPolicyColumnUnknown {
                index: index.to_string(),
                column: policy.joined_column.clone(),
            });
        }
        _ => {
            return Err(Error::RankPolicyColumnAmbiguous {
                index: index.to_string(),
                column: candidates.join(","),
            });
        }
    };
    // Re-look-up the chosen name to get its definition; every candidate came from
    // `anchor_columns`, so this should always succeed.
    let anchor_def =
        anchor_by_name(&anchor_column).ok_or_else(|| Error::RankPolicyColumnUnknown {
            index: index.to_string(),
            column: anchor_column.clone(),
        })?;
    // The join is only meaningful when both sides share the same storage type.
    let anchor_type = map_column_type(&anchor_def.data_type);
    if anchor_type != joined_column.column_type {
        return Err(Error::RankPolicyColumnType {
            index: index.to_string(),
            column: anchor_column,
            expected: column_type_name(&joined_column.column_type).to_string(),
            actual: data_type_name(&anchor_def.data_type).to_string(),
        });
    }
    Ok(anchor_column)
}
5121
/// Naive English singularisation of a table name.
///
/// `…ies` becomes `…y`; otherwise a single trailing `s` is dropped; any other name is
/// returned unchanged.
fn singular_table_name(table: &str) -> String {
    match (table.strip_suffix("ies"), table.strip_suffix('s')) {
        (Some(stem), _) => format!("{stem}y"),
        (None, Some(stem)) => stem.to_owned(),
        (None, None) => table.to_owned(),
    }
}
5131
/// Validates every column referenced by a rank formula (`refs`).
///
/// Rules, checked per reference in order:
/// * `vector_score` is a pseudo-column. It is skipped, unless the anchor or the joined table
///   has a real column of that name, in which case the reference is ambiguous.
/// * Any other name must exist on the anchor table or the joined table.
/// * `id` present on both tables is ambiguous. Other names present on both resolve to the
///   anchor table, because it is checked first.
/// * The resolved column must be INTEGER, REAL or BOOLEAN (`validate_rank_formula_type`).
///
/// Finally, a reference to the anchor's own vector column is rejected as a type error.
///
/// # Errors
/// `RankPolicyColumnAmbiguous`, `RankPolicyColumnUnknown` or `RankPolicyColumnType`.
fn validate_rank_formula_columns(
    index: &str,
    anchor_vector_column: &str,
    anchor_columns: &[contextdb_parser::ast::ColumnDef],
    joined_meta: &TableMeta,
    refs: &[String],
) -> Result<()> {
    // A real column named `vector_score` would shadow the pseudo-column.
    let vector_score_column_exists = anchor_columns.iter().any(|col| col.name == "vector_score")
        || joined_meta
            .columns
            .iter()
            .any(|col| col.name == "vector_score");
    for column in refs {
        if column == "vector_score" {
            if vector_score_column_exists {
                return Err(Error::RankPolicyColumnAmbiguous {
                    index: index.to_string(),
                    column: column.clone(),
                });
            }
            continue;
        }
        let anchor = anchor_columns.iter().find(|col| col.name == *column);
        let joined = joined_meta.columns.iter().find(|col| col.name == *column);
        if anchor.is_none() && joined.is_none() {
            return Err(Error::RankPolicyColumnUnknown {
                index: index.to_string(),
                column: column.clone(),
            });
        }
        // Only `id` is treated as ambiguous when present on both sides.
        if column == "id" && anchor.is_some() && joined.is_some() {
            return Err(Error::RankPolicyColumnAmbiguous {
                index: index.to_string(),
                column: column.clone(),
            });
        }
        // Anchor-table columns take precedence over same-named joined-table columns.
        if let Some(anchor) = anchor {
            validate_rank_formula_type(
                index,
                column,
                data_type_name(&anchor.data_type),
                &map_column_type(&anchor.data_type),
            )?;
        } else if let Some(joined) = joined {
            validate_rank_formula_type(
                index,
                column,
                column_type_name(&joined.column_type),
                &joined.column_type,
            )?;
        }
    }
    // NOTE(review): if `anchor_vector_column` is itself in `anchor_columns`, the loop above
    // already rejects it (VECTOR is not number-or-bool) with an equivalent error. This check
    // acts as a backstop when it is not — confirm whether callers always include it.
    if refs.iter().any(|column| column == anchor_vector_column) {
        return Err(Error::RankPolicyColumnType {
            index: index.to_string(),
            column: anchor_vector_column.to_string(),
            expected: "number-or-bool".to_string(),
            actual: "VECTOR".to_string(),
        });
    }
    Ok(())
}
5194
5195fn validate_rank_formula_type(
5196    index: &str,
5197    column: &str,
5198    actual_name: &str,
5199    column_type: &ColumnType,
5200) -> Result<()> {
5201    if matches!(
5202        column_type,
5203        ColumnType::Real | ColumnType::Integer | ColumnType::Boolean
5204    ) {
5205        return Ok(());
5206    }
5207    Err(Error::RankPolicyColumnType {
5208        index: index.to_string(),
5209        column: column.to_string(),
5210        expected: "number-or-bool".to_string(),
5211        actual: actual_name.to_string(),
5212    })
5213}
5214
5215fn data_type_name(data_type: &DataType) -> &'static str {
5216    match data_type {
5217        DataType::Uuid => "UUID",
5218        DataType::Text => "TEXT",
5219        DataType::Integer => "INTEGER",
5220        DataType::Real => "REAL",
5221        DataType::Boolean => "BOOLEAN",
5222        DataType::Timestamp => "TIMESTAMP",
5223        DataType::Json => "JSON",
5224        DataType::Vector(_) => "VECTOR",
5225        DataType::TxId => "TXID",
5226    }
5227}
5228
5229fn column_type_name(column_type: &ColumnType) -> &'static str {
5230    match column_type {
5231        ColumnType::Uuid => "UUID",
5232        ColumnType::Text => "TEXT",
5233        ColumnType::Integer => "INTEGER",
5234        ColumnType::Real => "REAL",
5235        ColumnType::Boolean => "BOOLEAN",
5236        ColumnType::Timestamp => "TIMESTAMP",
5237        ColumnType::Json => "JSON",
5238        ColumnType::Vector(_) => "VECTOR",
5239        ColumnType::TxId => "TXID",
5240    }
5241}
5242
5243pub(crate) fn rank_policy_drop_table_blocker(db: &Database, table: &str) -> Option<Error> {
5244    for (policy_table, policy_column, policy) in all_rank_policies(db) {
5245        if policy.joined_table == table && policy_table != table {
5246            return Some(Error::DropBlockedByRankPolicy {
5247                table: table.into(),
5248                column: None,
5249                dropped_index: None,
5250                policy_table: policy_table.into_boxed_str(),
5251                policy_column: policy_column.into_boxed_str(),
5252                sort_key: policy.sort_key.into_boxed_str(),
5253            });
5254        }
5255    }
5256    None
5257}
5258
5259pub(crate) fn rank_policy_drop_index_blocker(
5260    db: &Database,
5261    table: &str,
5262    index: &str,
5263) -> Option<Error> {
5264    for (policy_table, policy_column, policy) in all_rank_policies(db) {
5265        if policy.joined_table == table && policy.protected_index == index {
5266            return Some(Error::DropBlockedByRankPolicy {
5267                table: table.into(),
5268                column: None,
5269                dropped_index: Some(index.into()),
5270                policy_table: policy_table.into_boxed_str(),
5271                policy_column: policy_column.into_boxed_str(),
5272                sort_key: policy.sort_key.into_boxed_str(),
5273            });
5274        }
5275    }
5276    None
5277}
5278
5279fn rank_policy_drop_column_blocker(db: &Database, table: &str, column: &str) -> Option<Error> {
5280    let metas = db
5281        .table_names()
5282        .into_iter()
5283        .filter_map(|name| db.table_meta(&name).map(|meta| (name, meta)))
5284        .collect::<HashMap<_, _>>();
5285    for (policy_table, meta) in &metas {
5286        for policy_col in &meta.columns {
5287            let Some(policy) = &policy_col.rank_policy else {
5288                continue;
5289            };
5290            if policy_table == table && policy_col.name == column {
5291                return Some(drop_column_rank_error(
5292                    table,
5293                    column,
5294                    policy_table,
5295                    &policy_col.name,
5296                    policy,
5297                ));
5298            }
5299            if policy.joined_table == table && policy.joined_column == column {
5300                return Some(drop_column_rank_error(
5301                    table,
5302                    column,
5303                    policy_table,
5304                    &policy_col.name,
5305                    policy,
5306                ));
5307            }
5308            if policy_table == table && policy.anchor_column == column {
5309                return Some(drop_column_rank_error(
5310                    table,
5311                    column,
5312                    policy_table,
5313                    &policy_col.name,
5314                    policy,
5315                ));
5316            }
5317            let Ok(formula) = RankFormula::compile_for_index(
5318                &rank_index_name(policy_table, &policy_col.name),
5319                &policy.formula,
5320            ) else {
5321                continue;
5322            };
5323            let joined_meta = metas.get(&policy.joined_table);
5324            for reference in formula.column_refs() {
5325                if reference == "vector_score" {
5326                    continue;
5327                }
5328                let anchor_has = meta.columns.iter().any(|col| col.name == *reference);
5329                let joined_has = joined_meta
5330                    .is_some_and(|joined| joined.columns.iter().any(|col| col.name == *reference));
5331                if anchor_has && policy_table == table && reference == column {
5332                    return Some(drop_column_rank_error(
5333                        table,
5334                        column,
5335                        policy_table,
5336                        &policy_col.name,
5337                        policy,
5338                    ));
5339                }
5340                if !anchor_has && joined_has && policy.joined_table == table && reference == column
5341                {
5342                    return Some(drop_column_rank_error(
5343                        table,
5344                        column,
5345                        policy_table,
5346                        &policy_col.name,
5347                        policy,
5348                    ));
5349                }
5350            }
5351        }
5352    }
5353    None
5354}
5355
5356fn drop_column_rank_error(
5357    table: &str,
5358    column: &str,
5359    policy_table: &str,
5360    policy_column: &str,
5361    policy: &contextdb_core::RankPolicy,
5362) -> Error {
5363    Error::DropBlockedByRankPolicy {
5364        table: table.into(),
5365        column: Some(column.into()),
5366        dropped_index: None,
5367        policy_table: policy_table.into(),
5368        policy_column: policy_column.into(),
5369        sort_key: policy.sort_key.clone().into_boxed_str(),
5370    }
5371}
5372
5373fn all_rank_policies(db: &Database) -> Vec<(String, String, contextdb_core::RankPolicy)> {
5374    db.table_names()
5375        .into_iter()
5376        .filter_map(|table| db.table_meta(&table).map(|meta| (table, meta)))
5377        .flat_map(|(table, meta)| {
5378            meta.columns.into_iter().filter_map(move |column| {
5379                column
5380                    .rank_policy
5381                    .map(|policy| (table.clone(), column.name, policy))
5382            })
5383        })
5384        .collect()
5385}
5386
5387fn map_vector_quantization(
5388    quantization: contextdb_parser::ast::VectorQuantization,
5389) -> contextdb_core::VectorQuantization {
5390    match quantization {
5391        contextdb_parser::ast::VectorQuantization::F32 => contextdb_core::VectorQuantization::F32,
5392        contextdb_parser::ast::VectorQuantization::SQ8 => contextdb_core::VectorQuantization::SQ8,
5393        contextdb_parser::ast::VectorQuantization::SQ4 => contextdb_core::VectorQuantization::SQ4,
5394    }
5395}
5396
5397fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
5398    match s {
5399        "latest_wins" => Ok(ConflictPolicy::LatestWins),
5400        "server_wins" => Ok(ConflictPolicy::ServerWins),
5401        "edge_wins" => Ok(ConflictPolicy::EdgeWins),
5402        "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
5403        _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
5404    }
5405}
5406
5407fn conflict_policy_to_string(p: ConflictPolicy) -> String {
5408    match p {
5409        ConflictPolicy::LatestWins => "latest_wins".to_string(),
5410        ConflictPolicy::ServerWins => "server_wins".to_string(),
5411        ConflictPolicy::EdgeWins => "edge_wins".to_string(),
5412        ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
5413    }
5414}
5415
5416fn project_graph_frontier_rows(
5417    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
5418    start_alias: &str,
5419    steps: &[GraphStepPlan],
5420) -> Result<Vec<Vec<Value>>> {
5421    frontier
5422        .into_iter()
5423        .map(|(bindings, id, depth)| {
5424            let mut row = Vec::with_capacity(steps.len() + 3);
5425            let start_id = bindings.get(start_alias).ok_or_else(|| {
5426                Error::PlanError(format!(
5427                    "graph frontier missing required start alias binding '{start_alias}'"
5428                ))
5429            })?;
5430            row.push(Value::Uuid(*start_id));
5431            for step in steps {
5432                let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
5433                    Error::PlanError(format!(
5434                        "graph frontier missing required target alias binding '{}'",
5435                        step.target_alias
5436                    ))
5437                })?;
5438                row.push(Value::Uuid(*target_id));
5439            }
5440            row.push(Value::Uuid(id));
5441            row.push(Value::Int64(depth as i64));
5442            Ok(row)
5443        })
5444        .collect()
5445}
5446
5447fn release_accounted_bytes(db: &Database, bytes: &[usize]) {
5448    for bytes in bytes {
5449        db.accountant().release(*bytes);
5450    }
5451}
5452
#[cfg(test)]
mod tests {
    use super::*;
    use contextdb_planner::GraphStepPlan;
    use uuid::Uuid;

    /// `project_graph_frontier_rows` must return `Err(Error::PlanError(_))`
    /// when a frontier entry lacks the start alias binding, and also when it
    /// lacks a step's target alias binding.
    #[test]
    fn graph_01_frontier_projection_requires_complete_bindings() {
        let one_hop = GraphStepPlan {
            edge_types: vec!["EDGE".to_string()],
            direction: Direction::Outgoing,
            min_depth: 1,
            max_depth: 1,
            target_alias: "b".to_string(),
        };
        let steps = vec![one_hop];

        // No bindings at all, so the start alias "a" is absent.
        let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];

        // "a" is bound, but the step's target alias "b" is not.
        let only_start_bound = HashMap::from([("a".to_string(), Uuid::new_v4())]);
        let missing_target = vec![(only_start_bound, Uuid::new_v4(), 0)];

        let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
        assert!(
            matches!(start_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
        );

        let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
        assert!(
            matches!(target_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
        );
    }
}