Skip to main content

contextdb_engine/
executor.rs

1use crate::database::{Database, InsertRowResult, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5    AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6    SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9    DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19    db: &Database,
20    plan: &PhysicalPlan,
21    params: &HashMap<String, Value>,
22    tx: Option<TxId>,
23) -> Result<QueryResult> {
24    match plan {
25        PhysicalPlan::CreateTable(p) => {
26            db.check_disk_budget("CREATE TABLE")?;
27            let expires_column = expires_column_name(&p.columns)?;
28            // Auto-generate implicit indexes for PK / UNIQUE columns and
29            // composite UNIQUE constraints, so constraint probes run at
30            // O(log n) instead of O(n) per insert.
31            let mut auto_indexes: Vec<contextdb_core::IndexDecl> = Vec::new();
32            for c in &p.columns {
33                if c.primary_key
34                    && !matches!(
35                        map_column_type(&c.data_type),
36                        ColumnType::Json | ColumnType::Vector(_)
37                    )
38                {
39                    auto_indexes.push(contextdb_core::IndexDecl {
40                        name: format!("__pk_{}", c.name),
41                        columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
42                        kind: contextdb_core::IndexKind::Auto,
43                    });
44                }
45                if c.unique
46                    && !c.primary_key
47                    && !matches!(
48                        map_column_type(&c.data_type),
49                        ColumnType::Json | ColumnType::Vector(_)
50                    )
51                {
52                    auto_indexes.push(contextdb_core::IndexDecl {
53                        name: format!("__unique_{}", c.name),
54                        columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
55                        kind: contextdb_core::IndexKind::Auto,
56                    });
57                }
58            }
59            for uc in &p.unique_constraints {
60                // Only index composite UNIQUE constraints whose columns are
61                // all B-tree indexable.
62                let all_indexable = uc.iter().all(|col_name| {
63                    p.columns
64                        .iter()
65                        .find(|c| c.name == *col_name)
66                        .map(|c| {
67                            !matches!(
68                                map_column_type(&c.data_type),
69                                ColumnType::Json | ColumnType::Vector(_)
70                            )
71                        })
72                        .unwrap_or(false)
73                });
74                if !all_indexable || uc.is_empty() {
75                    continue;
76                }
77                let name = format!("__unique_{}", uc.join("_"));
78                let cols: Vec<(String, contextdb_core::SortDirection)> = uc
79                    .iter()
80                    .map(|c| (c.clone(), contextdb_core::SortDirection::Asc))
81                    .collect();
82                auto_indexes.push(contextdb_core::IndexDecl {
83                    name,
84                    columns: cols,
85                    kind: contextdb_core::IndexKind::Auto,
86                });
87            }
88            let meta = TableMeta {
89                columns: p
90                    .columns
91                    .iter()
92                    .map(|c| contextdb_core::ColumnDef {
93                        name: c.name.clone(),
94                        column_type: map_column_type(&c.data_type),
95                        nullable: c.nullable,
96                        primary_key: c.primary_key,
97                        unique: c.unique,
98                        default: c.default.as_ref().map(stored_default_expr),
99                        references: c.references.as_ref().map(|reference| {
100                            contextdb_core::ForeignKeyReference {
101                                table: reference.table.clone(),
102                                column: reference.column.clone(),
103                            }
104                        }),
105                        expires: c.expires,
106                        immutable: c.immutable,
107                    })
108                    .collect(),
109                immutable: p.immutable,
110                state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
111                    column: sm.column.clone(),
112                    transitions: sm
113                        .transitions
114                        .iter()
115                        .map(|(from, tos)| (from.clone(), tos.clone()))
116                        .collect(),
117                }),
118                dag_edge_types: p.dag_edge_types.clone(),
119                unique_constraints: p.unique_constraints.clone(),
120                natural_key_column: None,
121                propagation_rules: p.propagation_rules.clone(),
122                default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
123                sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
124                expires_column,
125                // Auto-indexes land in `TableMeta.indexes` at CREATE TABLE
126                // time with `kind == IndexKind::Auto`. The planner sees them
127                // (so PK / UNIQUE point probes pick IndexScan); the user-
128                // visible `.schema` render suppresses them by default but
129                // keeps them in `EXPLAIN <query>` so agents can assert
130                // routing programmatically.
131                indexes: auto_indexes.clone(),
132            };
133            let metadata_bytes = meta.estimated_bytes();
134            db.accountant().try_allocate_for(
135                metadata_bytes,
136                "ddl",
137                "create_table",
138                "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
139            )?;
140            db.relational_store().create_table(&p.name, meta);
141            for idx in &auto_indexes {
142                db.relational_store()
143                    .create_index_storage(&p.name, &idx.name, idx.columns.clone());
144            }
145            if let Some(table_meta) = db.table_meta(&p.name) {
146                db.persist_table_meta(&p.name, &table_meta)?;
147                db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn))?;
148            }
149            db.clear_statement_cache();
150            Ok(QueryResult::empty_with_affected(0))
151        }
152        PhysicalPlan::DropTable(name) => {
153            let bytes_to_release = estimate_drop_table_bytes(db, name);
154            db.drop_table_aux_state(name);
155            db.relational_store().drop_table(name);
156            db.remove_persisted_table(name)?;
157            db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn))?;
158            db.accountant().release(bytes_to_release);
159            db.clear_statement_cache();
160            Ok(QueryResult::empty_with_affected(0))
161        }
162        PhysicalPlan::AlterTable(p) => {
163            db.check_disk_budget("ALTER TABLE")?;
164            let store = db.relational_store();
165            match &p.action {
166                AlterAction::AddColumn(col) => {
167                    if col.primary_key {
168                        return Err(Error::Other(
169                            "adding a primary key column via ALTER TABLE is not supported"
170                                .to_string(),
171                        ));
172                    }
173                    validate_expires_column(col)?;
174                    // If the new column is flagged IMMUTABLE, refuse to add it if any
175                    // existing propagation rule would write into a column of that name.
176                    // This closes the DROP-then-ADD-as-flagged loophole (Gotcha 13).
177                    if col.immutable
178                        && let Some(existing_meta) = db.table_meta(&p.table)
179                    {
180                        let targets_col =
181                            existing_meta
182                                .propagation_rules
183                                .iter()
184                                .any(|rule| {
185                                    match rule {
186                                contextdb_core::table_meta::PropagationRule::ForeignKey {
187                                    target_state,
188                                    ..
189                                } => *target_state == col.name,
190                                contextdb_core::table_meta::PropagationRule::Edge {
191                                    target_state,
192                                    ..
193                                } => *target_state == col.name,
194                                contextdb_core::table_meta::PropagationRule::VectorExclusion {
195                                    ..
196                                } => false,
197                            }
198                                });
199                        if targets_col {
200                            return Err(Error::ImmutableColumn {
201                                table: p.table.clone(),
202                                column: col.name.clone(),
203                            });
204                        }
205                    }
206                    let core_col = contextdb_core::ColumnDef {
207                        name: col.name.clone(),
208                        column_type: map_column_type(&col.data_type),
209                        nullable: col.nullable,
210                        primary_key: col.primary_key,
211                        unique: col.unique,
212                        default: col.default.as_ref().map(stored_default_expr),
213                        references: col.references.as_ref().map(|reference| {
214                            contextdb_core::ForeignKeyReference {
215                                table: reference.table.clone(),
216                                column: reference.column.clone(),
217                            }
218                        }),
219                        expires: col.expires,
220                        immutable: col.immutable,
221                    };
222                    store
223                        .alter_table_add_column(&p.table, core_col)
224                        .map_err(Error::Other)?;
225                    if col.expires {
226                        let mut meta = store.table_meta.write();
227                        let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
228                            Error::Other(format!("table '{}' not found", p.table))
229                        })?;
230                        table_meta.expires_column = Some(col.name.clone());
231                    }
232                }
233                AlterAction::DropColumn {
234                    column: name,
235                    cascade,
236                } => {
237                    if let Some(existing_meta) = db.table_meta(&p.table)
238                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
239                        && col.immutable
240                    {
241                        return Err(Error::ImmutableColumn {
242                            table: p.table.clone(),
243                            column: name.clone(),
244                        });
245                    }
246                    // PK check precedes index-dependency reporting: a column
247                    // flagged PRIMARY KEY cannot be dropped (the auto-index
248                    // would also show as a dependency, but the actionable
249                    // error is that the PK cannot be removed).
250                    if let Some(existing_meta) = db.table_meta(&p.table)
251                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
252                        && col.primary_key
253                    {
254                        return Err(Error::Other(format!(
255                            "cannot drop primary key column {}.{}",
256                            p.table, name
257                        )));
258                    }
259                    // RESTRICT / CASCADE on indexed columns. Only user-declared
260                    // indexes gate the RESTRICT path — auto-indexes dissolve
261                    // naturally when their defining column leaves.
262                    let dependent_user_indexes: Vec<String> = db
263                        .table_meta(&p.table)
264                        .map(|m| {
265                            m.indexes
266                                .iter()
267                                .filter(|i| {
268                                    i.kind == contextdb_core::IndexKind::UserDeclared
269                                        && i.columns.iter().any(|(c, _)| c == name)
270                                })
271                                .map(|i| i.name.clone())
272                                .collect()
273                        })
274                        .unwrap_or_default();
275                    let dependent_indexes: Vec<String> = db
276                        .table_meta(&p.table)
277                        .map(|m| {
278                            m.indexes
279                                .iter()
280                                .filter(|i| i.columns.iter().any(|(c, _)| c == name))
281                                .map(|i| i.name.clone())
282                                .collect()
283                        })
284                        .unwrap_or_default();
285                    if !*cascade && !dependent_user_indexes.is_empty() {
286                        return Err(Error::ColumnInIndex {
287                            table: p.table.clone(),
288                            column: name.clone(),
289                            index: dependent_user_indexes[0].clone(),
290                        });
291                    }
292                    store
293                        .alter_table_drop_column(&p.table, name)
294                        .map_err(Error::Other)?;
295                    if *cascade {
296                        // Remove IndexDecls referencing `name`, release storage.
297                        {
298                            let mut metas = store.table_meta.write();
299                            if let Some(m) = metas.get_mut(&p.table) {
300                                m.indexes
301                                    .retain(|i| !i.columns.iter().any(|(c, _)| c == name));
302                            }
303                        }
304                        for idx in &dependent_indexes {
305                            store.drop_index_storage(&p.table, idx);
306                            db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&p.table, idx, lsn))?;
307                        }
308                    }
309                    let mut meta = store.table_meta.write();
310                    if let Some(table_meta) = meta.get_mut(&p.table)
311                        && table_meta.expires_column.as_deref() == Some(name.as_str())
312                    {
313                        table_meta.expires_column = None;
314                    }
315                    drop(meta);
316                    if let Some(table_meta) = db.table_meta(&p.table) {
317                        db.persist_table_meta(&p.table, &table_meta)?;
318                        db.persist_table_rows(&p.table)?;
319                        db.allocate_ddl_lsn(|lsn| {
320                            db.log_alter_table_ddl(&p.table, &table_meta, lsn)
321                        })?;
322                    }
323                    db.clear_statement_cache();
324                    return Ok(QueryResult {
325                        columns: vec![],
326                        rows: vec![],
327                        rows_affected: 0,
328                        trace: crate::database::QueryTrace::scan(),
329                        cascade: if *cascade {
330                            Some(crate::database::CascadeReport {
331                                dropped_indexes: dependent_indexes,
332                            })
333                        } else {
334                            None
335                        },
336                    });
337                }
338                AlterAction::RenameColumn { from, to } => {
339                    if let Some(existing_meta) = db.table_meta(&p.table)
340                        && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *from)
341                        && col.immutable
342                    {
343                        return Err(Error::ImmutableColumn {
344                            table: p.table.clone(),
345                            column: from.clone(),
346                        });
347                    }
348                    store
349                        .alter_table_rename_column(&p.table, from, to)
350                        .map_err(Error::Other)?;
351                    let mut meta = store.table_meta.write();
352                    if let Some(table_meta) = meta.get_mut(&p.table)
353                        && table_meta.expires_column.as_deref() == Some(from.as_str())
354                    {
355                        table_meta.expires_column = Some(to.clone());
356                    }
357                }
358                AlterAction::SetRetain {
359                    duration_seconds,
360                    sync_safe,
361                } => {
362                    let mut meta = store.table_meta.write();
363                    let table_meta = meta
364                        .get_mut(&p.table)
365                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
366                    if table_meta.immutable {
367                        return Err(Error::Other(
368                            "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
369                        ));
370                    }
371                    table_meta.default_ttl_seconds = Some(*duration_seconds);
372                    table_meta.sync_safe = *sync_safe;
373                }
374                AlterAction::DropRetain => {
375                    let mut meta = store.table_meta.write();
376                    let table_meta = meta
377                        .get_mut(&p.table)
378                        .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
379                    table_meta.default_ttl_seconds = None;
380                    table_meta.sync_safe = false;
381                }
382                AlterAction::SetSyncConflictPolicy(policy) => {
383                    let cp = parse_conflict_policy(policy)?;
384                    db.set_table_conflict_policy(&p.table, cp);
385                }
386                AlterAction::DropSyncConflictPolicy => {
387                    db.drop_table_conflict_policy(&p.table);
388                }
389            }
390            if let Some(table_meta) = db.table_meta(&p.table) {
391                db.persist_table_meta(&p.table, &table_meta)?;
392                if !matches!(
393                    p.action,
394                    AlterAction::AddColumn(_)
395                        | AlterAction::SetRetain { .. }
396                        | AlterAction::DropRetain
397                        | AlterAction::SetSyncConflictPolicy(_)
398                        | AlterAction::DropSyncConflictPolicy
399                ) {
400                    db.persist_table_rows(&p.table)?;
401                }
402                db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn))?;
403            }
404            db.clear_statement_cache();
405            Ok(QueryResult::empty_with_affected(0))
406        }
407        PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
408        PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
409        PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
410        PhysicalPlan::Scan { table, filter, .. } => {
411            if table == "dual" {
412                return Ok(QueryResult {
413                    columns: vec![],
414                    rows: vec![vec![]],
415                    rows_affected: 0,
416                    trace: crate::database::QueryTrace::scan(),
417                    cascade: None,
418                });
419            }
420            let snapshot = db.snapshot_for_read();
421            let schema_columns = db.table_meta(table).map(|meta| {
422                meta.columns
423                    .into_iter()
424                    .map(|column| column.name)
425                    .collect::<Vec<_>>()
426            });
427            let resolved_filter = filter
428                .as_ref()
429                .map(|expr| resolve_in_subqueries(db, expr, params, tx))
430                .transpose()?;
431
432            // Try to route through an IndexScan if the filter is index-eligible.
433            // Analyze the PRE-resolve filter so `a IN (SELECT …)` disqualifies
434            // the outer IndexScan even after the subquery has been executed.
435            let meta_for_indexes = db.table_meta(table);
436            let indexes: Vec<contextdb_core::IndexDecl> = meta_for_indexes
437                .as_ref()
438                .map(|m| m.indexes.clone())
439                .unwrap_or_default();
440            let analysis = filter
441                .as_ref()
442                .filter(|_| !indexes.is_empty())
443                .map(|f| analyze_filter_for_index(f, &indexes, params));
444
445            if let Some(a) = analysis {
446                if let Some(pick) = a.pick {
447                    // IndexScan path. Fetch by BTree range; apply residual filter.
448                    let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
449                    db.__bump_rows_examined(examined);
450                    let mut result = materialize_rows(
451                        rows,
452                        resolved_filter.as_ref(),
453                        params,
454                        schema_columns.as_deref(),
455                    )?;
456                    let mut pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]> =
457                        smallvec::SmallVec::new();
458                    pushed.push(std::borrow::Cow::Owned(pick.pushed_column.clone()));
459                    let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> = a
460                        .considered
461                        .iter()
462                        .filter(|c| c.name != pick.name)
463                        .cloned()
464                        .collect();
465                    result.trace = crate::database::QueryTrace {
466                        physical_plan: "IndexScan",
467                        index_used: Some(pick.name.clone()),
468                        predicates_pushed: pushed,
469                        indexes_considered: considered,
470                        sort_elided: false,
471                    };
472                    return Ok(result);
473                } else {
474                    // Scan with rejection trace.
475                    let rows = db.scan(table, snapshot)?;
476                    db.__bump_rows_examined(rows.len() as u64);
477                    let mut result = materialize_rows(
478                        rows,
479                        resolved_filter.as_ref(),
480                        params,
481                        schema_columns.as_deref(),
482                    )?;
483                    let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> =
484                        a.considered.into_iter().collect();
485                    result.trace = crate::database::QueryTrace {
486                        physical_plan: "Scan",
487                        index_used: None,
488                        predicates_pushed: Default::default(),
489                        indexes_considered: considered,
490                        sort_elided: false,
491                    };
492                    return Ok(result);
493                }
494            }
495
496            let rows = db.scan(table, snapshot)?;
497            db.__bump_rows_examined(rows.len() as u64);
498            let mut result = materialize_rows(
499                rows,
500                resolved_filter.as_ref(),
501                params,
502                schema_columns.as_deref(),
503            )?;
504            result.trace = crate::database::QueryTrace::scan();
505            Ok(result)
506        }
507        PhysicalPlan::GraphBfs {
508            start_alias,
509            start_expr,
510            start_candidates,
511            steps,
512            filter,
513        } => {
514            let start_uuids = match resolve_uuid(start_expr, params) {
515                Ok(start) => vec![start],
516                Err(Error::PlanError(_))
517                    if matches!(
518                        start_expr,
519                        Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
520                    ) =>
521                {
522                    // Start node not directly specified — check if a subquery or filter can help
523                    if let Some(candidate_plan) = start_candidates {
524                        resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
525                    } else if let Some(filter_expr) = filter {
526                        let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
527                        resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
528                    } else {
529                        vec![]
530                    }
531                }
532                Err(err) => return Err(err),
533            };
534            if start_uuids.is_empty() {
535                return Ok(QueryResult {
536                    columns: vec!["id".to_string(), "depth".to_string()],
537                    rows: vec![],
538                    rows_affected: 0,
539                    trace: crate::database::QueryTrace::scan(),
540                    cascade: None,
541                });
542            }
543            let snapshot = db.snapshot();
544            let mut frontier = start_uuids
545                .into_iter()
546                .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
547                .collect::<Vec<_>>();
548            let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
549            db.accountant().try_allocate_for(
550                bfs_bytes,
551                "bfs_frontier",
552                "graph_bfs",
553                "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
554            )?;
555
556            let result = (|| {
557                for step in steps {
558                    let edge_types_ref = if step.edge_types.is_empty() {
559                        None
560                    } else {
561                        Some(step.edge_types.as_slice())
562                    };
563                    let mut next = Vec::new();
564
565                    for (bindings, start, base_depth) in &frontier {
566                        let res = db.graph().bfs(
567                            *start,
568                            edge_types_ref,
569                            step.direction,
570                            step.min_depth,
571                            step.max_depth,
572                            snapshot,
573                        )?;
574                        for node in res.nodes {
575                            let total_depth = base_depth.saturating_add(node.depth);
576                            let mut next_bindings = bindings.clone();
577                            next_bindings.insert(step.target_alias.clone(), node.id);
578                            next.push((next_bindings, node.id, total_depth));
579                        }
580                    }
581
582                    frontier = dedupe_graph_frontier(next, steps);
583                    if frontier.is_empty() {
584                        break;
585                    }
586                }
587
588                let mut columns =
589                    steps
590                        .iter()
591                        .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
592                            cols.push(format!("{}.id", step.target_alias));
593                            cols
594                        });
595                columns.push("id".to_string());
596                columns.push("depth".to_string());
597
598                Ok(QueryResult {
599                    columns,
600                    rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
601                    rows_affected: 0,
602                    trace: crate::database::QueryTrace::scan(),
603                    cascade: None,
604                })
605            })();
606            db.accountant().release(bfs_bytes);
607
608            result
609        }
610        PhysicalPlan::VectorSearch {
611            table,
612            query_expr,
613            k,
614            candidates,
615            ..
616        }
617        | PhysicalPlan::HnswSearch {
618            table,
619            query_expr,
620            k,
621            candidates,
622            ..
623        } => {
624            let query_vec = resolve_vector_from_expr(query_expr, params)?;
625            let snapshot = db.snapshot();
626            let all_rows = db.scan(table, snapshot)?;
627            let candidate_bitmap = if let Some(cands_plan) = candidates {
628                let qr = execute_plan(db, cands_plan, params, tx)?;
629                let mut bm = RoaringTreemap::new();
630                let row_id_idx = qr.columns.iter().position(|column| {
631                    column == "row_id" || column.rsplit('.').next() == Some("row_id")
632                });
633                let id_idx = qr
634                    .columns
635                    .iter()
636                    .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
637
638                if let Some(idx) = row_id_idx {
639                    for row in qr.rows {
640                        if let Some(Value::Int64(id)) = row.get(idx) {
641                            bm.insert(*id as u64);
642                        }
643                    }
644                } else if let Some(idx) = id_idx {
645                    let uuid_to_row_id: HashMap<uuid::Uuid, RowId> = all_rows
646                        .iter()
647                        .filter_map(|row| match row.values.get("id") {
648                            Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
649                            _ => None,
650                        })
651                        .collect();
652                    for row in qr.rows {
653                        if let Some(Value::Uuid(uuid)) = row.get(idx)
654                            && let Some(row_id) = uuid_to_row_id.get(uuid)
655                        {
656                            bm.insert(row_id.0);
657                        }
658                    }
659                }
660                Some(bm)
661            } else {
662                None
663            };
664
665            let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
666            db.accountant().try_allocate_for(
667                vector_bytes,
668                "vector_search",
669                "search",
670                "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
671            )?;
672            let res = db.query_vector(
673                &query_vec,
674                *k as usize,
675                candidate_bitmap.as_ref(),
676                db.snapshot(),
677            );
678            db.accountant().release(vector_bytes);
679            let res = res?;
680
681            // Re-materialize: look up actual rows by row_id so SELECT * returns user columns
682            let schema_columns = db.table_meta(table).map(|meta| {
683                meta.columns
684                    .into_iter()
685                    .map(|column| column.name)
686                    .collect::<Vec<_>>()
687            });
688            let keys = if let Some(ref sc) = schema_columns {
689                sc.clone()
690            } else {
691                let mut ks = BTreeSet::new();
692                for r in &all_rows {
693                    for k in r.values.keys() {
694                        ks.insert(k.clone());
695                    }
696                }
697                ks.into_iter().collect::<Vec<_>>()
698            };
699
700            let row_map: HashMap<RowId, &VersionedRow> =
701                all_rows.iter().map(|r| (r.row_id, r)).collect();
702
703            let mut columns = vec!["row_id".to_string()];
704            columns.extend(keys.iter().cloned());
705            columns.push("score".to_string());
706
707            let rows = res
708                .into_iter()
709                .filter_map(|(rid, score)| {
710                    row_map.get(&rid).map(|row| {
711                        let mut out = vec![Value::Int64(rid.0 as i64)];
712                        for k in &keys {
713                            out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
714                        }
715                        out.push(Value::Float64(score as f64));
716                        out
717                    })
718                })
719                .collect();
720
721            Ok(QueryResult {
722                columns,
723                rows,
724                rows_affected: 0,
725                trace: crate::database::QueryTrace::scan(),
726                cascade: None,
727            })
728        }
729        PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
730        PhysicalPlan::Project { input, columns } => {
731            let input_result = execute_plan(db, input, params, tx)?;
732            let has_aggregate = columns.iter().any(|column| {
733                matches!(
734                    &column.expr,
735                    Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
736                )
737            });
738            if has_aggregate {
739                if columns.iter().any(|column| {
740                    !matches!(
741                        &column.expr,
742                        Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
743                    )
744                }) {
745                    return Err(Error::PlanError(
746                        "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
747                    ));
748                }
749
750                let output_columns = columns
751                    .iter()
752                    .map(|column| {
753                        column.alias.clone().unwrap_or_else(|| match &column.expr {
754                            Expr::FunctionCall { name, .. } => name.clone(),
755                            _ => "expr".to_string(),
756                        })
757                    })
758                    .collect::<Vec<_>>();
759
760                let aggregate_row = columns
761                    .iter()
762                    .map(|column| match &column.expr {
763                        Expr::FunctionCall { name: _, args } => {
764                            let count = if matches!(
765                                args.as_slice(),
766                                [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
767                                if column == "*"
768                            ) {
769                                input_result.rows.len() as i64
770                            } else {
771                                input_result
772                                    .rows
773                                    .iter()
774                                    .filter_map(|row| {
775                                        args.first().map(|arg| {
776                                            eval_query_result_expr(
777                                                arg,
778                                                row,
779                                                &input_result.columns,
780                                                params,
781                                            )
782                                        })
783                                    })
784                                    .collect::<Result<Vec<_>>>()?
785                                    .into_iter()
786                                    .filter(|value| *value != Value::Null)
787                                    .count() as i64
788                            };
789                            Ok(Value::Int64(count))
790                        }
791                        _ => Err(Error::PlanError(
792                            "mixed aggregate and non-aggregate columns without GROUP BY"
793                                .to_string(),
794                        )),
795                    })
796                    .collect::<Result<Vec<_>>>()?;
797
798                return Ok(QueryResult {
799                    columns: output_columns,
800                    rows: vec![aggregate_row],
801                    rows_affected: 0,
802                    trace: input_result.trace.clone(),
803                    cascade: None,
804                });
805            }
806
807            let output_columns = columns
808                .iter()
809                .map(|c| {
810                    c.alias.clone().unwrap_or_else(|| match &c.expr {
811                        Expr::Column(col) => col.column.clone(),
812                        _ => "expr".to_string(),
813                    })
814                })
815                .collect::<Vec<_>>();
816
817            let mut output_rows = Vec::with_capacity(input_result.rows.len());
818            for row in &input_result.rows {
819                let mut projected = Vec::with_capacity(columns.len());
820                for col in columns {
821                    projected.push(eval_project_expr(
822                        &col.expr,
823                        row,
824                        &input_result.columns,
825                        params,
826                    )?);
827                }
828                output_rows.push(projected);
829            }
830
831            Ok(QueryResult {
832                columns: output_columns,
833                rows: output_rows,
834                rows_affected: 0,
835                trace: input_result.trace.clone(),
836                cascade: None,
837            })
838        }
839        PhysicalPlan::Sort { input, keys } => {
840            // Sort elision path A: input is a Scan and an index's direction
841            // prefix matches `keys`. We rewrite the input into an IndexScan
842            // and skip the re-sort.
843            let elided = try_elide_sort(db, input, keys, params, tx)?;
844            if let Some(mut result) = elided {
845                result.trace.sort_elided = true;
846                return Ok(result);
847            }
848            let mut input_result = execute_plan(db, input, params, tx)?;
849
850            // Sort elision path B: the input already used an IndexScan whose
851            // column list + directions prefix-match the ORDER BY keys. The
852            // IndexScan already delivers rows in the requested order, so the
853            // Sort is a no-op; skip it and mark `sort_elided`.
854            if input_result.trace.physical_plan == "IndexScan"
855                && let Some(idx_name) = &input_result.trace.index_used
856                && sort_keys_match_index_prefix(db, input, idx_name, keys)
857            {
858                input_result.trace.sort_elided = true;
859                return Ok(input_result);
860            }
861            input_result.rows.sort_by(|left, right| {
862                for key in keys {
863                    let Expr::Column(column_ref) = &key.expr else {
864                        return Ordering::Equal;
865                    };
866                    let left_value =
867                        match lookup_query_result_column(left, &input_result.columns, column_ref) {
868                            Ok(value) => value,
869                            Err(_) => return Ordering::Equal,
870                        };
871                    let right_value = match lookup_query_result_column(
872                        right,
873                        &input_result.columns,
874                        column_ref,
875                    ) {
876                        Ok(value) => value,
877                        Err(_) => return Ordering::Equal,
878                    };
879                    let ordering = compare_sort_values(&left_value, &right_value, key.direction);
880                    if ordering != Ordering::Equal {
881                        return ordering;
882                    }
883                }
884                Ordering::Equal
885            });
886            // Preserve the child's physical_plan when it was an IndexScan:
887            // the trace reports the data-source strategy, and Sort is
888            // represented by `sort_elided = false` rather than overriding the
889            // plan label. A plain `Scan` child gets relabeled to `Sort` to
890            // match the plan's ORDER BY-without-index expectations.
891            if input_result.trace.physical_plan != "IndexScan" {
892                input_result.trace.physical_plan = "Sort";
893            }
894            input_result.trace.sort_elided = false;
895            Ok(input_result)
896        }
897        PhysicalPlan::Limit { input, count } => {
898            let mut input_result = execute_plan(db, input, params, tx)?;
899            input_result.rows.truncate(*count as usize);
900            Ok(input_result)
901        }
902        PhysicalPlan::Filter { input, predicate } => {
903            let mut input_result = execute_plan(db, input, params, tx)?;
904            input_result.rows.retain(|row| {
905                query_result_row_matches(row, &input_result.columns, predicate, params)
906                    .unwrap_or(false)
907            });
908            Ok(input_result)
909        }
910        PhysicalPlan::Distinct { input } => {
911            let input_result = execute_plan(db, input, params, tx)?;
912            let mut seen = HashSet::<Vec<u8>>::new();
913            let rows = input_result
914                .rows
915                .into_iter()
916                .filter(|row| seen.insert(distinct_row_key(row)))
917                .collect();
918            Ok(QueryResult {
919                columns: input_result.columns,
920                rows,
921                rows_affected: input_result.rows_affected,
922                trace: input_result.trace,
923                cascade: None,
924            })
925        }
926        PhysicalPlan::Join {
927            left,
928            right,
929            condition,
930            join_type,
931            left_alias,
932            right_alias,
933        } => {
934            let left_result = execute_plan(db, left, params, tx)?;
935            let right_result = execute_plan(db, right, params, tx)?;
936            let right_duplicate_names =
937                duplicate_column_names(&left_result.columns, &right_result.columns);
938            let right_prefix = right_alias
939                .clone()
940                .unwrap_or_else(|| right_table_name(right));
941            let right_columns = right_result
942                .columns
943                .iter()
944                .map(|column| {
945                    if right_duplicate_names.contains(column) {
946                        format!("{right_prefix}.{column}")
947                    } else {
948                        column.clone()
949                    }
950                })
951                .collect::<Vec<_>>();
952
953            let mut columns = left_result.columns.clone();
954            columns.extend(right_columns);
955
956            let mut rows = Vec::new();
957            for left_row in &left_result.rows {
958                let mut matched = false;
959                for right_row in &right_result.rows {
960                    let combined = concatenate_rows(left_row, right_row);
961                    if query_result_row_matches(&combined, &columns, condition, params)? {
962                        matched = true;
963                        rows.push(combined);
964                    }
965                }
966
967                if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
968                    let mut combined = left_row.clone();
969                    combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
970                    rows.push(combined);
971                }
972            }
973
974            let output_columns = qualify_join_columns(
975                &columns,
976                &left_result.columns,
977                &right_result.columns,
978                left_alias,
979                &right_prefix,
980            );
981
982            Ok(QueryResult {
983                columns: output_columns,
984                rows,
985                rows_affected: 0,
986                trace: crate::database::QueryTrace::scan(),
987                cascade: None,
988            })
989        }
990        PhysicalPlan::CreateIndex(p) => exec_create_index(db, p),
991        PhysicalPlan::DropIndex(p) => exec_drop_index(db, p),
992        PhysicalPlan::IndexScan {
993            table,
994            index,
995            range: _,
996        } => {
997            // Stub: always return empty rows + an IndexScan trace marker. Impl
998            // must walk the BTreeMap at the named index, apply visibility,
999            // materialize rows, and populate the trace fully.
1000            let _ = (table, index);
1001            Ok(QueryResult {
1002                columns: vec![],
1003                rows: vec![],
1004                rows_affected: 0,
1005                trace: crate::database::QueryTrace {
1006                    physical_plan: "IndexScan",
1007                    index_used: None,
1008                    ..crate::database::QueryTrace::default()
1009                },
1010                cascade: None,
1011            })
1012        }
1013        PhysicalPlan::SetMemoryLimit(val) => {
1014            let limit = match val {
1015                SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
1016                SetMemoryLimitValue::None => None,
1017            };
1018            db.accountant().set_budget(limit)?;
1019            db.persist_memory_limit(limit)?;
1020            Ok(QueryResult::empty())
1021        }
1022        PhysicalPlan::ShowMemoryLimit => {
1023            let usage = db.accountant().usage();
1024            Ok(QueryResult {
1025                columns: vec![
1026                    "limit".to_string(),
1027                    "used".to_string(),
1028                    "available".to_string(),
1029                    "startup_ceiling".to_string(),
1030                ],
1031                rows: vec![vec![
1032                    usage
1033                        .limit
1034                        .map(|value| Value::Int64(value as i64))
1035                        .unwrap_or_else(|| Value::Text("none".to_string())),
1036                    Value::Int64(usage.used as i64),
1037                    usage
1038                        .available
1039                        .map(|value| Value::Int64(value as i64))
1040                        .unwrap_or_else(|| Value::Text("none".to_string())),
1041                    usage
1042                        .startup_ceiling
1043                        .map(|value| Value::Int64(value as i64))
1044                        .unwrap_or_else(|| Value::Text("none".to_string())),
1045                ]],
1046                rows_affected: 0,
1047                trace: crate::database::QueryTrace::scan(),
1048                cascade: None,
1049            })
1050        }
1051        PhysicalPlan::SetDiskLimit(val) => {
1052            let limit = match val {
1053                SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
1054                SetDiskLimitValue::None => None,
1055            };
1056            db.set_disk_limit(limit)?;
1057            db.persist_disk_limit(limit)?;
1058            Ok(QueryResult::empty())
1059        }
1060        PhysicalPlan::ShowDiskLimit => {
1061            let limit = db.disk_limit();
1062            let used = db.disk_file_size();
1063            let startup_ceiling = db.disk_limit_startup_ceiling();
1064            Ok(QueryResult {
1065                columns: vec![
1066                    "limit".to_string(),
1067                    "used".to_string(),
1068                    "available".to_string(),
1069                    "startup_ceiling".to_string(),
1070                ],
1071                rows: vec![vec![
1072                    limit
1073                        .map(|value| Value::Int64(value as i64))
1074                        .unwrap_or_else(|| Value::Text("none".to_string())),
1075                    used.map(|value| Value::Int64(value as i64))
1076                        .unwrap_or(Value::Null),
1077                    match (limit, used) {
1078                        (Some(limit), Some(used)) => {
1079                            Value::Int64(limit.saturating_sub(used) as i64)
1080                        }
1081                        _ => Value::Null,
1082                    },
1083                    startup_ceiling
1084                        .map(|value| Value::Int64(value as i64))
1085                        .unwrap_or_else(|| Value::Text("none".to_string())),
1086                ]],
1087                rows_affected: 0,
1088                trace: crate::database::QueryTrace::scan(),
1089                cascade: None,
1090            })
1091        }
1092        PhysicalPlan::SetSyncConflictPolicy(policy) => {
1093            let cp = parse_conflict_policy(policy)?;
1094            db.set_default_conflict_policy(cp);
1095            Ok(QueryResult::empty())
1096        }
1097        PhysicalPlan::ShowSyncConflictPolicy => {
1098            let policies = db.conflict_policies();
1099            let default_str = conflict_policy_to_string(policies.default);
1100            let mut rows = vec![vec![Value::Text(default_str)]];
1101            for (table, policy) in &policies.per_table {
1102                rows.push(vec![Value::Text(format!(
1103                    "{}={}",
1104                    table,
1105                    conflict_policy_to_string(*policy)
1106                ))]);
1107            }
1108            Ok(QueryResult {
1109                columns: vec!["policy".to_string()],
1110                rows,
1111                rows_affected: 0,
1112                trace: crate::database::QueryTrace::scan(),
1113                cascade: None,
1114            })
1115        }
1116        PhysicalPlan::Pipeline(plans) => {
1117            let mut last = QueryResult::empty();
1118            for p in plans {
1119                last = execute_plan(db, p, params, tx)?;
1120            }
1121            Ok(last)
1122        }
1123        _ => Err(Error::PlanError(
1124            "unsupported plan node in executor".to_string(),
1125        )),
1126    }
1127}
1128
1129fn eval_project_expr(
1130    expr: &Expr,
1131    row: &[Value],
1132    input_columns: &[String],
1133    params: &HashMap<String, Value>,
1134) -> Result<Value> {
1135    match expr {
1136        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1137        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1138        Expr::Parameter(name) => params
1139            .get(name)
1140            .cloned()
1141            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1142        Expr::BinaryOp { left, op, right } => {
1143            let left = eval_query_result_expr(left, row, input_columns, params)?;
1144            let right = eval_query_result_expr(right, row, input_columns, params)?;
1145            eval_binary_op(op, &left, &right)
1146        }
1147        Expr::UnaryOp { op, operand } => {
1148            let value = eval_query_result_expr(operand, row, input_columns, params)?;
1149            match op {
1150                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1151                UnaryOp::Neg => match value {
1152                    Value::Int64(v) => Ok(Value::Int64(-v)),
1153                    Value::Float64(v) => Ok(Value::Float64(-v)),
1154                    _ => Err(Error::PlanError(
1155                        "cannot negate non-numeric value".to_string(),
1156                    )),
1157                },
1158            }
1159        }
1160        Expr::FunctionCall { name, args } => {
1161            let values = args
1162                .iter()
1163                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1164                .collect::<Result<Vec<_>>>()?;
1165            eval_function(name, &values)
1166        }
1167        Expr::IsNull { expr, negated } => {
1168            let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
1169            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1170        }
1171        Expr::InList {
1172            expr,
1173            list,
1174            negated,
1175        } => {
1176            let needle = eval_query_result_expr(expr, row, input_columns, params)?;
1177            let matched = list.iter().try_fold(false, |found, item| {
1178                if found {
1179                    Ok(true)
1180                } else {
1181                    let candidate = eval_query_result_expr(item, row, input_columns, params)?;
1182                    Ok(
1183                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1184                            || (needle != Value::Null
1185                                && candidate != Value::Null
1186                                && needle == candidate),
1187                    )
1188                }
1189            })?;
1190            Ok(Value::Bool(if *negated { !matched } else { matched }))
1191        }
1192        Expr::Like {
1193            expr,
1194            pattern,
1195            negated,
1196        } => {
1197            let matches = match (
1198                eval_query_result_expr(expr, row, input_columns, params)?,
1199                eval_query_result_expr(pattern, row, input_columns, params)?,
1200            ) {
1201                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1202                _ => false,
1203            };
1204            Ok(Value::Bool(if *negated { !matches } else { matches }))
1205        }
1206        _ => resolve_expr(expr, params),
1207    }
1208}
1209
1210fn eval_query_result_expr(
1211    expr: &Expr,
1212    row: &[Value],
1213    input_columns: &[String],
1214    params: &HashMap<String, Value>,
1215) -> Result<Value> {
1216    match expr {
1217        Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1218        Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1219        Expr::Parameter(name) => params
1220            .get(name)
1221            .cloned()
1222            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1223        Expr::FunctionCall { name, args } => {
1224            let values = args
1225                .iter()
1226                .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1227                .collect::<Result<Vec<_>>>()?;
1228            eval_function(name, &values)
1229        }
1230        _ => resolve_expr(expr, params),
1231    }
1232}
1233
1234fn exec_insert(
1235    db: &Database,
1236    p: &InsertPlan,
1237    params: &HashMap<String, Value>,
1238    tx: Option<TxId>,
1239) -> Result<QueryResult> {
1240    db.check_disk_budget("INSERT")?;
1241    let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
1242
1243    let insert_meta = db
1244        .table_meta(&p.table)
1245        .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
1246    // When no column list is provided (INSERT INTO t VALUES (...)),
1247    // infer column names from table metadata in declaration order.
1248    let columns: Vec<String> = if p.columns.is_empty() {
1249        insert_meta.columns.iter().map(|c| c.name.clone()).collect()
1250    } else {
1251        p.columns.clone()
1252    };
1253
1254    // Statement-scoped snapshot of the committed TxId watermark for TXID bound checks.
1255    let current_tx_max = Some(db.committed_watermark());
1256    let route_inserts_to_graph =
1257        p.table.eq_ignore_ascii_case("edges") || !insert_meta.dag_edge_types.is_empty();
1258    let vector_column = insert_meta.columns.iter().find_map(|column| {
1259        if matches!(column.column_type, contextdb_core::ColumnType::Vector(_)) {
1260            Some(column.name.clone())
1261        } else {
1262            None
1263        }
1264    });
1265    let has_column_defaults = insert_meta
1266        .columns
1267        .iter()
1268        .any(|column| column.default.is_some());
1269
1270    let mut rows_affected = 0;
1271    for row in &p.values {
1272        let mut values = HashMap::new();
1273        for (idx, expr) in row.iter().enumerate() {
1274            let col = columns
1275                .get(idx)
1276                .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1277            let v = resolve_expr(expr, params)?;
1278            values.insert(
1279                col.clone(),
1280                coerce_value_for_column_with_meta(
1281                    &p.table,
1282                    &insert_meta,
1283                    col,
1284                    v,
1285                    current_tx_max,
1286                    Some(txid),
1287                )?,
1288            );
1289        }
1290
1291        if has_column_defaults {
1292            apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1293        }
1294
1295        if vector_column.is_some() {
1296            validate_vector_columns(db, &p.table, &values)?;
1297        }
1298        let row_bytes = estimate_row_bytes_for_meta(&values, &insert_meta, false);
1299        db.accountant().try_allocate_for(
1300            row_bytes,
1301            "insert",
1302            "row_insert",
1303            "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
1304        )?;
1305        let checkpoint = db.write_set_checkpoint(txid)?;
1306        let graph_edge = if route_inserts_to_graph {
1307            match (
1308                values.get("source_id"),
1309                values.get("target_id"),
1310                values.get("edge_type"),
1311            ) {
1312                (
1313                    Some(Value::Uuid(source)),
1314                    Some(Value::Uuid(target)),
1315                    Some(Value::Text(edge_type)),
1316                ) => Some((*source, *target, edge_type.clone())),
1317                _ => None,
1318            }
1319        } else {
1320            None
1321        };
1322        let vector_value = vector_column
1323            .as_ref()
1324            .and_then(|column| match values.get(column) {
1325                Some(Value::Vector(vector)) => Some(vector.clone()),
1326                _ => None,
1327            });
1328
1329        let row_id = if let Some(on_conflict) = &p.on_conflict {
1330            let conflict_col = &on_conflict.columns[0];
1331            let conflict_value = values
1332                .get(conflict_col)
1333                .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
1334            let existing =
1335                db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
1336            let existing_row_id = existing.as_ref().map(|row| row.row_id);
1337            let existing_has_vector = existing
1338                .as_ref()
1339                .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
1340            let upsert_values = if let Some(existing_row) = existing.as_ref() {
1341                match apply_on_conflict_updates(
1342                    db,
1343                    &p.table,
1344                    values.clone(),
1345                    existing_row,
1346                    on_conflict,
1347                    params,
1348                    Some(txid),
1349                ) {
1350                    Ok(v) => v,
1351                    Err(err) => {
1352                        db.accountant().release(row_bytes);
1353                        let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1354                        return Err(err);
1355                    }
1356                }
1357            } else {
1358                values.clone()
1359            };
1360
1361            match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
1362                Ok(UpsertResult::Inserted) => {
1363                    db.point_lookup_in_tx(
1364                        txid,
1365                        &p.table,
1366                        conflict_col,
1367                        conflict_value,
1368                        db.snapshot(),
1369                    )?
1370                    .ok_or_else(|| {
1371                        Error::Other("inserted upsert row not visible in tx".to_string())
1372                    })?
1373                    .row_id
1374                }
1375                Ok(UpsertResult::Updated) => {
1376                    if existing_has_vector && let Some(existing_row_id) = existing_row_id {
1377                        db.delete_vector(txid, existing_row_id)?;
1378                    }
1379                    db.point_lookup_in_tx(
1380                        txid,
1381                        &p.table,
1382                        conflict_col,
1383                        conflict_value,
1384                        db.snapshot(),
1385                    )?
1386                    .ok_or_else(|| {
1387                        Error::Other("updated upsert row not visible in tx".to_string())
1388                    })?
1389                    .row_id
1390                }
1391                Ok(UpsertResult::NoOp) => {
1392                    db.accountant().release(row_bytes);
1393                    RowId(0)
1394                }
1395                Err(err) => {
1396                    db.accountant().release(row_bytes);
1397                    return Err(err);
1398                }
1399            }
1400        } else {
1401            match db.insert_row_with_unique_noop(txid, &p.table, values) {
1402                Ok(InsertRowResult::Inserted(row_id)) => row_id,
1403                Ok(InsertRowResult::NoOp) => {
1404                    db.accountant().release(row_bytes);
1405                    continue;
1406                }
1407                Err(err) => {
1408                    db.accountant().release(row_bytes);
1409                    return Err(err);
1410                }
1411            }
1412        };
1413
1414        if let Some((source, target, edge_type)) = graph_edge {
1415            match db.insert_edge(txid, source, target, edge_type, HashMap::new()) {
1416                Ok(true) => {}
1417                Ok(false) => {
1418                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1419                    db.accountant().release(row_bytes);
1420                    continue;
1421                }
1422                Err(err) => {
1423                    let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1424                    db.accountant().release(row_bytes);
1425                    return Err(err);
1426                }
1427            }
1428        }
1429
1430        if let Some(v) = vector_value
1431            && row_id != RowId(0)
1432            && let Err(err) = db.insert_vector(txid, row_id, v)
1433        {
1434            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1435            db.accountant().release(row_bytes);
1436            return Err(err);
1437        }
1438
1439        rows_affected += 1;
1440    }
1441
1442    Ok(QueryResult::empty_with_affected(rows_affected))
1443}
1444
1445fn exec_delete(
1446    db: &Database,
1447    p: &DeletePlan,
1448    params: &HashMap<String, Value>,
1449    tx: Option<TxId>,
1450) -> Result<QueryResult> {
1451    let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1452    let snapshot = db.snapshot();
1453    let rows = db.scan(&p.table, snapshot)?;
1454    let resolved_where = p
1455        .where_clause
1456        .as_ref()
1457        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1458        .transpose()?;
1459    let matched: Vec<_> = rows
1460        .into_iter()
1461        .filter(|r| {
1462            resolved_where
1463                .as_ref()
1464                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1465        })
1466        .collect();
1467
1468    for row in &matched {
1469        if db.has_live_vector(row.row_id, snapshot) {
1470            db.delete_vector(txid, row.row_id)?;
1471        }
1472        db.delete_row(txid, &p.table, row.row_id)?;
1473    }
1474
1475    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1476}
1477
1478fn exec_update(
1479    db: &Database,
1480    p: &UpdatePlan,
1481    params: &HashMap<String, Value>,
1482    tx: Option<TxId>,
1483) -> Result<QueryResult> {
1484    db.check_disk_budget("UPDATE")?;
1485    let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1486    let snapshot = db.snapshot();
1487    // Use in-tx scan so prior statements in a BEGIN/COMMIT block are visible:
1488    // the old row must not shadow a previously-updated in-tx row.
1489    let rows = db.scan_in_tx(txid, &p.table, snapshot)?;
1490    let resolved_where = p
1491        .where_clause
1492        .as_ref()
1493        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1494        .transpose()?;
1495    let matched: Vec<_> = rows
1496        .into_iter()
1497        .filter(|r| {
1498            resolved_where
1499                .as_ref()
1500                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1501        })
1502        .collect();
1503
1504    let current_tx_max = Some(db.committed_watermark());
1505
1506    for row in &matched {
1507        let mut values = row.values.clone();
1508        for (k, vexpr) in &p.assignments {
1509            let value = eval_assignment_expr(vexpr, &row.values, params)?;
1510            values.insert(
1511                k.clone(),
1512                coerce_value_for_column(db, &p.table, k, value, current_tx_max, Some(txid))?,
1513            );
1514        }
1515        validate_update_state_transition(db, &p.table, row, &values)?;
1516        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1517        let new_state = db
1518            .table_meta(&p.table)
1519            .as_ref()
1520            .and_then(|meta| meta.state_machine.as_ref())
1521            .and_then(|sm| values.get(&sm.column))
1522            .and_then(Value::as_text)
1523            .map(std::borrow::ToOwned::to_owned);
1524
1525        let old_has_vector = db.has_live_vector(row.row_id, snapshot);
1526        validate_vector_columns(db, &p.table, &values)?;
1527        let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
1528        let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1529        db.accountant().try_allocate_for(
1530            new_row_bytes,
1531            "update",
1532            "row_replace",
1533            "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1534        )?;
1535        let checkpoint = db.write_set_checkpoint(txid)?;
1536
1537        if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1538            db.accountant().release(new_row_bytes);
1539            return Err(err);
1540        }
1541        if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
1542            db.accountant().release(new_row_bytes);
1543            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1544            return Err(err);
1545        }
1546
1547        let new_row_id = match db.insert_row_replacing(txid, &p.table, values, row.row_id) {
1548            Ok(row_id) => row_id,
1549            Err(err) => {
1550                db.accountant().release(new_row_bytes);
1551                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1552                return Err(err);
1553            }
1554        };
1555        if let Some(vector) = new_vector
1556            && let Err(err) = db.insert_vector(txid, new_row_id, vector)
1557        {
1558            db.accountant().release(new_row_bytes);
1559            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1560            return Err(err);
1561        }
1562        if let Err(err) =
1563            db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1564        {
1565            db.accountant().release(new_row_bytes);
1566            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1567            return Err(err);
1568        }
1569    }
1570
1571    Ok(QueryResult::empty_with_affected(matched.len() as u64))
1572}
1573
1574fn exec_create_index(
1575    db: &Database,
1576    plan: &contextdb_planner::CreateIndexPlan,
1577) -> Result<QueryResult> {
1578    // Reserved-prefix guard: user-declared indexes must not collide with the
1579    // auto-index namespace used for PRIMARY KEY / UNIQUE backing indexes.
1580    for prefix in ["__pk_", "__unique_"] {
1581        if plan.name.starts_with(prefix) {
1582            return Err(Error::ReservedIndexName {
1583                table: plan.table.clone(),
1584                name: plan.name.clone(),
1585                prefix: prefix.to_string(),
1586            });
1587        }
1588    }
1589
1590    // Error precedence (plan §Error precedence): TableNotFound > ColumnNotFound
1591    // > ColumnNotIndexable > DuplicateIndex. Check in that exact order so
1592    // "structural" bugs surface before "naming" bugs.
1593    let meta = db
1594        .table_meta(&plan.table)
1595        .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1596
1597    // 2. Check every column exists.
1598    for (col_name, _) in &plan.columns {
1599        if !meta.columns.iter().any(|c| c.name == *col_name) {
1600            return Err(Error::ColumnNotFound {
1601                table: plan.table.clone(),
1602                column: col_name.clone(),
1603            });
1604        }
1605    }
1606
1607    // 3. Check every column type is B-tree indexable.
1608    for (col_name, _) in &plan.columns {
1609        let col = meta
1610            .columns
1611            .iter()
1612            .find(|c| c.name == *col_name)
1613            .expect("column existence verified above");
1614        if matches!(col.column_type, ColumnType::Json | ColumnType::Vector(_)) {
1615            return Err(Error::ColumnNotIndexable {
1616                table: plan.table.clone(),
1617                column: col_name.clone(),
1618                column_type: col.column_type.clone(),
1619            });
1620        }
1621    }
1622
1623    // 4. Duplicate-name check (last).
1624    if meta.indexes.iter().any(|i| i.name == plan.name) {
1625        return Err(Error::DuplicateIndex {
1626            table: plan.table.clone(),
1627            index: plan.name.clone(),
1628        });
1629    }
1630
1631    // All validations passed. Persist the IndexDecl into TableMeta.indexes,
1632    // register the IndexStorage, rebuild it over existing rows, flush meta.
1633    {
1634        let store = db.relational_store();
1635        let mut metas = store.table_meta.write();
1636        let m = metas
1637            .get_mut(&plan.table)
1638            .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1639        m.indexes.push(contextdb_core::IndexDecl {
1640            name: plan.name.clone(),
1641            columns: plan.columns.clone(),
1642            kind: contextdb_core::IndexKind::UserDeclared,
1643        });
1644    }
1645    db.relational_store()
1646        .create_index_storage(&plan.table, &plan.name, plan.columns.clone());
1647    db.relational_store().rebuild_index(&plan.table, &plan.name);
1648
1649    if let Some(table_meta) = db.table_meta(&plan.table) {
1650        db.persist_table_meta(&plan.table, &table_meta)?;
1651    }
1652
1653    db.allocate_ddl_lsn(|lsn| {
1654        db.log_create_index_ddl(&plan.table, &plan.name, &plan.columns, lsn)
1655    })?;
1656
1657    db.clear_statement_cache();
1658    Ok(QueryResult::empty_with_affected(0))
1659}
1660
1661fn exec_drop_index(db: &Database, plan: &contextdb_planner::DropIndexPlan) -> Result<QueryResult> {
1662    let meta = db
1663        .table_meta(&plan.table)
1664        .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1665    let exists = meta.indexes.iter().any(|i| i.name == plan.name);
1666    if !exists {
1667        if plan.if_exists {
1668            return Ok(QueryResult::empty_with_affected(0));
1669        }
1670        return Err(Error::IndexNotFound {
1671            table: plan.table.clone(),
1672            index: plan.name.clone(),
1673        });
1674    }
1675    {
1676        let store = db.relational_store();
1677        let mut metas = store.table_meta.write();
1678        if let Some(m) = metas.get_mut(&plan.table) {
1679            m.indexes.retain(|i| i.name != plan.name);
1680        }
1681    }
1682    db.relational_store()
1683        .drop_index_storage(&plan.table, &plan.name);
1684    if let Some(table_meta) = db.table_meta(&plan.table) {
1685        db.persist_table_meta(&plan.table, &table_meta)?;
1686    }
1687    db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&plan.table, &plan.name, lsn))?;
1688    db.clear_statement_cache();
1689    Ok(QueryResult::empty_with_affected(0))
1690}
1691
1692fn estimate_table_row_bytes(
1693    db: &Database,
1694    table: &str,
1695    values: &HashMap<String, Value>,
1696) -> Result<usize> {
1697    let meta = db
1698        .table_meta(table)
1699        .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1700    Ok(estimate_row_bytes_for_meta(values, &meta, false))
1701}
1702
1703// ========================= Index scan planning + execution =========================
1704
1705/// Shape of a predicate on the first indexed column. Drives IndexScan
1706/// eligibility: equality narrows to a point, range to a range walk, IN-list
1707/// to multiple point lookups, IS NULL to the NULL partition.
1708#[derive(Debug, Clone)]
1709pub(crate) enum IndexPredicateShape {
1710    Equality(Value),
1711    NotEqual(Value),
1712    Range {
1713        lower: std::ops::Bound<Value>,
1714        upper: std::ops::Bound<Value>,
1715    },
1716    InList(Vec<Value>),
1717    IsNull,
1718    IsNotNull,
1719}
1720
1721impl IndexPredicateShape {
1722    /// Selectivity tier — lower is more selective.
1723    fn selectivity_tier(&self) -> u8 {
1724        match self {
1725            IndexPredicateShape::Equality(_) | IndexPredicateShape::InList(_) => 0,
1726            IndexPredicateShape::Range { .. } | IndexPredicateShape::NotEqual(_) => 1,
1727            IndexPredicateShape::IsNull | IndexPredicateShape::IsNotNull => 2,
1728        }
1729    }
1730}
1731
1732#[derive(Debug, Clone)]
1733struct IndexPick {
1734    name: String,
1735    columns: Vec<(String, contextdb_core::SortDirection)>,
1736    /// Shape on the FIRST indexed column. Only one shape drives the scan.
1737    shape: IndexPredicateShape,
1738    /// Pushed column name (engine column name) for trace.
1739    pushed_column: String,
1740}
1741
1742/// Top-level decision: did we rewrite to IndexScan?
1743/// Carries index pick + the rejected candidates (for trace) + residual filter.
1744struct IndexAnalysis {
1745    pick: Option<IndexPick>,
1746    considered: Vec<crate::database::IndexCandidate>,
1747}
1748
1749/// Coerce every literal value inside `pick.shape` to `pick.pushed_column`'s
1750/// declared type. B-tree walks use variant-exact comparisons by design, so a
1751/// SELECT `WHERE uuid_col = 'text-literal'` must arrive at the executor with
1752/// the text already converted to Uuid. Coercion failure propagates so callers
1753/// can fall back to zero-rows — matching the semantics a predicate-evaluating
1754/// scan would produce on an un-coercible literal.
1755fn coerce_pick_shape_to_column_type(
1756    db: &Database,
1757    table: &str,
1758    pick: &IndexPick,
1759) -> Result<IndexPick> {
1760    use std::ops::Bound;
1761    let col = &pick.pushed_column;
1762    let coerce = |v: Value| coerce_value_for_column(db, table, col, v, None, None);
1763    let new_shape = match &pick.shape {
1764        IndexPredicateShape::Equality(v) => IndexPredicateShape::Equality(coerce(v.clone())?),
1765        IndexPredicateShape::InList(vs) => IndexPredicateShape::InList(
1766            vs.iter()
1767                .cloned()
1768                .map(&coerce)
1769                .collect::<Result<Vec<_>>>()?,
1770        ),
1771        IndexPredicateShape::Range { lower, upper } => {
1772            let lower = match lower {
1773                Bound::Included(v) => Bound::Included(coerce(v.clone())?),
1774                Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
1775                Bound::Unbounded => Bound::Unbounded,
1776            };
1777            let upper = match upper {
1778                Bound::Included(v) => Bound::Included(coerce(v.clone())?),
1779                Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
1780                Bound::Unbounded => Bound::Unbounded,
1781            };
1782            IndexPredicateShape::Range { lower, upper }
1783        }
1784        IndexPredicateShape::NotEqual(v) => IndexPredicateShape::NotEqual(coerce(v.clone())?),
1785        IndexPredicateShape::IsNull => IndexPredicateShape::IsNull,
1786        IndexPredicateShape::IsNotNull => IndexPredicateShape::IsNotNull,
1787    };
1788    Ok(IndexPick {
1789        name: pick.name.clone(),
1790        columns: pick.columns.clone(),
1791        shape: new_shape,
1792        pushed_column: pick.pushed_column.clone(),
1793    })
1794}
1795
1796/// Inspect `filter` looking for an eligible predicate on the first column of
1797/// any declared index. Returns the chosen pick + list of considered/rejected.
1798fn analyze_filter_for_index(
1799    filter: &Expr,
1800    indexes: &[contextdb_core::IndexDecl],
1801    params: &HashMap<String, Value>,
1802) -> IndexAnalysis {
1803    use std::borrow::Cow;
1804    let mut considered: Vec<crate::database::IndexCandidate> = Vec::new();
1805
1806    // Find each conjunct (split on AND) and map to (column, shape).
1807    let conjuncts = split_conjuncts(filter);
1808    let mut conjunct_shapes: Vec<(String, IndexPredicateShape)> = Vec::new();
1809    for conjunct in &conjuncts {
1810        if let Some((col, shape)) = classify_index_predicate(conjunct, params) {
1811            conjunct_shapes.push((col, shape));
1812        }
1813    }
1814
1815    // Annotate rejections on indexes that can't apply, for the trace.
1816    let mut candidates: Vec<(IndexPick, u8, usize)> = Vec::new();
1817    for (i_idx, decl) in indexes.iter().enumerate() {
1818        let first_col = match decl.columns.first() {
1819            Some((c, _)) => c.clone(),
1820            None => continue,
1821        };
1822        // Find the most-selective matching conjunct on the first column.
1823        let matching: Vec<&(String, IndexPredicateShape)> = conjunct_shapes
1824            .iter()
1825            .filter(|(c, _)| c == &first_col)
1826            .collect();
1827        if matching.is_empty() {
1828            // Check whether the filter mentions first_col in an un-usable way
1829            // (function call / arithmetic / col-to-col / subquery) to produce
1830            // a useful rejection reason.
1831            let reason = classify_rejection_reason(filter, &first_col);
1832            considered.push(crate::database::IndexCandidate {
1833                name: decl.name.clone(),
1834                rejected_reason: Cow::Borrowed(reason),
1835            });
1836            continue;
1837        }
1838        // Combine range conjuncts on the same column (BETWEEN = `>= X AND <= Y`).
1839        let shape = combine_shapes(matching.iter().map(|(_, s)| s.clone()).collect());
1840        let tier = shape.selectivity_tier();
1841        candidates.push((
1842            IndexPick {
1843                name: decl.name.clone(),
1844                columns: decl.columns.clone(),
1845                shape,
1846                pushed_column: first_col.clone(),
1847            },
1848            tier,
1849            i_idx,
1850        ));
1851    }
1852
1853    // Selection: most-selective tier wins; tie-break by creation order (index i).
1854    let pick = candidates
1855        .into_iter()
1856        .min_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)))
1857        .map(|(p, _, _)| p);
1858
1859    IndexAnalysis { pick, considered }
1860}
1861
1862/// Combine multiple index-shapes on the same column into the most-selective
1863/// composite form. Used by BETWEEN (which becomes `col >= X AND col <= Y`).
1864fn combine_shapes(mut shapes: Vec<IndexPredicateShape>) -> IndexPredicateShape {
1865    // Find the single best (most selective) shape.
1866    shapes.sort_by_key(|s| s.selectivity_tier());
1867    let head = shapes.remove(0);
1868    // If the head is a Range, try to merge subsequent Range conjuncts into it.
1869    if let IndexPredicateShape::Range {
1870        mut lower,
1871        mut upper,
1872    } = head.clone()
1873    {
1874        for s in shapes {
1875            if let IndexPredicateShape::Range { lower: l, upper: u } = s {
1876                // Merge lower: more restrictive is higher.
1877                lower = tighter_lower(&lower, &l);
1878                upper = tighter_upper(&upper, &u);
1879            }
1880        }
1881        return IndexPredicateShape::Range { lower, upper };
1882    }
1883    head
1884}
1885
1886fn tighter_lower(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
1887    use std::ops::Bound;
1888    match (a, b) {
1889        (Bound::Unbounded, _) => b.clone(),
1890        (_, Bound::Unbounded) => a.clone(),
1891        (Bound::Included(va), Bound::Included(vb)) => {
1892            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1893                a.clone()
1894            } else {
1895                b.clone()
1896            }
1897        }
1898        (Bound::Excluded(va), Bound::Excluded(vb)) => {
1899            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1900                a.clone()
1901            } else {
1902                b.clone()
1903            }
1904        }
1905        (Bound::Included(va), Bound::Excluded(vb)) => {
1906            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1907                a.clone()
1908            } else {
1909                b.clone()
1910            }
1911        }
1912        (Bound::Excluded(va), Bound::Included(vb)) => {
1913            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1914                b.clone()
1915            } else {
1916                a.clone()
1917            }
1918        }
1919    }
1920}
1921
1922fn tighter_upper(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
1923    use std::ops::Bound;
1924    match (a, b) {
1925        (Bound::Unbounded, _) => b.clone(),
1926        (_, Bound::Unbounded) => a.clone(),
1927        (Bound::Included(va), Bound::Included(vb)) => {
1928            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1929                a.clone()
1930            } else {
1931                b.clone()
1932            }
1933        }
1934        (Bound::Excluded(va), Bound::Excluded(vb)) => {
1935            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1936                a.clone()
1937            } else {
1938                b.clone()
1939            }
1940        }
1941        (Bound::Included(va), Bound::Excluded(vb)) => {
1942            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1943                a.clone()
1944            } else {
1945                b.clone()
1946            }
1947        }
1948        (Bound::Excluded(va), Bound::Included(vb)) => {
1949            if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1950                b.clone()
1951            } else {
1952                a.clone()
1953            }
1954        }
1955    }
1956}
1957
1958/// Split a boolean expression on top-level AND.
1959fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
1960    match expr {
1961        Expr::BinaryOp {
1962            left,
1963            op: BinOp::And,
1964            right,
1965        } => {
1966            let mut out = split_conjuncts(left);
1967            out.extend(split_conjuncts(right));
1968            out
1969        }
1970        other => vec![other.clone()],
1971    }
1972}
1973
1974/// Look at a predicate of the form `<col-ref> <op> <rhs>` where both sides are
1975/// simple. Return Some((column, shape)) if it's index-eligible, None otherwise.
1976fn classify_index_predicate(
1977    expr: &Expr,
1978    params: &HashMap<String, Value>,
1979) -> Option<(String, IndexPredicateShape)> {
1980    match expr {
1981        Expr::BinaryOp { left, op, right } => {
1982            let col = extract_simple_col_ref(left)?;
1983            // Reject function / arithmetic / column-ref RHS / subquery.
1984            if !is_literal_or_param(right) {
1985                return None;
1986            }
1987            let rhs = resolve_simple_rhs(right, params)?;
1988            let shape = match op {
1989                BinOp::Eq => IndexPredicateShape::Equality(rhs),
1990                BinOp::Neq => IndexPredicateShape::NotEqual(rhs),
1991                BinOp::Lt => IndexPredicateShape::Range {
1992                    lower: std::ops::Bound::Unbounded,
1993                    upper: std::ops::Bound::Excluded(rhs),
1994                },
1995                BinOp::Lte => IndexPredicateShape::Range {
1996                    lower: std::ops::Bound::Unbounded,
1997                    upper: std::ops::Bound::Included(rhs),
1998                },
1999                BinOp::Gt => IndexPredicateShape::Range {
2000                    lower: std::ops::Bound::Excluded(rhs),
2001                    upper: std::ops::Bound::Unbounded,
2002                },
2003                BinOp::Gte => IndexPredicateShape::Range {
2004                    lower: std::ops::Bound::Included(rhs),
2005                    upper: std::ops::Bound::Unbounded,
2006                },
2007                _ => return None,
2008            };
2009            Some((col, shape))
2010        }
2011        Expr::InList {
2012            expr: e,
2013            list,
2014            negated: false,
2015        } => {
2016            let col = extract_simple_col_ref(e)?;
2017            let mut values = Vec::with_capacity(list.len());
2018            for v in list {
2019                if !is_literal_or_param(v) {
2020                    return None;
2021                }
2022                values.push(resolve_simple_rhs(v, params)?);
2023            }
2024            Some((col, IndexPredicateShape::InList(values)))
2025        }
2026        Expr::IsNull { expr: e, negated } => {
2027            let col = extract_simple_col_ref(e)?;
2028            Some((
2029                col,
2030                if *negated {
2031                    IndexPredicateShape::IsNotNull
2032                } else {
2033                    IndexPredicateShape::IsNull
2034                },
2035            ))
2036        }
2037        _ => None,
2038    }
2039}
2040
2041/// Classify why `filter` rejected `column` for IndexScan. Returns a static
2042/// reason string matching the plan's trace-reason vocabulary.
2043fn classify_rejection_reason(filter: &Expr, column: &str) -> &'static str {
2044    // Walk the expression tree and find a predicate mentioning `column`; report
2045    // the first structural reason we detect.
2046    fn walk(expr: &Expr, column: &str) -> Option<&'static str> {
2047        match expr {
2048            Expr::BinaryOp {
2049                left,
2050                op: BinOp::And | BinOp::Or,
2051                right,
2052            } => walk(left, column).or_else(|| walk(right, column)),
2053            Expr::BinaryOp { left, op, right } => {
2054                // Detect arithmetic-on-column specifically (parser lowers
2055                // `a + 1` to FunctionCall { name: "__add", .. }).
2056                if expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column) {
2057                    return Some("arithmetic in predicate");
2058                }
2059                // Generic function call (UPPER(col) etc.)
2060                if expr_uses_function_on(left, column) || expr_uses_function_on(right, column) {
2061                    return Some("function call in predicate");
2062                }
2063                // Column-ref RHS?
2064                if mentions_column_ref(left, column) || mentions_column_ref(right, column) {
2065                    let left_is_col = extract_simple_col_ref(left).as_deref() == Some(column);
2066                    let right_is_col_ref = matches!(right.as_ref(), Expr::Column(_));
2067                    if left_is_col && right_is_col_ref {
2068                        return Some("non-literal rhs");
2069                    }
2070                }
2071                let _ = op;
2072                None
2073            }
2074            Expr::Like { expr: e, .. } => {
2075                if mentions_column_ref(e, column) {
2076                    Some("LIKE is residual-only")
2077                } else {
2078                    None
2079                }
2080            }
2081            Expr::InSubquery { expr: e, .. } => {
2082                if mentions_column_ref(e, column) {
2083                    Some("non-literal rhs")
2084                } else {
2085                    None
2086                }
2087            }
2088            _ => None,
2089        }
2090    }
2091    walk(filter, column).unwrap_or("first column not in WHERE")
2092}
2093
2094fn extract_simple_col_ref(expr: &Expr) -> Option<String> {
2095    match expr {
2096        Expr::Column(r) => Some(r.column.clone()),
2097        _ => None,
2098    }
2099}
2100
2101fn is_literal_or_param(expr: &Expr) -> bool {
2102    match expr {
2103        Expr::Literal(_) | Expr::Parameter(_) => true,
2104        Expr::FunctionCall { name, args } => {
2105            // Arithmetic-of-literals (e.g., `0.0 / 0.0`, `1 + 2`) counts as
2106            // a const RHS for planning purposes; we evaluate at execute time.
2107            matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2108                && args.iter().all(is_literal_or_param)
2109        }
2110        _ => false,
2111    }
2112}
2113
2114fn resolve_simple_rhs(expr: &Expr, params: &HashMap<String, Value>) -> Option<Value> {
2115    match expr {
2116        Expr::Literal(lit) => Some(match lit {
2117            Literal::Null => Value::Null,
2118            Literal::Bool(b) => Value::Bool(*b),
2119            Literal::Integer(i) => Value::Int64(*i),
2120            Literal::Real(f) => Value::Float64(*f),
2121            Literal::Text(s) => Value::Text(s.clone()),
2122            Literal::Vector(_) => return None,
2123        }),
2124        Expr::Parameter(name) => params.get(name).cloned(),
2125        Expr::FunctionCall { name, args }
2126            if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") =>
2127        {
2128            if args.len() != 2 {
2129                return None;
2130            }
2131            let a = resolve_simple_rhs(&args[0], params)?;
2132            let b = resolve_simple_rhs(&args[1], params)?;
2133            match (a, b, name.as_str()) {
2134                (Value::Int64(x), Value::Int64(y), "__add") => {
2135                    Some(Value::Int64(x.wrapping_add(y)))
2136                }
2137                (Value::Int64(x), Value::Int64(y), "__sub") => {
2138                    Some(Value::Int64(x.wrapping_sub(y)))
2139                }
2140                (Value::Int64(x), Value::Int64(y), "__mul") => {
2141                    Some(Value::Int64(x.wrapping_mul(y)))
2142                }
2143                (Value::Int64(x), Value::Int64(y), "__div") if y != 0 => Some(Value::Int64(x / y)),
2144                (Value::Float64(x), Value::Float64(y), "__add") => Some(Value::Float64(x + y)),
2145                (Value::Float64(x), Value::Float64(y), "__sub") => Some(Value::Float64(x - y)),
2146                (Value::Float64(x), Value::Float64(y), "__mul") => Some(Value::Float64(x * y)),
2147                (Value::Float64(x), Value::Float64(y), "__div") => Some(Value::Float64(x / y)),
2148                _ => None,
2149            }
2150        }
2151        _ => None,
2152    }
2153}
2154
2155fn expr_uses_function_on(expr: &Expr, column: &str) -> bool {
2156    match expr {
2157        Expr::FunctionCall { name, args } => {
2158            // Skip known arithmetic-lowering function-call names; those are
2159            // classified separately as "arithmetic in predicate".
2160            if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") {
2161                return false;
2162            }
2163            args.iter().any(|a| mentions_column_ref(a, column))
2164        }
2165        Expr::BinaryOp { left, right, .. } => {
2166            expr_uses_function_on(left, column) || expr_uses_function_on(right, column)
2167        }
2168        _ => false,
2169    }
2170}
2171
2172fn expr_uses_arithmetic_on(expr: &Expr, column: &str) -> bool {
2173    // The parser lowers `a + 1`, `a - 1`, etc. into FunctionCall with reserved
2174    // names `__add` / `__sub` / `__mul` / `__div`. We detect that shape here.
2175    match expr {
2176        Expr::FunctionCall { name, args } => {
2177            matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2178                && args.iter().any(|a| mentions_column_ref(a, column))
2179        }
2180        Expr::BinaryOp { left, right, .. } => {
2181            expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column)
2182        }
2183        _ => false,
2184    }
2185}
2186
2187fn mentions_column_ref(expr: &Expr, column: &str) -> bool {
2188    match expr {
2189        Expr::Column(r) => r.column == column,
2190        Expr::FunctionCall { args, .. } => args.iter().any(|a| mentions_column_ref(a, column)),
2191        Expr::BinaryOp { left, right, .. } => {
2192            mentions_column_ref(left, column) || mentions_column_ref(right, column)
2193        }
2194        Expr::UnaryOp { operand, .. } => mentions_column_ref(operand, column),
2195        Expr::IsNull { expr: e, .. } => mentions_column_ref(e, column),
2196        Expr::Like { expr: e, .. } => mentions_column_ref(e, column),
2197        Expr::InList { expr: e, .. } => mentions_column_ref(e, column),
2198        Expr::InSubquery { expr: e, .. } => mentions_column_ref(e, column),
2199        _ => false,
2200    }
2201}
2202
2203/// Walk the index's B-tree per the picked shape, fetch matching rows by
2204/// row_id, apply residual filter, return VersionedRow list.
2205#[allow(clippy::too_many_arguments)]
2206fn execute_index_scan(
2207    db: &Database,
2208    table: &str,
2209    pick: &IndexPick,
2210    snapshot: contextdb_core::SnapshotId,
2211    tx: Option<TxId>,
2212) -> Result<(Vec<VersionedRow>, u64)> {
2213    use contextdb_core::{DirectedValue, SortDirection, TotalOrdAsc, TotalOrdDesc};
2214    use std::ops::Bound;
2215
2216    // NaN equality short-circuit (I19): `col = NaN` or bound param NaN → empty.
2217    if let IndexPredicateShape::Equality(rhs) = &pick.shape
2218        && let Value::Float64(f) = rhs
2219        && f.is_nan()
2220    {
2221        return Ok((Vec::new(), 0));
2222    }
2223    // NULL equality short-circuit: `col = $p` with $p = NULL → empty (NULL
2224    // comparisons are UNKNOWN in SQL).
2225    if let IndexPredicateShape::Equality(Value::Null) = &pick.shape {
2226        return Ok((Vec::new(), 0));
2227    }
2228
2229    // Coerce pick.shape's literal values to the pushed column's declared type.
2230    // A SELECT WHERE uuid_col = 'uuid-string' arrives here with Text(..) even
2231    // though the indexed column stores Uuid(..). B-tree walks use variant-exact
2232    // comparisons (value_total_cmp panics on mismatched variants by design),
2233    // so we must match the stored key-type before walking. Coercion failure
2234    // (e.g. Text that is not a valid UUID) is treated as zero rows matched —
2235    // same semantics a full-scan predicate would produce.
2236    let pick = match coerce_pick_shape_to_column_type(db, table, pick) {
2237        Ok(coerced) => coerced,
2238        Err(_) => return Ok((Vec::new(), 0)),
2239    };
2240    let pick = &pick;
2241
2242    let indexes = db.relational_store().indexes.read();
2243    let storage = match indexes.get(&(table.to_string(), pick.name.clone())) {
2244        Some(s) => s,
2245        None => return Ok((Vec::new(), 0)),
2246    };
2247
2248    // Build bound keys as single-column IndexKey prefixes. Composite indexes:
2249    // we walk the entire range for the first-col match and rely on residual
2250    // filter for subsequent columns.
2251    let first_dir = pick
2252        .columns
2253        .first()
2254        .map(|(_, d)| *d)
2255        .unwrap_or(SortDirection::Asc);
2256
2257    let wrap = |v: Value| -> DirectedValue {
2258        match first_dir {
2259            SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
2260            SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
2261        }
2262    };
2263
2264    // Collect matching postings then filter by MVCC visibility.
2265    let mut postings: Vec<contextdb_relational::IndexEntry> = Vec::new();
2266    let mut rows_examined: u64 = 0;
2267
2268    let collect_range = |postings: &mut Vec<contextdb_relational::IndexEntry>,
2269                         examined: &mut u64,
2270                         lower: Bound<Vec<DirectedValue>>,
2271                         upper: Bound<Vec<DirectedValue>>| {
2272        for (_k, entries) in storage.tree.range((lower, upper)) {
2273            for e in entries {
2274                *examined += 1;
2275                if e.visible_at(snapshot) {
2276                    postings.push(e.clone());
2277                }
2278            }
2279        }
2280    };
2281
2282    // For composite indexes, a first-column equality must walk ALL keys
2283    // whose first component equals the target (i.e. the prefix range). We
2284    // iterate the whole tree and filter by first-component match to cover
2285    // both single-column and composite shapes uniformly.
2286    let is_composite = pick.columns.len() > 1;
2287
2288    match &pick.shape {
2289        IndexPredicateShape::Equality(v) => {
2290            if is_composite {
2291                // Walk every posting whose first component equals `v`.
2292                let want = wrap(v.clone());
2293                for (key, entries) in storage.tree.iter() {
2294                    if key.first() != Some(&want) {
2295                        continue;
2296                    }
2297                    for e in entries {
2298                        rows_examined += 1;
2299                        if e.visible_at(snapshot) {
2300                            postings.push(e.clone());
2301                        }
2302                    }
2303                }
2304            } else {
2305                let lower = vec![wrap(v.clone())];
2306                let upper = lower.clone();
2307                collect_range(
2308                    &mut postings,
2309                    &mut rows_examined,
2310                    Bound::Included(lower),
2311                    Bound::Included(upper),
2312                );
2313            }
2314        }
2315        IndexPredicateShape::InList(vs) => {
2316            for v in vs {
2317                if is_composite {
2318                    let want = wrap(v.clone());
2319                    for (key, entries) in storage.tree.iter() {
2320                        if key.first() != Some(&want) {
2321                            continue;
2322                        }
2323                        for e in entries {
2324                            rows_examined += 1;
2325                            if e.visible_at(snapshot) {
2326                                postings.push(e.clone());
2327                            }
2328                        }
2329                    }
2330                } else {
2331                    let k = vec![wrap(v.clone())];
2332                    collect_range(
2333                        &mut postings,
2334                        &mut rows_examined,
2335                        Bound::Included(k.clone()),
2336                        Bound::Included(k),
2337                    );
2338                }
2339            }
2340        }
2341        IndexPredicateShape::Range { lower, upper } => {
2342            if is_composite {
2343                // Composite + range on first column: walk entries whose first
2344                // component falls in the range.
2345                for (key, entries) in storage.tree.iter() {
2346                    let Some(first) = key.first() else { continue };
2347                    let in_lower = match lower {
2348                        Bound::Unbounded => true,
2349                        Bound::Included(v) => first >= &wrap(v.clone()),
2350                        Bound::Excluded(v) => first > &wrap(v.clone()),
2351                    };
2352                    let in_upper = match upper {
2353                        Bound::Unbounded => true,
2354                        Bound::Included(v) => first <= &wrap(v.clone()),
2355                        Bound::Excluded(v) => first < &wrap(v.clone()),
2356                    };
2357                    if !(in_lower && in_upper) {
2358                        continue;
2359                    }
2360                    for e in entries {
2361                        rows_examined += 1;
2362                        if e.visible_at(snapshot) {
2363                            postings.push(e.clone());
2364                        }
2365                    }
2366                }
2367            } else {
2368                let l = match lower {
2369                    Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2370                    Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2371                    Bound::Unbounded => Bound::Unbounded,
2372                };
2373                let u = match upper {
2374                    Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2375                    Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2376                    Bound::Unbounded => Bound::Unbounded,
2377                };
2378                collect_range(&mut postings, &mut rows_examined, l, u);
2379            }
2380        }
2381        IndexPredicateShape::NotEqual(v) => {
2382            // Full walk; skip exact key. For IndexScan-trace we still attribute
2383            // all postings touched to __rows_examined (trace counts postings).
2384            let except_key = vec![wrap(v.clone())];
2385            for (k, entries) in storage.tree.iter() {
2386                if *k == except_key {
2387                    continue;
2388                }
2389                for e in entries {
2390                    rows_examined += 1;
2391                    if e.visible_at(snapshot) {
2392                        postings.push(e.clone());
2393                    }
2394                }
2395            }
2396        }
2397        IndexPredicateShape::IsNull => {
2398            let k = vec![wrap(Value::Null)];
2399            collect_range(
2400                &mut postings,
2401                &mut rows_examined,
2402                Bound::Included(k.clone()),
2403                Bound::Included(k),
2404            );
2405        }
2406        IndexPredicateShape::IsNotNull => {
2407            // Everything except NULL partition.
2408            let null_key = vec![wrap(Value::Null)];
2409            for (k, entries) in storage.tree.iter() {
2410                if *k == null_key {
2411                    continue;
2412                }
2413                for e in entries {
2414                    rows_examined += 1;
2415                    if e.visible_at(snapshot) {
2416                        postings.push(e.clone());
2417                    }
2418                }
2419            }
2420        }
2421    }
2422
2423    // Now fetch base rows by row_id while preserving index-order. The index
2424    // already enumerates postings in index sort order; rows[] preserve it.
2425    drop(indexes);
2426    let row_ids: Vec<RowId> = postings.iter().map(|p| p.row_id).collect();
2427    if row_ids.is_empty() {
2428        return Ok((Vec::new(), rows_examined));
2429    }
2430    let tables = db.relational_store().tables.read();
2431    let rows_slice = tables.get(table);
2432    let mut out: Vec<VersionedRow> = Vec::with_capacity(row_ids.len());
2433    if let Some(rows) = rows_slice {
2434        let by_id: HashMap<RowId, &VersionedRow> = rows.iter().map(|r| (r.row_id, r)).collect();
2435        for rid in &row_ids {
2436            if let Some(r) = by_id.get(rid) {
2437                // Layered-visibility check: apply same rule as scan_with_tx so
2438                // deletes/inserts from the active tx are honored.
2439                if (*r).visible_at(snapshot) {
2440                    out.push((**r).clone());
2441                }
2442            }
2443        }
2444    }
2445    // Layer tx-scoped inserts / deletes on top, matching the semantics of
2446    // scan_with_tx.
2447    drop(tables);
2448    if let Some(tx_id) = tx {
2449        let overlay = db.index_scan_tx_overlay(tx_id, table, &pick.pushed_column, &pick.shape)?;
2450        let deleted_row_ids = overlay.deleted_row_ids;
2451        out.retain(|row| !deleted_row_ids.contains(&row.row_id));
2452        out.extend(overlay.matching_inserts);
2453    }
2454    Ok((out, rows_examined))
2455}
2456
2457pub(crate) fn range_includes(
2458    v: &Value,
2459    lower: &std::ops::Bound<Value>,
2460    upper: &std::ops::Bound<Value>,
2461) -> bool {
2462    use std::ops::Bound;
2463    let ok_lower = match lower {
2464        Bound::Unbounded => true,
2465        Bound::Included(b) => compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Less),
2466        Bound::Excluded(b) => {
2467            compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Greater)
2468        }
2469    };
2470    let ok_upper = match upper {
2471        Bound::Unbounded => true,
2472        Bound::Included(b) => {
2473            compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Greater)
2474        }
2475        Bound::Excluded(b) => compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Less),
2476    };
2477    ok_lower && ok_upper
2478}
2479
2480/// Try to elide the `Sort` node when the child's Scan can be rewritten as an
2481/// IndexScan whose ordering matches `keys`. The common case with no WHERE
2482/// filter uses a full-range index walk. If the Scan has a WHERE filter that
2483/// does NOT match this specific index's first column, we refuse to elide
2484/// (the Scan arm will still pick the best-matching index for the filter and
2485/// the Sort arm's path-B check handles the elision).
2486fn try_elide_sort(
2487    db: &Database,
2488    input: &PhysicalPlan,
2489    keys: &[contextdb_planner::SortKey],
2490    params: &HashMap<String, Value>,
2491    tx: Option<TxId>,
2492) -> Result<Option<QueryResult>> {
2493    fn find_scan(plan: &PhysicalPlan) -> Option<(&String, &Option<String>, &Option<Expr>)> {
2494        match plan {
2495            PhysicalPlan::Scan {
2496                table,
2497                alias,
2498                filter,
2499            } => Some((table, alias, filter)),
2500            PhysicalPlan::Project { input, .. }
2501            | PhysicalPlan::Filter { input, .. }
2502            | PhysicalPlan::Distinct { input }
2503            | PhysicalPlan::Limit { input, .. } => find_scan(input),
2504            _ => None,
2505        }
2506    }
2507    let Some((table, _alias, filter)) = find_scan(input) else {
2508        return Ok(None);
2509    };
2510    // If the underlying Scan has a WHERE, route through the Scan executor
2511    // path so it gets the narrow range / correct rows_examined accounting.
2512    // Path B on the Sort arm will detect the IndexScan trace + matching
2513    // prefix and flip `sort_elided` on the result.
2514    if filter.is_some() {
2515        return Ok(None);
2516    }
2517    // Keys must all be simple column references.
2518    let key_cols: Option<Vec<(&str, &contextdb_parser::ast::SortDirection)>> = keys
2519        .iter()
2520        .map(|k| match &k.expr {
2521            Expr::Column(r) => Some((r.column.as_str(), &k.direction)),
2522            _ => None,
2523        })
2524        .collect();
2525    let Some(key_cols) = key_cols else {
2526        return Ok(None);
2527    };
2528    let meta = match db.table_meta(table) {
2529        Some(m) => m,
2530        None => return Ok(None),
2531    };
2532    let matching_index = meta.indexes.iter().find(|decl| {
2533        if decl.columns.len() < key_cols.len() {
2534            return false;
2535        }
2536        decl.columns
2537            .iter()
2538            .zip(key_cols.iter())
2539            .all(|((col, dir), (kcol, kdir))| col == kcol && core_dir_matches_ast(*dir, **kdir))
2540    });
2541    let Some(matching) = matching_index else {
2542        return Ok(None);
2543    };
2544    run_index_scan_with_order(db, table, matching, filter.as_ref(), params, tx)
2545}
2546
2547/// Execute an IndexScan over `table` with `index`, applying the optional
2548/// residual filter. Constructs predicates_pushed / indexes_considered the
2549/// same way the Scan arm does.
2550fn run_index_scan_with_order(
2551    db: &Database,
2552    table: &str,
2553    decl: &contextdb_core::IndexDecl,
2554    filter: Option<&Expr>,
2555    params: &HashMap<String, Value>,
2556    tx: Option<TxId>,
2557) -> Result<Option<QueryResult>> {
2558    use std::borrow::Cow;
2559    let snapshot = db.snapshot_for_read();
2560    let schema_columns = db.table_meta(table).map(|meta| {
2561        meta.columns
2562            .into_iter()
2563            .map(|column| column.name)
2564            .collect::<Vec<_>>()
2565    });
2566    let resolved_filter = filter
2567        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
2568        .transpose()?;
2569
2570    // Pick: full-range walk for ORDER BY elision. Shape is "unbounded range"
2571    // so we walk every posting. Residual filter applies.
2572    let pick = IndexPick {
2573        name: decl.name.clone(),
2574        columns: decl.columns.clone(),
2575        shape: IndexPredicateShape::Range {
2576            lower: std::ops::Bound::Unbounded,
2577            upper: std::ops::Bound::Unbounded,
2578        },
2579        pushed_column: decl.columns[0].0.clone(),
2580    };
2581    let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
2582    db.__bump_rows_examined(examined);
2583    let mut result = materialize_rows(
2584        rows,
2585        resolved_filter.as_ref(),
2586        params,
2587        schema_columns.as_deref(),
2588    )?;
2589    let mut pushed: smallvec::SmallVec<[Cow<'static, str>; 4]> = smallvec::SmallVec::new();
2590    pushed.push(Cow::Owned(decl.columns[0].0.clone()));
2591    result.trace = crate::database::QueryTrace {
2592        physical_plan: "IndexScan",
2593        index_used: Some(decl.name.clone()),
2594        predicates_pushed: pushed,
2595        indexes_considered: Default::default(),
2596        sort_elided: true,
2597    };
2598    Ok(Some(result))
2599}
2600
2601fn sort_keys_match_index_prefix(
2602    db: &Database,
2603    input: &PhysicalPlan,
2604    index_name: &str,
2605    keys: &[contextdb_planner::SortKey],
2606) -> bool {
2607    fn find_scan_and_filter(plan: &PhysicalPlan) -> Option<(&String, &Option<Expr>)> {
2608        match plan {
2609            PhysicalPlan::Scan { table, filter, .. } => Some((table, filter)),
2610            PhysicalPlan::Project { input, .. }
2611            | PhysicalPlan::Filter { input, .. }
2612            | PhysicalPlan::Distinct { input }
2613            | PhysicalPlan::Limit { input, .. } => find_scan_and_filter(input),
2614            _ => None,
2615        }
2616    }
2617    let (table, filter) = match find_scan_and_filter(input) {
2618        Some(t) => t,
2619        None => return false,
2620    };
2621    let meta = match db.table_meta(table) {
2622        Some(m) => m,
2623        None => return false,
2624    };
2625    let decl = meta.indexes.iter().find(|i| i.name == index_name);
2626    let Some(decl) = decl else {
2627        return false;
2628    };
2629    // Shape guard: IndexScan with InList or NotEqual shape on the leading
2630    // indexed column walks fragmented posting-list ranges, so rows are
2631    // emitted per-value, not globally sorted. Refuse sort elision for those
2632    // shapes — the Sort node must run.
2633    if let Some(filter_expr) = filter.as_ref()
2634        && let Some(leading_col) = decl.columns.first().map(|(c, _)| c.as_str())
2635    {
2636        let conjuncts = split_conjuncts(filter_expr);
2637        let empty_params = HashMap::new();
2638        for conjunct in &conjuncts {
2639            if let Some((col, shape)) = classify_index_predicate(conjunct, &empty_params)
2640                && col == leading_col
2641                && matches!(
2642                    shape,
2643                    IndexPredicateShape::InList(_) | IndexPredicateShape::NotEqual(_)
2644                )
2645            {
2646                return false;
2647            }
2648        }
2649    }
2650    // Determine how many leading index columns the WHERE filter pins to a
2651    // single equality. Those columns are effectively "used up" by the
2652    // IndexScan's range; subsequent ORDER BY keys matching the remaining
2653    // index columns still elide the Sort.
2654    let pinned_prefix_len = count_equality_prefix(filter.as_ref(), &decl.columns);
2655    let remaining_index_cols = &decl.columns[pinned_prefix_len..];
2656    if remaining_index_cols.len() < keys.len() {
2657        return false;
2658    }
2659    remaining_index_cols
2660        .iter()
2661        .zip(keys.iter())
2662        .all(|((col, dir), k)| match &k.expr {
2663            Expr::Column(r) => r.column == *col && core_dir_matches_ast(*dir, k.direction),
2664            _ => false,
2665        })
2666}
2667
2668fn count_equality_prefix(
2669    filter: Option<&Expr>,
2670    columns: &[(String, contextdb_core::SortDirection)],
2671) -> usize {
2672    let Some(filter) = filter else {
2673        return 0;
2674    };
2675    let conjuncts = split_conjuncts(filter);
2676    let mut pinned = 0usize;
2677    for (col, _) in columns {
2678        let has_eq = conjuncts.iter().any(|c| match c {
2679            Expr::BinaryOp {
2680                left,
2681                op: BinOp::Eq,
2682                right,
2683            } => {
2684                let left_is_col = matches!(left.as_ref(), Expr::Column(r) if r.column == *col);
2685                let right_is_simple =
2686                    matches!(right.as_ref(), Expr::Literal(_) | Expr::Parameter(_));
2687                left_is_col && right_is_simple
2688            }
2689            _ => false,
2690        });
2691        if has_eq {
2692            pinned += 1;
2693        } else {
2694            break;
2695        }
2696    }
2697    pinned
2698}
2699
2700fn core_dir_matches_ast(
2701    core: contextdb_core::SortDirection,
2702    ast: contextdb_parser::ast::SortDirection,
2703) -> bool {
2704    matches!(
2705        (core, ast),
2706        (
2707            contextdb_core::SortDirection::Asc,
2708            contextdb_parser::ast::SortDirection::Asc
2709        ) | (
2710            contextdb_core::SortDirection::Desc,
2711            contextdb_parser::ast::SortDirection::Desc
2712        )
2713    )
2714}
2715
2716// ========================= End of index scan planning =========================
2717
2718fn validate_update_state_transition(
2719    db: &Database,
2720    table: &str,
2721    existing: &VersionedRow,
2722    next_values: &HashMap<String, Value>,
2723) -> Result<()> {
2724    let Some(meta) = db.table_meta(table) else {
2725        return Ok(());
2726    };
2727    let Some(state_machine) = meta.state_machine else {
2728        return Ok(());
2729    };
2730
2731    let old_state = existing
2732        .values
2733        .get(&state_machine.column)
2734        .and_then(Value::as_text);
2735    let new_state = next_values
2736        .get(&state_machine.column)
2737        .and_then(Value::as_text);
2738
2739    let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
2740        return Ok(());
2741    };
2742
2743    if old_state == new_state
2744        || db.relational_store().validate_state_transition(
2745            table,
2746            &state_machine.column,
2747            old_state,
2748            new_state,
2749        )
2750    {
2751        return Ok(());
2752    }
2753
2754    Err(Error::InvalidStateTransition(format!(
2755        "{old_state} -> {new_state}"
2756    )))
2757}
2758
2759fn estimate_row_bytes_for_meta(
2760    values: &HashMap<String, Value>,
2761    meta: &TableMeta,
2762    include_vectors: bool,
2763) -> usize {
2764    let mut bytes = 96usize;
2765    for column in &meta.columns {
2766        let Some(value) = values.get(&column.name) else {
2767            continue;
2768        };
2769        if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
2770            continue;
2771        }
2772        bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
2773    }
2774    bytes
2775}
2776
/// Estimate the working memory of a top-`k` vector search:
/// `k * 3 * dimension * size_of::<f32>()`, saturating at `usize::MAX`.
fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
    // Factors are applied in this order so saturation behaves the same at
    // every step.
    [3, dimension, std::mem::size_of::<f32>()]
        .into_iter()
        .fold(k, usize::saturating_mul)
}
2782
2783fn estimate_bfs_working_bytes<T>(
2784    frontier: &[T],
2785    steps: &[contextdb_planner::GraphStepPlan],
2786) -> usize {
2787    let max_hops = steps.iter().fold(0usize, |acc, step| {
2788        acc.saturating_add(step.max_depth as usize)
2789    });
2790    frontier
2791        .len()
2792        .saturating_mul(2048)
2793        .saturating_mul(max_hops.max(1))
2794}
2795
2796fn dedupe_graph_frontier(
2797    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2798    steps: &[contextdb_planner::GraphStepPlan],
2799) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
2800    let mut best =
2801        HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
2802
2803    for (bindings, current_id, depth) in frontier {
2804        let mut key = Vec::with_capacity(steps.len());
2805        for step in steps {
2806            if let Some(id) = bindings.get(&step.target_alias) {
2807                key.push(*id);
2808            }
2809        }
2810
2811        best.entry(key)
2812            .and_modify(|existing| {
2813                if depth < existing.2 {
2814                    *existing = (bindings.clone(), current_id, depth);
2815                }
2816            })
2817            .or_insert((bindings, current_id, depth));
2818    }
2819
2820    best.into_values().collect()
2821}
2822
2823fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
2824    let meta = db.table_meta(table);
2825    let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
2826    let snapshot = db.snapshot();
2827    let rows = db.scan(table, snapshot).unwrap_or_default();
2828    let row_bytes = rows.iter().fold(0usize, |acc, row| {
2829        acc.saturating_add(meta.as_ref().map_or_else(
2830            || row.estimated_bytes(),
2831            |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
2832        ))
2833    });
2834    let vector_bytes = rows
2835        .iter()
2836        .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
2837        .fold(0usize, |acc, entry| {
2838            acc.saturating_add(entry.estimated_bytes())
2839        });
2840    let edge_bytes = rows.iter().fold(0usize, |acc, row| {
2841        match (
2842            row.values.get("source_id").and_then(Value::as_uuid),
2843            row.values.get("target_id").and_then(Value::as_uuid),
2844            row.values.get("edge_type").and_then(Value::as_text),
2845        ) {
2846            (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
2847                96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
2848            ),
2849            _ => acc,
2850        }
2851    });
2852    metadata_bytes
2853        .saturating_add(row_bytes)
2854        .saturating_add(vector_bytes)
2855        .saturating_add(edge_bytes)
2856}
2857
2858fn materialize_rows(
2859    rows: Vec<VersionedRow>,
2860    filter: Option<&Expr>,
2861    params: &HashMap<String, Value>,
2862    schema_columns: Option<&[String]>,
2863) -> Result<QueryResult> {
2864    let filtered: Vec<VersionedRow> = rows
2865        .into_iter()
2866        .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
2867        .collect();
2868
2869    let keys = if let Some(schema_columns) = schema_columns {
2870        schema_columns.to_vec()
2871    } else {
2872        let mut keys = BTreeSet::new();
2873        for r in &filtered {
2874            for k in r.values.keys() {
2875                keys.insert(k.clone());
2876            }
2877        }
2878        keys.into_iter().collect::<Vec<_>>()
2879    };
2880
2881    let mut columns = vec!["row_id".to_string()];
2882    columns.extend(keys.iter().cloned());
2883
2884    let rows = filtered
2885        .into_iter()
2886        .map(|r| {
2887            let mut out = vec![Value::Int64(r.row_id.0 as i64)];
2888            for k in &keys {
2889                out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
2890            }
2891            out
2892        })
2893        .collect();
2894
2895    Ok(QueryResult {
2896        columns,
2897        rows,
2898        rows_affected: 0,
2899        trace: crate::database::QueryTrace::scan(),
2900        cascade: None,
2901    })
2902}
2903
2904fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
2905    Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
2906}
2907
2908fn eval_expr_value(
2909    row: &VersionedRow,
2910    expr: &Expr,
2911    params: &HashMap<String, Value>,
2912) -> Result<Value> {
2913    match expr {
2914        Expr::Column(c) => {
2915            if c.column == "row_id" {
2916                Ok(Value::Int64(row.row_id.0 as i64))
2917            } else {
2918                Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
2919            }
2920        }
2921        Expr::BinaryOp { left, op, right } => {
2922            let left = eval_expr_value(row, left, params)?;
2923            let right = eval_expr_value(row, right, params)?;
2924            eval_binary_op(op, &left, &right)
2925        }
2926        Expr::UnaryOp { op, operand } => {
2927            let value = eval_expr_value(row, operand, params)?;
2928            match op {
2929                UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
2930                UnaryOp::Neg => match value {
2931                    Value::Int64(v) => Ok(Value::Int64(-v)),
2932                    Value::Float64(v) => Ok(Value::Float64(-v)),
2933                    _ => Err(Error::PlanError(
2934                        "cannot negate non-numeric value".to_string(),
2935                    )),
2936                },
2937            }
2938        }
2939        Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
2940        Expr::IsNull { expr, negated } => {
2941            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
2942            Ok(Value::Bool(if *negated { !is_null } else { is_null }))
2943        }
2944        Expr::InList {
2945            expr,
2946            list,
2947            negated,
2948        } => {
2949            let needle = eval_expr_value(row, expr, params)?;
2950            let matched = list.iter().try_fold(false, |found, item| {
2951                if found {
2952                    Ok(true)
2953                } else {
2954                    let candidate = eval_expr_value(row, item, params)?;
2955                    Ok(
2956                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2957                            || (needle != Value::Null
2958                                && candidate != Value::Null
2959                                && needle == candidate),
2960                    )
2961                }
2962            })?;
2963            Ok(Value::Bool(if *negated { !matched } else { matched }))
2964        }
2965        Expr::Like {
2966            expr,
2967            pattern,
2968            negated,
2969        } => {
2970            let matches = match (
2971                eval_expr_value(row, expr, params)?,
2972                eval_expr_value(row, pattern, params)?,
2973            ) {
2974                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2975                _ => false,
2976            };
2977            Ok(Value::Bool(if *negated { !matches } else { matches }))
2978        }
2979        _ => resolve_expr(expr, params),
2980    }
2981}
2982
2983pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
2984    match expr {
2985        Expr::Literal(l) => Ok(match l {
2986            Literal::Null => Value::Null,
2987            Literal::Bool(v) => Value::Bool(*v),
2988            Literal::Integer(v) => Value::Int64(*v),
2989            Literal::Real(v) => Value::Float64(*v),
2990            Literal::Text(v) => Value::Text(v.clone()),
2991            Literal::Vector(v) => Value::Vector(v.clone()),
2992        }),
2993        Expr::Parameter(p) => params
2994            .get(p)
2995            .cloned()
2996            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
2997        Expr::Column(c) => Ok(Value::Text(c.column.clone())),
2998        Expr::UnaryOp { op, operand } => match op {
2999            UnaryOp::Neg => match resolve_expr(operand, params)? {
3000                Value::Int64(v) => Ok(Value::Int64(-v)),
3001                Value::Float64(v) => Ok(Value::Float64(-v)),
3002                _ => Err(Error::PlanError(
3003                    "cannot negate non-numeric value".to_string(),
3004                )),
3005            },
3006            UnaryOp::Not => Err(Error::PlanError(
3007                "boolean NOT requires row context".to_string(),
3008            )),
3009        },
3010        Expr::FunctionCall { name, args } => {
3011            let values = args
3012                .iter()
3013                .map(|arg| resolve_expr(arg, params))
3014                .collect::<Result<Vec<_>>>()?;
3015            eval_function(name, &values)
3016        }
3017        Expr::CosineDistance { right, .. } => resolve_expr(right, params),
3018        _ => Err(Error::PlanError("unsupported expression".to_string())),
3019    }
3020}
3021
3022fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
3023    match (a, b) {
3024        (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
3025        (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
3026        (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
3027        (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3028        (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
3029        (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
3030        (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
3031        (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3032        (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
3033        (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
3034        (Value::Uuid(u), Value::Text(t)) => {
3035            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3036                Some(u.cmp(&parsed))
3037            } else {
3038                None
3039            }
3040        }
3041        (Value::Text(t), Value::Uuid(u)) => {
3042            if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3043                Some(parsed.cmp(u))
3044            } else {
3045                None
3046            }
3047        }
3048        (Value::TxId(a), Value::TxId(b)) => Some(a.0.cmp(&b.0)),
3049        (Value::TxId(a), Value::Int64(b)) => {
3050            if *b < 0 {
3051                Some(Ordering::Greater)
3052            } else {
3053                Some(a.0.cmp(&(*b as u64)))
3054            }
3055        }
3056        (Value::Int64(a), Value::TxId(b)) => {
3057            if *a < 0 {
3058                Some(Ordering::Less)
3059            } else {
3060                Some((*a as u64).cmp(&b.0))
3061            }
3062        }
3063        (Value::TxId(_), Value::Timestamp(_)) | (Value::Timestamp(_), Value::TxId(_)) => None,
3064        (Value::Null, _) | (_, Value::Null) => None,
3065        _ => None,
3066    }
3067}
3068
3069fn eval_bool_expr(
3070    row: &VersionedRow,
3071    expr: &Expr,
3072    params: &HashMap<String, Value>,
3073) -> Result<Option<bool>> {
3074    match expr {
3075        Expr::BinaryOp { left, op, right } => match op {
3076            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3077                let left = eval_expr_value(row, left, params)?;
3078                let right = eval_expr_value(row, right, params)?;
3079                if left == Value::Null || right == Value::Null {
3080                    return Ok(None);
3081                }
3082
3083                let result = match op {
3084                    BinOp::Eq => {
3085                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3086                    }
3087                    BinOp::Neq => {
3088                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3089                    }
3090                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3091                    BinOp::Lte => matches!(
3092                        compare_values(&left, &right),
3093                        Some(Ordering::Less | Ordering::Equal)
3094                    ),
3095                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3096                    BinOp::Gte => matches!(
3097                        compare_values(&left, &right),
3098                        Some(Ordering::Greater | Ordering::Equal)
3099                    ),
3100                    BinOp::And | BinOp::Or => unreachable!(),
3101                };
3102                Ok(Some(result))
3103            }
3104            BinOp::And => {
3105                let left = eval_bool_expr(row, left, params)?;
3106                if left == Some(false) {
3107                    return Ok(Some(false));
3108                }
3109                let right = eval_bool_expr(row, right, params)?;
3110                Ok(match (left, right) {
3111                    (Some(true), Some(true)) => Some(true),
3112                    (Some(true), other) => other,
3113                    (None, Some(false)) => Some(false),
3114                    (None, Some(true)) | (None, None) => None,
3115                    (Some(false), _) => Some(false),
3116                })
3117            }
3118            BinOp::Or => {
3119                let left = eval_bool_expr(row, left, params)?;
3120                if left == Some(true) {
3121                    return Ok(Some(true));
3122                }
3123                let right = eval_bool_expr(row, right, params)?;
3124                Ok(match (left, right) {
3125                    (Some(false), Some(false)) => Some(false),
3126                    (Some(false), other) => other,
3127                    (None, Some(true)) => Some(true),
3128                    (None, Some(false)) | (None, None) => None,
3129                    (Some(true), _) => Some(true),
3130                })
3131            }
3132        },
3133        Expr::UnaryOp {
3134            op: UnaryOp::Not,
3135            operand,
3136        } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
3137        Expr::InList {
3138            expr,
3139            list,
3140            negated,
3141        } => {
3142            let needle = eval_expr_value(row, expr, params)?;
3143            if needle == Value::Null {
3144                return Ok(None);
3145            }
3146
3147            let matched = list.iter().try_fold(false, |found, item| {
3148                if found {
3149                    Ok(true)
3150                } else {
3151                    let candidate = eval_expr_value(row, item, params)?;
3152                    Ok(
3153                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3154                            || (candidate != Value::Null && needle == candidate),
3155                    )
3156                }
3157            })?;
3158            Ok(Some(if *negated { !matched } else { matched }))
3159        }
3160        Expr::InSubquery { .. } => Err(Error::PlanError(
3161            "IN (subquery) must be resolved before execution".to_string(),
3162        )),
3163        Expr::Like {
3164            expr,
3165            pattern,
3166            negated,
3167        } => {
3168            let left = eval_expr_value(row, expr, params)?;
3169            let right = eval_expr_value(row, pattern, params)?;
3170            let matched = match (left, right) {
3171                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3172                _ => false,
3173            };
3174            Ok(Some(if *negated { !matched } else { matched }))
3175        }
3176        Expr::IsNull { expr, negated } => {
3177            let is_null = eval_expr_value(row, expr, params)? == Value::Null;
3178            Ok(Some(if *negated { !is_null } else { is_null }))
3179        }
3180        Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
3181            Value::Bool(value) => Ok(Some(value)),
3182            Value::Null => Ok(None),
3183            _ => Err(Error::PlanError(format!(
3184                "unsupported WHERE expression: {:?}",
3185                expr
3186            ))),
3187        },
3188        _ => Err(Error::PlanError(format!(
3189            "unsupported WHERE expression: {:?}",
3190            expr
3191        ))),
3192    }
3193}
3194
3195fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
3196    let bool_value = match op {
3197        BinOp::Eq => {
3198            if left == &Value::Null || right == &Value::Null {
3199                false
3200            } else {
3201                compare_values(left, right) == Some(Ordering::Equal) || left == right
3202            }
3203        }
3204        BinOp::Neq => {
3205            if left == &Value::Null || right == &Value::Null {
3206                false
3207            } else {
3208                !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
3209            }
3210        }
3211        BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
3212        BinOp::Lte => matches!(
3213            compare_values(left, right),
3214            Some(Ordering::Less | Ordering::Equal)
3215        ),
3216        BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
3217        BinOp::Gte => matches!(
3218            compare_values(left, right),
3219            Some(Ordering::Greater | Ordering::Equal)
3220        ),
3221        BinOp::And => value_to_bool(left) && value_to_bool(right),
3222        BinOp::Or => value_to_bool(left) || value_to_bool(right),
3223    };
3224    Ok(Value::Bool(bool_value))
3225}
3226
3227fn value_to_bool(value: &Value) -> bool {
3228    matches!(value, Value::Bool(true))
3229}
3230
3231fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
3232    match (left, right) {
3233        (Value::Null, Value::Null) => Ordering::Equal,
3234        (Value::Null, _) => match direction {
3235            SortDirection::Asc => Ordering::Greater,
3236            SortDirection::Desc => Ordering::Less,
3237            SortDirection::CosineDistance => Ordering::Equal,
3238        },
3239        (_, Value::Null) => match direction {
3240            SortDirection::Asc => Ordering::Less,
3241            SortDirection::Desc => Ordering::Greater,
3242            SortDirection::CosineDistance => Ordering::Equal,
3243        },
3244        _ => {
3245            let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
3246            match direction {
3247                SortDirection::Asc => ordering,
3248                SortDirection::Desc => ordering.reverse(),
3249                SortDirection::CosineDistance => ordering,
3250            }
3251        }
3252    }
3253}
3254
3255fn eval_assignment_expr(
3256    expr: &Expr,
3257    row_values: &HashMap<String, Value>,
3258    params: &HashMap<String, Value>,
3259) -> Result<Value> {
3260    match expr {
3261        Expr::Literal(lit) => literal_to_value(lit),
3262        Expr::Parameter(name) => params
3263            .get(name)
3264            .cloned()
3265            .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
3266        Expr::Column(col_ref) => row_values
3267            .get(&col_ref.column)
3268            .cloned()
3269            .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
3270        Expr::BinaryOp { left, op, right } => {
3271            let left = eval_assignment_expr(left, row_values, params)?;
3272            let right = eval_assignment_expr(right, row_values, params)?;
3273            eval_binary_op(op, &left, &right)
3274        }
3275        Expr::UnaryOp { op, operand } => match op {
3276            UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
3277                Value::Int64(value) => Ok(Value::Int64(-value)),
3278                Value::Float64(value) => Ok(Value::Float64(-value)),
3279                _ => Err(Error::Other(format!(
3280                    "unsupported expression in UPDATE SET: {:?}",
3281                    expr
3282                ))),
3283            },
3284            UnaryOp::Not => Err(Error::Other(format!(
3285                "unsupported expression in UPDATE SET: {:?}",
3286                expr
3287            ))),
3288        },
3289        Expr::FunctionCall { name, args } => {
3290            let evaluated = args
3291                .iter()
3292                .map(|arg| eval_assignment_expr(arg, row_values, params))
3293                .collect::<Result<Vec<_>>>()?;
3294            eval_function(name, &evaluated)
3295        }
3296        _ => Err(Error::Other(format!(
3297            "unsupported expression in UPDATE SET: {:?}",
3298            expr
3299        ))),
3300    }
3301}
3302
3303fn apply_on_conflict_updates(
3304    db: &Database,
3305    table: &str,
3306    mut insert_values: HashMap<String, Value>,
3307    existing_row: &VersionedRow,
3308    on_conflict: &OnConflictPlan,
3309    params: &HashMap<String, Value>,
3310    active_tx: Option<TxId>,
3311) -> Result<HashMap<String, Value>> {
3312    if on_conflict.update_columns.is_empty() {
3313        return Ok(insert_values);
3314    }
3315
3316    // Reject column-level IMMUTABLE updates at the ON CONFLICT DO UPDATE merge
3317    // point. First flagged column in update-list order wins. Rejection returns
3318    // Err here; the caller (exec_insert) is responsible for releasing any
3319    // allocator bytes and restoring the write-set checkpoint.
3320    if let Some(meta) = db.table_meta(table) {
3321        for (column, _) in &on_conflict.update_columns {
3322            if let Some(col_def) = meta.columns.iter().find(|c| c.name == *column)
3323                && col_def.immutable
3324            {
3325                return Err(Error::ImmutableColumn {
3326                    table: table.to_string(),
3327                    column: column.clone(),
3328                });
3329            }
3330        }
3331    }
3332
3333    let current_tx_max = Some(db.committed_watermark());
3334
3335    let mut merged = existing_row.values.clone();
3336    for (column, expr) in &on_conflict.update_columns {
3337        let value = eval_assignment_expr(expr, &existing_row.values, params)?;
3338        merged.insert(
3339            column.clone(),
3340            coerce_value_for_column(db, table, column, value, current_tx_max, active_tx)?,
3341        );
3342    }
3343
3344    for (column, value) in insert_values.drain() {
3345        merged.entry(column).or_insert(value);
3346    }
3347
3348    Ok(merged)
3349}
3350
3351fn literal_to_value(lit: &Literal) -> Result<Value> {
3352    Ok(match lit {
3353        Literal::Null => Value::Null,
3354        Literal::Bool(v) => Value::Bool(*v),
3355        Literal::Integer(v) => Value::Int64(*v),
3356        Literal::Real(v) => Value::Float64(*v),
3357        Literal::Text(v) => Value::Text(v.clone()),
3358        Literal::Vector(v) => Value::Vector(v.clone()),
3359    })
3360}
3361
3362fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
3363    let [left, right] = args else {
3364        return Err(Error::PlanError(format!(
3365            "function {} expects 2 arguments",
3366            name
3367        )));
3368    };
3369
3370    match (left, right) {
3371        (Value::Int64(left), Value::Int64(right)) => match name {
3372            "__add" => Ok(Value::Int64(left + right)),
3373            "__sub" => Ok(Value::Int64(left - right)),
3374            "__mul" => Ok(Value::Int64(left * right)),
3375            "__div" => Ok(Value::Int64(left / right)),
3376            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3377        },
3378        (Value::Float64(left), Value::Float64(right)) => match name {
3379            "__add" => Ok(Value::Float64(left + right)),
3380            "__sub" => Ok(Value::Float64(left - right)),
3381            "__mul" => Ok(Value::Float64(left * right)),
3382            "__div" => Ok(Value::Float64(left / right)),
3383            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3384        },
3385        (Value::Int64(left), Value::Float64(right)) => match name {
3386            "__add" => Ok(Value::Float64(*left as f64 + right)),
3387            "__sub" => Ok(Value::Float64(*left as f64 - right)),
3388            "__mul" => Ok(Value::Float64(*left as f64 * right)),
3389            "__div" => Ok(Value::Float64(*left as f64 / right)),
3390            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3391        },
3392        (Value::Float64(left), Value::Int64(right)) => match name {
3393            "__add" => Ok(Value::Float64(left + *right as f64)),
3394            "__sub" => Ok(Value::Float64(left - *right as f64)),
3395            "__mul" => Ok(Value::Float64(left * *right as f64)),
3396            "__div" => Ok(Value::Float64(left / *right as f64)),
3397            _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3398        },
3399        _ => Err(Error::PlanError(format!(
3400            "function {} expects numeric arguments",
3401            name
3402        ))),
3403    }
3404}
3405
3406fn eval_function_in_row_context(
3407    row: &VersionedRow,
3408    name: &str,
3409    args: &[Expr],
3410    params: &HashMap<String, Value>,
3411) -> Result<Value> {
3412    let values = args
3413        .iter()
3414        .map(|arg| eval_expr_value(row, arg, params))
3415        .collect::<Result<Vec<_>>>()?;
3416    eval_function(name, &values)
3417}
3418
3419fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
3420    match name.to_ascii_lowercase().as_str() {
3421        "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
3422        "coalesce" => Ok(args
3423            .iter()
3424            .find(|value| **value != Value::Null)
3425            .cloned()
3426            .unwrap_or(Value::Null)),
3427        "now" => Ok(Value::Timestamp(
3428            SystemTime::now()
3429                .duration_since(UNIX_EPOCH)
3430                .map_err(|err| Error::PlanError(err.to_string()))?
3431                .as_secs() as i64,
3432        )),
3433        _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3434    }
3435}
3436
/// Returns whether `value` matches the SQL `LIKE` `pattern`.
///
/// `%` matches any run of characters (including none) and `_` matches exactly
/// one character; every other pattern character must match literally.
/// Matching is case-sensitive, works on Unicode scalar values, and has no
/// escape syntax.
///
/// The matcher is greedy with single-level backtracking: it remembers the most
/// recent `%` and, on a mismatch, lets that `%` absorb one more character.
fn like_matches(value: &str, pattern: &str) -> bool {
    let value_chars: Vec<char> = value.chars().collect();
    let pattern_chars: Vec<char> = pattern.chars().collect();
    let (mut vi, mut pi) = (0usize, 0usize);
    // (index of the last `%` in the pattern, value index it currently extends to)
    let mut backtrack: Option<(usize, usize)> = None;

    while vi < value_chars.len() {
        match pattern_chars.get(pi) {
            // The wildcard must be recognised before the literal comparison,
            // otherwise a `%` in the value would consume it as a literal.
            Some('%') => {
                backtrack = Some((pi, vi));
                pi += 1;
            }
            Some(&expected) if expected == '_' || expected == value_chars[vi] => {
                vi += 1;
                pi += 1;
            }
            _ => match backtrack.as_mut() {
                Some((star, absorbed_to)) => {
                    *absorbed_to += 1;
                    vi = *absorbed_to;
                    pi = *star + 1;
                }
                None => return false,
            },
        }
    }

    // The value is exhausted; only trailing `%` may remain in the pattern.
    pattern_chars[pi..].iter().all(|&c| c == '%')
}
3468
3469fn resolve_in_subqueries(
3470    db: &Database,
3471    expr: &Expr,
3472    params: &HashMap<String, Value>,
3473    tx: Option<TxId>,
3474) -> Result<Expr> {
3475    resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
3476}
3477
3478pub(crate) fn resolve_in_subqueries_with_ctes(
3479    db: &Database,
3480    expr: &Expr,
3481    params: &HashMap<String, Value>,
3482    tx: Option<TxId>,
3483    ctes: &[Cte],
3484) -> Result<Expr> {
3485    match expr {
3486        Expr::InSubquery {
3487            expr,
3488            subquery,
3489            negated,
3490        } => {
3491            // Detect correlated subqueries: WHERE references to outer tables
3492            let mut subquery_tables: std::collections::HashSet<String> = subquery
3493                .from
3494                .iter()
3495                .filter_map(|item| match item {
3496                    contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
3497                    _ => None,
3498                })
3499                .collect();
3500            // CTE names are valid table references within the subquery
3501            for cte in ctes {
3502                match cte {
3503                    Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
3504                        subquery_tables.insert(name.clone());
3505                    }
3506                }
3507            }
3508            if let Some(where_clause) = &subquery.where_clause
3509                && has_outer_table_ref(where_clause, &subquery_tables)
3510            {
3511                return Err(Error::Other(
3512                    "correlated subqueries are not supported".to_string(),
3513                ));
3514            }
3515
3516            let query_plan = plan(&Statement::Select(SelectStatement {
3517                ctes: ctes.to_vec(),
3518                body: (**subquery).clone(),
3519            }))?;
3520            let result = execute_plan(db, &query_plan, params, tx)?;
3521            let select_expr = subquery
3522                .columns
3523                .first()
3524                .map(|column| column.expr.clone())
3525                .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
3526            let list = result
3527                .rows
3528                .iter()
3529                .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
3530                .collect::<Result<Vec<_>>>()?
3531                .into_iter()
3532                .map(value_to_literal)
3533                .collect::<Result<Vec<_>>>()?;
3534            Ok(Expr::InList {
3535                expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3536                list,
3537                negated: *negated,
3538            })
3539        }
3540        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
3541            left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
3542            op: *op,
3543            right: Box::new(resolve_in_subqueries_with_ctes(
3544                db, right, params, tx, ctes,
3545            )?),
3546        }),
3547        Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
3548            op: *op,
3549            operand: Box::new(resolve_in_subqueries_with_ctes(
3550                db, operand, params, tx, ctes,
3551            )?),
3552        }),
3553        Expr::InList {
3554            expr,
3555            list,
3556            negated,
3557        } => Ok(Expr::InList {
3558            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3559            list: list
3560                .iter()
3561                .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
3562                .collect::<Result<Vec<_>>>()?,
3563            negated: *negated,
3564        }),
3565        Expr::Like {
3566            expr,
3567            pattern,
3568            negated,
3569        } => Ok(Expr::Like {
3570            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3571            pattern: Box::new(resolve_in_subqueries_with_ctes(
3572                db, pattern, params, tx, ctes,
3573            )?),
3574            negated: *negated,
3575        }),
3576        Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
3577            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3578            negated: *negated,
3579        }),
3580        Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
3581            name: name.clone(),
3582            args: args
3583                .iter()
3584                .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
3585                .collect::<Result<Vec<_>>>()?,
3586        }),
3587        _ => Ok(expr.clone()),
3588    }
3589}
3590
3591fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
3592    match expr {
3593        Expr::Column(ColumnRef {
3594            table: Some(table), ..
3595        }) => !subquery_tables.contains(table),
3596        Expr::BinaryOp { left, right, .. } => {
3597            has_outer_table_ref(left, subquery_tables)
3598                || has_outer_table_ref(right, subquery_tables)
3599        }
3600        Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
3601        Expr::InList { expr, list, .. } => {
3602            has_outer_table_ref(expr, subquery_tables)
3603                || list
3604                    .iter()
3605                    .any(|item| has_outer_table_ref(item, subquery_tables))
3606        }
3607        Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
3608        Expr::Like { expr, pattern, .. } => {
3609            has_outer_table_ref(expr, subquery_tables)
3610                || has_outer_table_ref(pattern, subquery_tables)
3611        }
3612        Expr::FunctionCall { args, .. } => args
3613            .iter()
3614            .any(|arg| has_outer_table_ref(arg, subquery_tables)),
3615        _ => false,
3616    }
3617}
3618
3619fn value_to_literal(value: Value) -> Result<Expr> {
3620    Ok(Expr::Literal(match value {
3621        Value::Null => Literal::Null,
3622        Value::Bool(v) => Literal::Bool(v),
3623        Value::Int64(v) => Literal::Integer(v),
3624        Value::Float64(v) => Literal::Real(v),
3625        Value::Text(v) => Literal::Text(v),
3626        Value::Uuid(v) => Literal::Text(v.to_string()),
3627        Value::Timestamp(v) => Literal::Integer(v),
3628        other => {
3629            return Err(Error::PlanError(format!(
3630                "unsupported subquery result value: {:?}",
3631                other
3632            )));
3633        }
3634    }))
3635}
3636
3637fn query_result_row_matches(
3638    row: &[Value],
3639    columns: &[String],
3640    expr: &Expr,
3641    params: &HashMap<String, Value>,
3642) -> Result<bool> {
3643    Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
3644}
3645
3646fn eval_query_result_bool_expr(
3647    row: &[Value],
3648    columns: &[String],
3649    expr: &Expr,
3650    params: &HashMap<String, Value>,
3651) -> Result<Option<bool>> {
3652    match expr {
3653        Expr::BinaryOp { left, op, right } => match op {
3654            BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3655                let left = eval_query_result_expr(left, row, columns, params)?;
3656                let right = eval_query_result_expr(right, row, columns, params)?;
3657                if left == Value::Null || right == Value::Null {
3658                    return Ok(None);
3659                }
3660
3661                let result = match op {
3662                    BinOp::Eq => {
3663                        compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3664                    }
3665                    BinOp::Neq => {
3666                        !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3667                    }
3668                    BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3669                    BinOp::Lte => matches!(
3670                        compare_values(&left, &right),
3671                        Some(Ordering::Less | Ordering::Equal)
3672                    ),
3673                    BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3674                    BinOp::Gte => matches!(
3675                        compare_values(&left, &right),
3676                        Some(Ordering::Greater | Ordering::Equal)
3677                    ),
3678                    BinOp::And | BinOp::Or => unreachable!(),
3679                };
3680                Ok(Some(result))
3681            }
3682            BinOp::And => {
3683                let left = eval_query_result_bool_expr(row, columns, left, params)?;
3684                if left == Some(false) {
3685                    return Ok(Some(false));
3686                }
3687                let right = eval_query_result_bool_expr(row, columns, right, params)?;
3688                Ok(match (left, right) {
3689                    (Some(true), Some(true)) => Some(true),
3690                    (Some(true), other) => other,
3691                    (None, Some(false)) => Some(false),
3692                    (None, Some(true)) | (None, None) => None,
3693                    (Some(false), _) => Some(false),
3694                })
3695            }
3696            BinOp::Or => {
3697                let left = eval_query_result_bool_expr(row, columns, left, params)?;
3698                if left == Some(true) {
3699                    return Ok(Some(true));
3700                }
3701                let right = eval_query_result_bool_expr(row, columns, right, params)?;
3702                Ok(match (left, right) {
3703                    (Some(false), Some(false)) => Some(false),
3704                    (Some(false), other) => other,
3705                    (None, Some(true)) => Some(true),
3706                    (None, Some(false)) | (None, None) => None,
3707                    (Some(true), _) => Some(true),
3708                })
3709            }
3710        },
3711        Expr::UnaryOp {
3712            op: UnaryOp::Not,
3713            operand,
3714        } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
3715        Expr::InList {
3716            expr,
3717            list,
3718            negated,
3719        } => {
3720            let needle = eval_query_result_expr(expr, row, columns, params)?;
3721            if needle == Value::Null {
3722                return Ok(None);
3723            }
3724
3725            let matched = list.iter().try_fold(false, |found, item| {
3726                if found {
3727                    Ok(true)
3728                } else {
3729                    let candidate = eval_query_result_expr(item, row, columns, params)?;
3730                    Ok(
3731                        matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3732                            || (candidate != Value::Null && needle == candidate),
3733                    )
3734                }
3735            })?;
3736            Ok(Some(if *negated { !matched } else { matched }))
3737        }
3738        Expr::InSubquery { .. } => Err(Error::PlanError(
3739            "IN (subquery) must be resolved before execution".to_string(),
3740        )),
3741        Expr::Like {
3742            expr,
3743            pattern,
3744            negated,
3745        } => {
3746            let left = eval_query_result_expr(expr, row, columns, params)?;
3747            let right = eval_query_result_expr(pattern, row, columns, params)?;
3748            let matched = match (left, right) {
3749                (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3750                _ => false,
3751            };
3752            Ok(Some(if *negated { !matched } else { matched }))
3753        }
3754        Expr::IsNull { expr, negated } => {
3755            let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
3756            Ok(Some(if *negated { !is_null } else { is_null }))
3757        }
3758        Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
3759            Value::Bool(value) => Ok(Some(value)),
3760            Value::Null => Ok(None),
3761            _ => Err(Error::PlanError(format!(
3762                "unsupported WHERE expression: {:?}",
3763                expr
3764            ))),
3765        },
3766        _ => Err(Error::PlanError(format!(
3767            "unsupported WHERE expression: {:?}",
3768            expr
3769        ))),
3770    }
3771}
3772
3773fn lookup_query_result_column(
3774    row: &[Value],
3775    input_columns: &[String],
3776    column_ref: &ColumnRef,
3777) -> Result<Value> {
3778    if let Some(table) = &column_ref.table {
3779        let qualified = format!("{table}.{}", column_ref.column);
3780        // Prioritize qualified match (e.g., "e.id") over unqualified (e.g., "id")
3781        // to avoid picking the wrong table's column in JOINs.
3782        let idx = input_columns
3783            .iter()
3784            .position(|name| name == &qualified)
3785            .or_else(|| {
3786                input_columns
3787                    .iter()
3788                    .position(|name| name == &column_ref.column)
3789            })
3790            .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
3791        return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
3792    }
3793
3794    let matches = input_columns
3795        .iter()
3796        .enumerate()
3797        .filter_map(|(idx, name)| {
3798            (name == &column_ref.column
3799                || name.rsplit('.').next() == Some(column_ref.column.as_str()))
3800            .then_some(idx)
3801        })
3802        .collect::<Vec<_>>();
3803
3804    match matches.as_slice() {
3805        [] => Err(Error::PlanError(format!(
3806            "project column not found: {}",
3807            column_ref.column
3808        ))),
3809        [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
3810        _ => Err(Error::PlanError(format!(
3811            "ambiguous column reference: {}",
3812            column_ref.column
3813        ))),
3814    }
3815}
3816
3817fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
3818    let mut combined = Vec::with_capacity(left.len() + right.len());
3819    combined.extend_from_slice(left);
3820    combined.extend_from_slice(right);
3821    combined
3822}
3823
/// Returns the bare column names (the text after the last `.`, or the whole
/// name when there is no `.`) that occur in both `left` and `right`.
fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
    /// Strips any `qualifier.` prefix from a column header.
    fn bare_name(column: &str) -> &str {
        match column.rfind('.') {
            Some(dot) => &column[dot + 1..],
            None => column,
        }
    }

    let mut seen_left = BTreeSet::new();
    for column in left {
        seen_left.insert(bare_name(column));
    }

    let mut shared = BTreeSet::new();
    for column in right {
        let bare = bare_name(column);
        if seen_left.contains(bare) {
            shared.insert(bare.to_string());
        }
    }
    shared
}
3837
/// Produces the output header for a join whose combined header is `columns`
/// (left-side columns first, then right-side columns).
///
/// * Left positions: when `left_alias` is set, the bare name of the matching
///   `left_columns` entry is re-prefixed with the alias; otherwise the
///   `left_columns` entry is used verbatim.
/// * Right positions: a header that is still bare (equal to the bare name of
///   the matching `right_columns` entry) is prefixed with `right_prefix`; any
///   other header is kept as is.
///
/// Indexes `left_columns` / `right_columns` by position, so `columns` is
/// expected to be no longer than the two inputs combined.
fn qualify_join_columns(
    columns: &[String],
    left_columns: &[String],
    right_columns: &[String],
    left_alias: &Option<String>,
    right_prefix: &str,
) -> Vec<String> {
    let split_at = left_columns.len();
    let mut qualified = Vec::with_capacity(columns.len());

    for (idx, column) in columns.iter().enumerate() {
        let name = if idx < split_at {
            let source = &left_columns[idx];
            match left_alias {
                Some(prefix) => {
                    let bare = source.rsplit('.').next().unwrap_or(column);
                    format!("{prefix}.{bare}")
                }
                None => source.clone(),
            }
        } else {
            let source = right_columns[idx - split_at].as_str();
            let bare = source.rsplit('.').next().unwrap_or(source);
            if column == bare {
                format!("{right_prefix}.{bare}")
            } else {
                column.clone()
            }
        };
        qualified.push(name);
    }

    qualified
}
3874
3875fn right_table_name(plan: &PhysicalPlan) -> String {
3876    match plan {
3877        PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
3878        _ => "right".to_string(),
3879    }
3880}
3881
3882fn distinct_row_key(row: &[Value]) -> Vec<u8> {
3883    bincode::serde::encode_to_vec(row, bincode::config::standard())
3884        .expect("query rows should serialize for DISTINCT")
3885}
3886
3887fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
3888    match resolve_expr(expr, params)? {
3889        Value::Uuid(u) => Ok(u),
3890        Value::Text(t) => uuid::Uuid::parse_str(&t)
3891            .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
3892        _ => Err(Error::PlanError(
3893            "graph start node must be UUID".to_string(),
3894        )),
3895    }
3896}
3897
3898/// Resolve start nodes for a graph BFS from a WHERE filter like `a.name = 'entity-0'`.
3899/// Scans all relational tables for rows matching the filter condition.
3900fn resolve_graph_start_nodes_from_filter(
3901    db: &Database,
3902    filter: &Expr,
3903    params: &HashMap<String, Value>,
3904) -> Result<Vec<uuid::Uuid>> {
3905    if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
3906        return Ok(ids);
3907    }
3908
3909    // Extract column name and expected value from the filter (e.g., a.name = 'entity-0')
3910    let (col_name, expected_value) = match filter {
3911        Expr::BinaryOp {
3912            left,
3913            op: BinOp::Eq,
3914            right,
3915        } => {
3916            if let Some(col) = extract_column_name(left) {
3917                (col, resolve_expr(right, params)?)
3918            } else if let Some(col) = extract_column_name(right) {
3919                (col, resolve_expr(left, params)?)
3920            } else {
3921                return Ok(vec![]);
3922            }
3923        }
3924        _ => return Ok(vec![]),
3925    };
3926
3927    let snapshot = db.snapshot();
3928    let mut uuids = Vec::new();
3929    for table_name in db.table_names() {
3930        let meta = match db.table_meta(&table_name) {
3931            Some(m) => m,
3932            None => continue,
3933        };
3934        // Only scan tables that have the referenced column and an id column
3935        let has_col = meta.columns.iter().any(|c| c.name == col_name);
3936        let has_id = meta.columns.iter().any(|c| c.name == "id");
3937        if !has_col || !has_id {
3938            continue;
3939        }
3940        let rows = db.scan_filter(&table_name, snapshot, &|row| {
3941            row.values.get(&col_name) == Some(&expected_value)
3942        })?;
3943        for row in rows {
3944            if let Some(Value::Uuid(id)) = row.values.get("id") {
3945                uuids.push(*id);
3946            }
3947        }
3948    }
3949    Ok(uuids)
3950}
3951
3952fn resolve_graph_start_nodes_from_plan(
3953    db: &Database,
3954    plan: &PhysicalPlan,
3955    params: &HashMap<String, Value>,
3956    tx: Option<TxId>,
3957) -> Result<Vec<uuid::Uuid>> {
3958    let result = execute_plan(db, plan, params, tx)?;
3959    result
3960        .rows
3961        .into_iter()
3962        .filter_map(|row| row.into_iter().next())
3963        .map(|value| match value {
3964            Value::Uuid(id) => Ok(id),
3965            Value::Text(text) => uuid::Uuid::parse_str(&text)
3966                .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
3967            other => Err(Error::PlanError(format!(
3968                "invalid graph start identifier from plan: {other:?}"
3969            ))),
3970        })
3971        .collect()
3972}
3973
3974fn resolve_graph_start_ids_from_filter(
3975    filter: &Expr,
3976    params: &HashMap<String, Value>,
3977) -> Result<Option<Vec<uuid::Uuid>>> {
3978    match filter {
3979        Expr::BinaryOp {
3980            left,
3981            op: BinOp::Eq,
3982            right,
3983        } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
3984            let value = if is_graph_id_ref(left) {
3985                resolve_expr(right, params)?
3986            } else {
3987                resolve_expr(left, params)?
3988            };
3989            let id = match value {
3990                Value::Uuid(id) => id,
3991                Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
3992                    Error::PlanError(format!("invalid UUID in graph filter: {text}"))
3993                })?,
3994                other => {
3995                    return Err(Error::PlanError(format!(
3996                        "invalid graph start identifier in filter: {other:?}"
3997                    )));
3998                }
3999            };
4000            Ok(Some(vec![id]))
4001        }
4002        Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
4003            let ids = list
4004                .iter()
4005                .map(|item| resolve_expr(item, params))
4006                .map(|value| match value? {
4007                    Value::Uuid(id) => Ok(id),
4008                    Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4009                        Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4010                    }),
4011                    other => Err(Error::PlanError(format!(
4012                        "invalid graph start identifier in filter: {other:?}"
4013                    ))),
4014                })
4015                .collect::<Result<Vec<_>>>()?;
4016            Ok(Some(ids))
4017        }
4018        Expr::BinaryOp { left, right, .. } => {
4019            if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
4020                return Ok(Some(ids));
4021            }
4022            resolve_graph_start_ids_from_filter(right, params)
4023        }
4024        Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
4025        _ => Ok(None),
4026    }
4027}
4028
4029fn is_graph_id_ref(expr: &Expr) -> bool {
4030    matches!(
4031        expr,
4032        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
4033    )
4034}
4035
4036/// Extract a bare column name from an Expr::Column, ignoring table alias.
4037fn extract_column_name(expr: &Expr) -> Option<String> {
4038    match expr {
4039        Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
4040        _ => None,
4041    }
4042}
4043
4044fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
4045    match resolve_expr(expr, params)? {
4046        Value::Vector(v) => Ok(v),
4047        Value::Text(name) => match params.get(&name) {
4048            Some(Value::Vector(v)) => Ok(v.clone()),
4049            _ => Err(Error::PlanError("vector parameter missing".to_string())),
4050        },
4051        _ => Err(Error::PlanError(
4052            "invalid vector query expression".to_string(),
4053        )),
4054    }
4055}
4056
4057fn validate_vector_columns(
4058    db: &Database,
4059    table: &str,
4060    values: &HashMap<String, Value>,
4061) -> Result<()> {
4062    let Some(meta) = db.table_meta(table) else {
4063        return Ok(());
4064    };
4065
4066    for column in &meta.columns {
4067        if let contextdb_core::ColumnType::Vector(expected) = column.column_type
4068            && let Some(Value::Vector(vector)) = values.get(&column.name)
4069        {
4070            let got = vector.len();
4071            if got != expected {
4072                return Err(Error::VectorDimensionMismatch { expected, got });
4073            }
4074        }
4075    }
4076
4077    Ok(())
4078}
4079
4080fn vector_value_for_table<'a>(
4081    db: &Database,
4082    table: &str,
4083    values: &'a HashMap<String, Value>,
4084) -> Option<&'a Vec<f32>> {
4085    let meta = db.table_meta(table)?;
4086    meta.columns
4087        .iter()
4088        .find_map(|column| match column.column_type {
4089            contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
4090                Some(Value::Vector(vector)) => Some(vector),
4091                _ => None,
4092            },
4093            _ => None,
4094        })
4095}
4096
4097pub(crate) fn coerce_into_column(
4098    db: &Database,
4099    table: &str,
4100    col: &str,
4101    v: Value,
4102    current_tx_max: Option<TxId>,
4103    active_tx: Option<TxId>,
4104) -> Result<Value> {
4105    coerce_value_for_column(db, table, col, v, current_tx_max, active_tx)
4106}
4107
4108fn coerce_value_for_column(
4109    db: &Database,
4110    table: &str,
4111    col_name: &str,
4112    v: Value,
4113    current_tx_max: Option<TxId>,
4114    active_tx: Option<TxId>,
4115) -> Result<Value> {
4116    let Some(meta) = db.table_meta(table) else {
4117        // Non-TxId variant: pass through with lenient id-name coercion.
4118        if let Value::TxId(_) = &v {
4119            return Err(Error::ColumnTypeMismatch {
4120                table: table.to_string(),
4121                column: col_name.to_string(),
4122                expected: "UNKNOWN",
4123                actual: "TxId",
4124            });
4125        }
4126        return Ok(coerce_uuid_if_needed(col_name, v));
4127    };
4128    coerce_value_for_column_with_meta(table, &meta, col_name, v, current_tx_max, active_tx)
4129}
4130
4131fn coerce_value_for_column_with_meta(
4132    table: &str,
4133    meta: &TableMeta,
4134    col_name: &str,
4135    v: Value,
4136    current_tx_max: Option<TxId>,
4137    active_tx: Option<TxId>,
4138) -> Result<Value> {
4139    let Some(col) = meta.columns.iter().find(|c| c.name == col_name) else {
4140        if let Value::TxId(_) = &v {
4141            return Err(Error::ColumnTypeMismatch {
4142                table: table.to_string(),
4143                column: col_name.to_string(),
4144                expected: "UNKNOWN",
4145                actual: "TxId",
4146            });
4147        }
4148        return Ok(coerce_uuid_if_needed(col_name, v));
4149    };
4150
4151    match col.column_type {
4152        contextdb_core::ColumnType::Uuid => match v {
4153            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4154                table: table.to_string(),
4155                column: col_name.to_string(),
4156                expected: "UUID",
4157                actual: "TxId",
4158            }),
4159            other => coerce_uuid_value(other),
4160        },
4161        contextdb_core::ColumnType::Timestamp => match v {
4162            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4163                table: table.to_string(),
4164                column: col_name.to_string(),
4165                expected: "TIMESTAMP",
4166                actual: "TxId",
4167            }),
4168            other => coerce_timestamp_value(other),
4169        },
4170        contextdb_core::ColumnType::Vector(dim) => match v {
4171            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4172                table: table.to_string(),
4173                column: col_name.to_string(),
4174                expected: format_vector_type(dim),
4175                actual: "TxId",
4176            }),
4177            other => coerce_vector_value(other, dim),
4178        },
4179        contextdb_core::ColumnType::Integer => match v {
4180            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4181                table: table.to_string(),
4182                column: col_name.to_string(),
4183                expected: "INTEGER",
4184                actual: "TxId",
4185            }),
4186            other => Ok(coerce_uuid_if_needed(col_name, other)),
4187        },
4188        contextdb_core::ColumnType::Real => match v {
4189            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4190                table: table.to_string(),
4191                column: col_name.to_string(),
4192                expected: "REAL",
4193                actual: "TxId",
4194            }),
4195            other => Ok(coerce_uuid_if_needed(col_name, other)),
4196        },
4197        contextdb_core::ColumnType::Text => match v {
4198            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4199                table: table.to_string(),
4200                column: col_name.to_string(),
4201                expected: "TEXT",
4202                actual: "TxId",
4203            }),
4204            other => Ok(coerce_uuid_if_needed(col_name, other)),
4205        },
4206        contextdb_core::ColumnType::Boolean => match v {
4207            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4208                table: table.to_string(),
4209                column: col_name.to_string(),
4210                expected: "BOOLEAN",
4211                actual: "TxId",
4212            }),
4213            other => Ok(coerce_uuid_if_needed(col_name, other)),
4214        },
4215        contextdb_core::ColumnType::Json => match v {
4216            Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4217                table: table.to_string(),
4218                column: col_name.to_string(),
4219                expected: "JSON",
4220                actual: "TxId",
4221            }),
4222            other => Ok(coerce_uuid_if_needed(col_name, other)),
4223        },
4224        contextdb_core::ColumnType::TxId => {
4225            coerce_txid_value(table, col_name, v, col.nullable, current_tx_max, active_tx)
4226        }
4227    }
4228}
4229
/// Returns a static type label for a vector column of dimension `dim`.
///
/// The error variant that consumes the label needs a `&'static str`, so only
/// a fixed set of common dimensions gets the precise `VECTOR(n)` label; every
/// other dimension is reported as plain `VECTOR`.
fn format_vector_type(dim: usize) -> &'static str {
    const KNOWN_DIMS: [(usize, &str); 15] = [
        (1, "VECTOR(1)"),
        (2, "VECTOR(2)"),
        (3, "VECTOR(3)"),
        (4, "VECTOR(4)"),
        (8, "VECTOR(8)"),
        (16, "VECTOR(16)"),
        (32, "VECTOR(32)"),
        (64, "VECTOR(64)"),
        (128, "VECTOR(128)"),
        (256, "VECTOR(256)"),
        (512, "VECTOR(512)"),
        (768, "VECTOR(768)"),
        (1024, "VECTOR(1024)"),
        (1536, "VECTOR(1536)"),
        (3072, "VECTOR(3072)"),
    ];

    KNOWN_DIMS
        .iter()
        .find(|(known, _)| *known == dim)
        .map_or("VECTOR", |(_, label)| *label)
}
4251
4252fn coerce_txid_value(
4253    table: &str,
4254    col: &str,
4255    v: Value,
4256    nullable: bool,
4257    current_tx_max: Option<TxId>,
4258    active_tx: Option<TxId>,
4259) -> Result<Value> {
4260    match v {
4261        Value::Null => {
4262            if nullable {
4263                Ok(Value::Null)
4264            } else {
4265                Err(Error::ColumnNotNullable {
4266                    table: table.to_string(),
4267                    column: col.to_string(),
4268                })
4269            }
4270        }
4271        Value::TxId(tx) => {
4272            // Plan B7: `Value::TxId(n)` into a TXID column requires
4273            // `n <= max(committed_watermark, active_tx)`. The watermark is the
4274            // statement-scoped `current_tx_max` snapshot from
4275            // `TxManager::current_tx_max()`; `active_tx` is the in-flight
4276            // transaction that allocated the caller's TxId, which is permitted
4277            // as a self-reference. The error reports the watermark so callers
4278            // see what their edge has committed. Non-SQL callers pass `None`
4279            // for `current_tx_max` and skip the check.
4280            if let Some(max) = current_tx_max {
4281                let ceiling = max.0.max(active_tx.map(|t| t.0).unwrap_or(0));
4282                if tx.0 > ceiling {
4283                    return Err(Error::TxIdOutOfRange {
4284                        table: table.to_string(),
4285                        column: col.to_string(),
4286                        value: tx.0,
4287                        max: max.0,
4288                    });
4289                }
4290            }
4291            Ok(Value::TxId(tx))
4292        }
4293        other => Err(Error::ColumnTypeMismatch {
4294            table: table.to_string(),
4295            column: col.to_string(),
4296            expected: "TXID",
4297            actual: value_variant_name(&other),
4298        }),
4299    }
4300}
4301
4302fn value_variant_name(v: &Value) -> &'static str {
4303    match v {
4304        Value::Null => "Null",
4305        Value::Bool(_) => "Bool",
4306        Value::Int64(_) => "Int64",
4307        Value::Float64(_) => "Float64",
4308        Value::Text(_) => "Text",
4309        Value::Uuid(_) => "Uuid",
4310        Value::Timestamp(_) => "Timestamp",
4311        Value::Json(_) => "Json",
4312        Value::Vector(_) => "Vector",
4313        Value::TxId(_) => "TxId",
4314    }
4315}
4316
4317fn coerce_uuid_value(v: Value) -> Result<Value> {
4318    match v {
4319        Value::Null => Ok(Value::Null),
4320        Value::Uuid(id) => Ok(Value::Uuid(id)),
4321        Value::Text(text) => uuid::Uuid::parse_str(&text)
4322            .map(Value::Uuid)
4323            .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
4324        other => Err(Error::Other(format!(
4325            "UUID column requires UUID or text literal, got {other:?}"
4326        ))),
4327    }
4328}
4329
4330fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
4331    if (col == "id" || col.ends_with("_id"))
4332        && let Value::Text(s) = &v
4333        && let Ok(u) = uuid::Uuid::parse_str(s)
4334    {
4335        return Value::Uuid(u);
4336    }
4337    v
4338}
4339
4340fn coerce_timestamp_value(v: Value) -> Result<Value> {
4341    match v {
4342        Value::Null => Ok(Value::Null),
4343        Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
4344            Ok(Value::Timestamp(i64::MAX))
4345        }
4346        Value::Text(text) => {
4347            let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
4348                Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
4349            })?;
4350            Ok(Value::Timestamp(
4351                parsed.unix_timestamp_nanos() as i64 / 1_000_000,
4352            ))
4353        }
4354        other => Ok(other),
4355    }
4356}
4357
4358fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
4359    let vector = match v {
4360        Value::Null => return Ok(Value::Null),
4361        Value::Vector(vector) => vector,
4362        Value::Text(text) => parse_text_vector_literal(&text)?,
4363        other => return Ok(other),
4364    };
4365
4366    if vector.len() != expected_dim {
4367        return Err(Error::VectorDimensionMismatch {
4368            expected: expected_dim,
4369            got: vector.len(),
4370        });
4371    }
4372
4373    Ok(Value::Vector(vector))
4374}
4375
4376fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
4377    let trimmed = text.trim();
4378    let inner = trimmed
4379        .strip_prefix('[')
4380        .and_then(|s| s.strip_suffix(']'))
4381        .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
4382
4383    if inner.trim().is_empty() {
4384        return Ok(Vec::new());
4385    }
4386
4387    inner
4388        .split(',')
4389        .map(|part| {
4390            part.trim().parse::<f32>().map_err(|err| {
4391                Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
4392            })
4393        })
4394        .collect()
4395}
4396
4397fn apply_missing_column_defaults(
4398    db: &Database,
4399    table: &str,
4400    values: &mut HashMap<String, Value>,
4401    active_tx: Option<TxId>,
4402) -> Result<()> {
4403    let Some(meta) = db.table_meta(table) else {
4404        return Ok(());
4405    };
4406
4407    let current_tx_max = Some(db.committed_watermark());
4408
4409    for column in &meta.columns {
4410        if values.contains_key(&column.name) {
4411            continue;
4412        }
4413        let Some(default) = &column.default else {
4414            continue;
4415        };
4416        let value = evaluate_stored_default_expr(default)?;
4417        values.insert(
4418            column.name.clone(),
4419            coerce_value_for_column(db, table, &column.name, value, current_tx_max, active_tx)?,
4420        );
4421    }
4422
4423    Ok(())
4424}
4425
4426fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
4427    if default.eq_ignore_ascii_case("NOW()") {
4428        return eval_function("now", &[]);
4429    }
4430    if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
4431        return eval_function("now", &[]);
4432    }
4433    if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
4434        return Ok(Value::Null);
4435    }
4436    if default.eq_ignore_ascii_case("TRUE") {
4437        return Ok(Value::Bool(true));
4438    }
4439    if default.eq_ignore_ascii_case("FALSE") {
4440        return Ok(Value::Bool(false));
4441    }
4442    if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
4443        return Ok(Value::Text(
4444            default[1..default.len() - 1].replace("''", "'"),
4445        ));
4446    }
4447    if let Some(text) = default
4448        .strip_prefix("Literal(Text(\"")
4449        .and_then(|value| value.strip_suffix("\"))"))
4450    {
4451        return Ok(Value::Text(text.to_string()));
4452    }
4453    if let Some(value) = default
4454        .strip_prefix("Literal(Integer(")
4455        .and_then(|value| value.strip_suffix("))"))
4456    {
4457        let parsed = value.parse::<i64>().map_err(|err| {
4458            Error::Other(format!("invalid stored integer default '{value}': {err}"))
4459        })?;
4460        return Ok(Value::Int64(parsed));
4461    }
4462    if let Some(value) = default
4463        .strip_prefix("Literal(Real(")
4464        .and_then(|value| value.strip_suffix("))"))
4465    {
4466        let parsed = value
4467            .parse::<f64>()
4468            .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
4469        return Ok(Value::Float64(parsed));
4470    }
4471    if let Some(value) = default
4472        .strip_prefix("Literal(Bool(")
4473        .and_then(|value| value.strip_suffix("))"))
4474    {
4475        let parsed = value
4476            .parse::<bool>()
4477            .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
4478        return Ok(Value::Bool(parsed));
4479    }
4480
4481    Err(Error::Other(format!(
4482        "unsupported stored DEFAULT expression: {default}"
4483    )))
4484}
4485
4486pub(crate) fn stored_default_expr(expr: &Expr) -> String {
4487    match expr {
4488        Expr::Literal(Literal::Null) => "NULL".to_string(),
4489        Expr::Literal(Literal::Bool(value)) => {
4490            if *value {
4491                "TRUE".to_string()
4492            } else {
4493                "FALSE".to_string()
4494            }
4495        }
4496        Expr::Literal(Literal::Integer(value)) => value.to_string(),
4497        Expr::Literal(Literal::Real(value)) => value.to_string(),
4498        Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
4499        Expr::FunctionCall { name, args }
4500            if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
4501        {
4502            "NOW()".to_string()
4503        }
4504        _ => format!("{expr:?}"),
4505    }
4506}
4507
4508fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
4509    if col.expires && !matches!(col.data_type, DataType::Timestamp) {
4510        return Err(Error::Other(
4511            "EXPIRES is only valid on TIMESTAMP columns".to_string(),
4512        ));
4513    }
4514    Ok(())
4515}
4516
4517fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
4518    let mut expires_column = None;
4519    for col in columns {
4520        validate_expires_column(col)?;
4521        if col.expires {
4522            if expires_column.is_some() {
4523                return Err(Error::Other(
4524                    "only one EXPIRES column is supported per table".to_string(),
4525                ));
4526            }
4527            expires_column = Some(col.name.clone());
4528        }
4529    }
4530    Ok(expires_column)
4531}
4532
4533pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
4534    match dtype {
4535        DataType::Uuid => contextdb_core::ColumnType::Uuid,
4536        DataType::Text => contextdb_core::ColumnType::Text,
4537        DataType::Integer => contextdb_core::ColumnType::Integer,
4538        DataType::Real => contextdb_core::ColumnType::Real,
4539        DataType::Boolean => contextdb_core::ColumnType::Boolean,
4540        DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
4541        DataType::Json => contextdb_core::ColumnType::Json,
4542        DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
4543        DataType::TxId => contextdb_core::ColumnType::TxId,
4544    }
4545}
4546
4547fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
4548    match s {
4549        "latest_wins" => Ok(ConflictPolicy::LatestWins),
4550        "server_wins" => Ok(ConflictPolicy::ServerWins),
4551        "edge_wins" => Ok(ConflictPolicy::EdgeWins),
4552        "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
4553        _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
4554    }
4555}
4556
4557fn conflict_policy_to_string(p: ConflictPolicy) -> String {
4558    match p {
4559        ConflictPolicy::LatestWins => "latest_wins".to_string(),
4560        ConflictPolicy::ServerWins => "server_wins".to_string(),
4561        ConflictPolicy::EdgeWins => "edge_wins".to_string(),
4562        ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
4563    }
4564}
4565
4566fn project_graph_frontier_rows(
4567    frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
4568    start_alias: &str,
4569    steps: &[GraphStepPlan],
4570) -> Result<Vec<Vec<Value>>> {
4571    frontier
4572        .into_iter()
4573        .map(|(bindings, id, depth)| {
4574            let mut row = Vec::with_capacity(steps.len() + 3);
4575            let start_id = bindings.get(start_alias).ok_or_else(|| {
4576                Error::PlanError(format!(
4577                    "graph frontier missing required start alias binding '{start_alias}'"
4578                ))
4579            })?;
4580            row.push(Value::Uuid(*start_id));
4581            for step in steps {
4582                let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
4583                    Error::PlanError(format!(
4584                        "graph frontier missing required target alias binding '{}'",
4585                        step.target_alias
4586                    ))
4587                })?;
4588                row.push(Value::Uuid(*target_id));
4589            }
4590            row.push(Value::Uuid(id));
4591            row.push(Value::Int64(depth as i64));
4592            Ok(row)
4593        })
4594        .collect()
4595}
4596
#[cfg(test)]
mod tests {
    use super::*;
    use contextdb_planner::GraphStepPlan;
    use uuid::Uuid;

    /// A frontier entry that lacks the start alias, or lacks a step's target
    /// alias, must surface as `Error::PlanError` instead of yielding a row.
    #[test]
    fn graph_01_frontier_projection_requires_complete_bindings() {
        // A single one-hop step whose target is expected under alias "b".
        let single_hop = [GraphStepPlan {
            edge_types: vec!["EDGE".to_string()],
            direction: Direction::Outgoing,
            min_depth: 1,
            max_depth: 1,
            target_alias: "b".to_string(),
        }];

        // Case 1: no bindings at all, so the start alias "a" cannot be resolved.
        let unbound_entry = (HashMap::new(), Uuid::new_v4(), 0);
        let start_result = project_graph_frontier_rows(vec![unbound_entry], "a", &single_hop);
        assert!(
            matches!(start_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
        );

        // Case 2: start alias "a" is bound, but the step target "b" is not.
        let mut start_only = HashMap::new();
        start_only.insert("a".to_string(), Uuid::new_v4());
        let half_bound_entry = (start_only, Uuid::new_v4(), 0);
        let target_result = project_graph_frontier_rows(vec![half_bound_entry], "a", &single_hop);
        assert!(
            matches!(target_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
        );
    }
}