1use crate::database::{Database, InsertRowResult, QueryResult, QueryTrace, rank_index_name};
2use crate::rank_formula::RankFormula;
3use crate::sync_types::ConflictPolicy;
4use contextdb_core::*;
5use contextdb_parser::ast::{
6 AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
7 SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
8};
9use contextdb_planner::{
10 DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
11};
12use roaring::RoaringTreemap;
13use std::cmp::Ordering;
14use std::collections::{BTreeSet, HashMap, HashSet};
15use std::sync::Arc;
16use std::time::{SystemTime, UNIX_EPOCH};
17use time::OffsetDateTime;
18use time::format_description::well_known::Rfc3339;
19
20pub(crate) fn execute_plan(
21 db: &Database,
22 plan: &PhysicalPlan,
23 params: &HashMap<String, Value>,
24 tx: Option<TxId>,
25) -> Result<QueryResult> {
26 match plan {
27 PhysicalPlan::CreateTable(p) => {
28 db.check_disk_budget("CREATE TABLE")?;
29 let expires_column = expires_column_name(&p.columns)?;
30 let mut auto_indexes: Vec<contextdb_core::IndexDecl> = Vec::new();
34 for c in &p.columns {
35 if c.primary_key
36 && !matches!(
37 map_column_type(&c.data_type),
38 ColumnType::Json | ColumnType::Vector(_)
39 )
40 {
41 auto_indexes.push(contextdb_core::IndexDecl {
42 name: format!("__pk_{}", c.name),
43 columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
44 kind: contextdb_core::IndexKind::Auto,
45 });
46 }
47 if c.unique
48 && !c.primary_key
49 && !matches!(
50 map_column_type(&c.data_type),
51 ColumnType::Json | ColumnType::Vector(_)
52 )
53 {
54 auto_indexes.push(contextdb_core::IndexDecl {
55 name: format!("__unique_{}", c.name),
56 columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
57 kind: contextdb_core::IndexKind::Auto,
58 });
59 }
60 }
61 for uc in &p.unique_constraints {
62 let all_indexable = uc.iter().all(|col_name| {
65 p.columns
66 .iter()
67 .find(|c| c.name == *col_name)
68 .map(|c| {
69 !matches!(
70 map_column_type(&c.data_type),
71 ColumnType::Json | ColumnType::Vector(_)
72 )
73 })
74 .unwrap_or(false)
75 });
76 if !all_indexable || uc.is_empty() {
77 continue;
78 }
79 let name = format!("__unique_{}", uc.join("_"));
80 let cols: Vec<(String, contextdb_core::SortDirection)> = uc
81 .iter()
82 .map(|c| (c.clone(), contextdb_core::SortDirection::Asc))
83 .collect();
84 auto_indexes.push(contextdb_core::IndexDecl {
85 name,
86 columns: cols,
87 kind: contextdb_core::IndexKind::Auto,
88 });
89 }
90 let mut resolved_policies = HashMap::<String, ResolvedRankPolicy>::new();
91 for column in &p.columns {
92 if let Some(resolved) =
93 validate_rank_policy_for_column(db, &p.name, column, &p.columns)?
94 {
95 resolved_policies.insert(column.name.clone(), resolved);
96 }
97 }
98 let meta = TableMeta {
99 columns: p
100 .columns
101 .iter()
102 .map(|c| {
103 core_column_from_ast(
104 c,
105 resolved_policies
106 .get(&c.name)
107 .map(|resolved| resolved.policy.clone()),
108 )
109 })
110 .collect(),
111 immutable: p.immutable,
112 state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
113 column: sm.column.clone(),
114 transitions: sm
115 .transitions
116 .iter()
117 .map(|(from, tos)| (from.clone(), tos.clone()))
118 .collect(),
119 }),
120 dag_edge_types: p.dag_edge_types.clone(),
121 unique_constraints: p.unique_constraints.clone(),
122 natural_key_column: None,
123 propagation_rules: p.propagation_rules.clone(),
124 default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
125 sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
126 expires_column,
127 indexes: auto_indexes.clone(),
134 };
135 let metadata_bytes = meta.estimated_bytes();
136 db.accountant().try_allocate_for(
137 metadata_bytes,
138 "ddl",
139 "create_table",
140 "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
141 )?;
142 db.relational_store().create_table(&p.name, meta);
143 for idx in &auto_indexes {
144 db.relational_store()
145 .create_index_storage(&p.name, &idx.name, idx.columns.clone());
146 }
147 if let Some(table_meta) = db.table_meta(&p.name) {
148 for column in &table_meta.columns {
149 db.register_vector_index_for_column(&p.name, column);
150 }
151 }
152 for (column, resolved) in resolved_policies {
153 db.register_rank_formula(&p.name, &column, resolved.formula);
154 }
155 if let Some(table_meta) = db.table_meta(&p.name) {
156 db.persist_table_meta(&p.name, &table_meta)?;
157 db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn))?;
158 }
159 db.clear_statement_cache();
160 Ok(QueryResult::empty_with_affected(0))
161 }
162 PhysicalPlan::DropTable(name) => {
163 if let Some(block) = rank_policy_drop_table_blocker(db, name) {
164 return Err(block);
165 }
166 let bytes_to_release = estimate_drop_table_bytes(db, name);
167 db.drop_table_aux_state(name);
168 db.remove_rank_formulas_for_table(name);
169 db.vector_store_deregister_table(name);
170 db.relational_store().drop_table(name);
171 db.remove_persisted_table(name)?;
172 db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn))?;
173 db.accountant().release(bytes_to_release);
174 db.clear_statement_cache();
175 Ok(QueryResult::empty_with_affected(0))
176 }
177 PhysicalPlan::AlterTable(p) => {
178 db.check_disk_budget("ALTER TABLE")?;
179 let store = db.relational_store();
180 let mut rewrite_vectors_after_alter = false;
181 match &p.action {
182 AlterAction::AddColumn(col) => {
183 if col.primary_key {
184 return Err(Error::Other(
185 "adding a primary key column via ALTER TABLE is not supported"
186 .to_string(),
187 ));
188 }
189 validate_expires_column(col)?;
190 if col.immutable
194 && let Some(existing_meta) = db.table_meta(&p.table)
195 {
196 let targets_col =
197 existing_meta
198 .propagation_rules
199 .iter()
200 .any(|rule| {
201 match rule {
202 contextdb_core::table_meta::PropagationRule::ForeignKey {
203 target_state,
204 ..
205 } => *target_state == col.name,
206 contextdb_core::table_meta::PropagationRule::Edge {
207 target_state,
208 ..
209 } => *target_state == col.name,
210 contextdb_core::table_meta::PropagationRule::VectorExclusion {
211 ..
212 } => false,
213 }
214 });
215 if targets_col {
216 return Err(Error::ImmutableColumn {
217 table: p.table.clone(),
218 column: col.name.clone(),
219 });
220 }
221 }
222 let mut all_columns = db
223 .table_meta(&p.table)
224 .map(|meta| {
225 meta.columns
226 .into_iter()
227 .map(ast_column_from_core)
228 .collect::<Vec<_>>()
229 })
230 .unwrap_or_default();
231 all_columns.push(col.clone());
232 let resolved_policy =
233 validate_rank_policy_for_column(db, &p.table, col, &all_columns)?;
234 let core_col = core_column_from_ast(
235 col,
236 resolved_policy
237 .as_ref()
238 .map(|resolved| resolved.policy.clone()),
239 );
240 store
241 .alter_table_add_column(&p.table, core_col)
242 .map_err(Error::Other)?;
243 if let Some(table_meta) = db.table_meta(&p.table)
244 && let Some(column) = table_meta
245 .columns
246 .iter()
247 .find(|column| column.name == col.name)
248 {
249 db.register_vector_index_for_column(&p.table, column);
250 }
251 if col.expires {
252 let mut meta = store.table_meta.write();
253 let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
254 Error::Other(format!("table '{}' not found", p.table))
255 })?;
256 table_meta.expires_column = Some(col.name.clone());
257 }
258 if let Some(resolved) = resolved_policy {
259 db.register_rank_formula(&p.table, &col.name, resolved.formula);
260 }
261 }
262 AlterAction::DropColumn {
263 column: name,
264 cascade,
265 } => {
266 if let Some(block) = rank_policy_drop_column_blocker(db, &p.table, name) {
267 return Err(block);
268 }
269 let dropped_vector_column = db
270 .table_meta(&p.table)
271 .and_then(|meta| {
272 meta.columns.into_iter().find(|column| column.name == *name)
273 })
274 .is_some_and(|column| {
275 matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
276 });
277 if let Some(existing_meta) = db.table_meta(&p.table)
278 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
279 && col.immutable
280 {
281 return Err(Error::ImmutableColumn {
282 table: p.table.clone(),
283 column: name.clone(),
284 });
285 }
286 if let Some(existing_meta) = db.table_meta(&p.table)
291 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
292 && col.primary_key
293 {
294 return Err(Error::Other(format!(
295 "cannot drop primary key column {}.{}",
296 p.table, name
297 )));
298 }
299 let dependent_user_indexes: Vec<String> = db
303 .table_meta(&p.table)
304 .map(|m| {
305 m.indexes
306 .iter()
307 .filter(|i| {
308 i.kind == contextdb_core::IndexKind::UserDeclared
309 && i.columns.iter().any(|(c, _)| c == name)
310 })
311 .map(|i| i.name.clone())
312 .collect()
313 })
314 .unwrap_or_default();
315 let dependent_indexes: Vec<String> = db
316 .table_meta(&p.table)
317 .map(|m| {
318 m.indexes
319 .iter()
320 .filter(|i| i.columns.iter().any(|(c, _)| c == name))
321 .map(|i| i.name.clone())
322 .collect()
323 })
324 .unwrap_or_default();
325 if !*cascade && !dependent_user_indexes.is_empty() {
326 return Err(Error::ColumnInIndex {
327 table: p.table.clone(),
328 column: name.clone(),
329 index: dependent_user_indexes[0].clone(),
330 });
331 }
332 store
333 .alter_table_drop_column(&p.table, name)
334 .map_err(Error::Other)?;
335 db.remove_rank_formula(&p.table, name);
336 db.deregister_vector_index(&p.table, name);
337 rewrite_vectors_after_alter = dropped_vector_column;
338 if *cascade {
339 {
341 let mut metas = store.table_meta.write();
342 if let Some(m) = metas.get_mut(&p.table) {
343 m.indexes
344 .retain(|i| !i.columns.iter().any(|(c, _)| c == name));
345 }
346 }
347 for idx in &dependent_indexes {
348 store.drop_index_storage(&p.table, idx);
349 db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&p.table, idx, lsn))?;
350 }
351 }
352 let mut meta = store.table_meta.write();
353 if let Some(table_meta) = meta.get_mut(&p.table)
354 && table_meta.expires_column.as_deref() == Some(name.as_str())
355 {
356 table_meta.expires_column = None;
357 }
358 drop(meta);
359 if let Some(table_meta) = db.table_meta(&p.table) {
360 db.persist_table_meta(&p.table, &table_meta)?;
361 db.persist_table_rows(&p.table)?;
362 if rewrite_vectors_after_alter {
363 db.persist_vectors()?;
364 }
365 db.allocate_ddl_lsn(|lsn| {
366 db.log_alter_table_ddl(&p.table, &table_meta, lsn)
367 })?;
368 }
369 db.clear_statement_cache();
370 return Ok(QueryResult {
371 columns: vec![],
372 rows: vec![],
373 rows_affected: 0,
374 trace: crate::database::QueryTrace::scan(),
375 cascade: if *cascade {
376 Some(crate::database::CascadeReport {
377 dropped_indexes: dependent_indexes,
378 })
379 } else {
380 None
381 },
382 });
383 }
384 AlterAction::RenameColumn { from, to } => {
385 if let Some(block) = rank_policy_drop_column_blocker(db, &p.table, from) {
386 return Err(block);
387 }
388 let renamed_vector_column = db
389 .table_meta(&p.table)
390 .and_then(|meta| {
391 meta.columns.into_iter().find(|column| column.name == *from)
392 })
393 .is_some_and(|column| {
394 matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
395 });
396 if let Some(existing_meta) = db.table_meta(&p.table)
397 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *from)
398 && col.immutable
399 {
400 return Err(Error::ImmutableColumn {
401 table: p.table.clone(),
402 column: from.clone(),
403 });
404 }
405 store
406 .alter_table_rename_column(&p.table, from, to)
407 .map_err(Error::Other)?;
408 if renamed_vector_column {
409 db.rename_vector_index(&p.table, from, to)?;
410 rewrite_vectors_after_alter = true;
411 }
412 let mut meta = store.table_meta.write();
413 if let Some(table_meta) = meta.get_mut(&p.table)
414 && table_meta.expires_column.as_deref() == Some(from.as_str())
415 {
416 table_meta.expires_column = Some(to.clone());
417 }
418 }
419 AlterAction::SetRetain {
420 duration_seconds,
421 sync_safe,
422 } => {
423 let mut meta = store.table_meta.write();
424 let table_meta = meta
425 .get_mut(&p.table)
426 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
427 if table_meta.immutable {
428 return Err(Error::Other(
429 "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
430 ));
431 }
432 table_meta.default_ttl_seconds = Some(*duration_seconds);
433 table_meta.sync_safe = *sync_safe;
434 }
435 AlterAction::DropRetain => {
436 let mut meta = store.table_meta.write();
437 let table_meta = meta
438 .get_mut(&p.table)
439 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
440 table_meta.default_ttl_seconds = None;
441 table_meta.sync_safe = false;
442 }
443 AlterAction::SetSyncConflictPolicy(policy) => {
444 let cp = parse_conflict_policy(policy)?;
445 db.set_table_conflict_policy(&p.table, cp);
446 }
447 AlterAction::DropSyncConflictPolicy => {
448 db.drop_table_conflict_policy(&p.table);
449 }
450 }
451 if let Some(table_meta) = db.table_meta(&p.table) {
452 db.persist_table_meta(&p.table, &table_meta)?;
453 if !matches!(
454 p.action,
455 AlterAction::AddColumn(_)
456 | AlterAction::SetRetain { .. }
457 | AlterAction::DropRetain
458 | AlterAction::SetSyncConflictPolicy(_)
459 | AlterAction::DropSyncConflictPolicy
460 ) {
461 db.persist_table_rows(&p.table)?;
462 }
463 if rewrite_vectors_after_alter {
464 db.persist_vectors()?;
465 }
466 db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn))?;
467 }
468 db.clear_statement_cache();
469 Ok(QueryResult::empty_with_affected(0))
470 }
471 PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
472 PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
473 PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
474 PhysicalPlan::Scan { table, filter, .. } => {
475 if table == "dual" {
476 return Ok(QueryResult {
477 columns: vec![],
478 rows: vec![vec![]],
479 rows_affected: 0,
480 trace: crate::database::QueryTrace::scan(),
481 cascade: None,
482 });
483 }
484 let snapshot = db.snapshot_for_read();
485 let schema_columns = db.table_meta(table).map(|meta| {
486 meta.columns
487 .into_iter()
488 .map(|column| column.name)
489 .collect::<Vec<_>>()
490 });
491 let resolved_filter = filter
492 .as_ref()
493 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
494 .transpose()?;
495
496 let meta_for_indexes = db.table_meta(table);
500 let indexes: Vec<contextdb_core::IndexDecl> = meta_for_indexes
501 .as_ref()
502 .map(|m| m.indexes.clone())
503 .unwrap_or_default();
504 let analysis = filter
505 .as_ref()
506 .filter(|_| !indexes.is_empty())
507 .map(|f| analyze_filter_for_index(f, &indexes, params));
508
509 if let Some(a) = analysis {
510 if let Some(pick) = a.pick {
511 let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
513 db.__bump_rows_examined(examined);
514 let mut result = materialize_rows(
515 rows,
516 resolved_filter.as_ref(),
517 params,
518 schema_columns.as_deref(),
519 )?;
520 let mut pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]> =
521 smallvec::SmallVec::new();
522 pushed.push(std::borrow::Cow::Owned(pick.pushed_column.clone()));
523 let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> = a
524 .considered
525 .iter()
526 .filter(|c| c.name != pick.name)
527 .cloned()
528 .collect();
529 result.trace = crate::database::QueryTrace {
530 physical_plan: "IndexScan",
531 index_used: Some(pick.name.clone()),
532 predicates_pushed: pushed,
533 indexes_considered: considered,
534 sort_elided: false,
535 };
536 return Ok(result);
537 } else {
538 let rows = db.scan(table, snapshot)?;
540 db.__bump_rows_examined(rows.len() as u64);
541 let mut result = materialize_rows(
542 rows,
543 resolved_filter.as_ref(),
544 params,
545 schema_columns.as_deref(),
546 )?;
547 let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> =
548 a.considered.into_iter().collect();
549 result.trace = crate::database::QueryTrace {
550 physical_plan: "Scan",
551 index_used: None,
552 predicates_pushed: Default::default(),
553 indexes_considered: considered,
554 sort_elided: false,
555 };
556 return Ok(result);
557 }
558 }
559
560 let rows = db.scan(table, snapshot)?;
561 db.__bump_rows_examined(rows.len() as u64);
562 let mut result = materialize_rows(
563 rows,
564 resolved_filter.as_ref(),
565 params,
566 schema_columns.as_deref(),
567 )?;
568 result.trace = crate::database::QueryTrace::scan();
569 Ok(result)
570 }
571 PhysicalPlan::GraphBfs {
572 start_alias,
573 start_expr,
574 start_candidates,
575 steps,
576 filter,
577 } => {
578 let start_uuids = match resolve_uuid(start_expr, params) {
579 Ok(start) => vec![start],
580 Err(Error::PlanError(_))
581 if matches!(
582 start_expr,
583 Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
584 ) =>
585 {
586 if let Some(candidate_plan) = start_candidates {
588 resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
589 } else if let Some(filter_expr) = filter {
590 let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
591 resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
592 } else {
593 vec![]
594 }
595 }
596 Err(err) => return Err(err),
597 };
598 if start_uuids.is_empty() {
599 return Ok(QueryResult {
600 columns: vec!["id".to_string(), "depth".to_string()],
601 rows: vec![],
602 rows_affected: 0,
603 trace: crate::database::QueryTrace::scan(),
604 cascade: None,
605 });
606 }
607 let snapshot = db.snapshot();
608 let mut frontier = start_uuids
609 .into_iter()
610 .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
611 .collect::<Vec<_>>();
612 let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
613 db.accountant().try_allocate_for(
614 bfs_bytes,
615 "bfs_frontier",
616 "graph_bfs",
617 "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
618 )?;
619
620 let result = (|| {
621 for step in steps {
622 let edge_types_ref = if step.edge_types.is_empty() {
623 None
624 } else {
625 Some(step.edge_types.as_slice())
626 };
627 let mut next = Vec::new();
628
629 for (bindings, start, base_depth) in &frontier {
630 let res = db.graph().bfs(
631 *start,
632 edge_types_ref,
633 step.direction,
634 step.min_depth,
635 step.max_depth,
636 snapshot,
637 )?;
638 for node in res.nodes {
639 let total_depth = base_depth.saturating_add(node.depth);
640 let mut next_bindings = bindings.clone();
641 next_bindings.insert(step.target_alias.clone(), node.id);
642 next.push((next_bindings, node.id, total_depth));
643 }
644 }
645
646 frontier = dedupe_graph_frontier(next, steps);
647 if frontier.is_empty() {
648 break;
649 }
650 }
651
652 let mut columns =
653 steps
654 .iter()
655 .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
656 cols.push(format!("{}.id", step.target_alias));
657 cols
658 });
659 columns.push("id".to_string());
660 columns.push("depth".to_string());
661
662 Ok(QueryResult {
663 columns,
664 rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
665 rows_affected: 0,
666 trace: crate::database::QueryTrace::scan(),
667 cascade: None,
668 })
669 })();
670 db.accountant().release(bfs_bytes);
671
672 result
673 }
674 PhysicalPlan::VectorSearch {
675 table,
676 column,
677 query_expr,
678 k,
679 candidates,
680 sort_key,
681 ..
682 }
683 | PhysicalPlan::HnswSearch {
684 table,
685 column,
686 query_expr,
687 k,
688 candidates,
689 sort_key,
690 ..
691 } => {
692 let query_vec = resolve_vector_from_expr(query_expr, params)?;
693 let snapshot = db.snapshot();
694 let mut candidate_trace = None;
695 let candidate_bitmap = if let Some(cands_plan) = candidates {
696 let qr = execute_plan(db, cands_plan, params, tx)?;
697 candidate_trace = Some(qr.trace.clone());
698 let mut bm = RoaringTreemap::new();
699 let row_id_idx = qr.columns.iter().position(|column| {
700 column == "row_id" || column.rsplit('.').next() == Some("row_id")
701 });
702 let id_idx = qr
703 .columns
704 .iter()
705 .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
706
707 if let Some(idx) = row_id_idx {
708 for row in qr.rows {
709 if let Some(Value::Int64(id)) = row.get(idx) {
710 bm.insert(*id as u64);
711 }
712 }
713 } else if let Some(idx) = id_idx {
714 let uuid_to_row_id = uuid_to_row_id_map(db, table, snapshot)?;
715 for row in qr.rows {
716 if let Some(Value::Uuid(uuid)) = row.get(idx)
717 && let Some(row_id) = uuid_to_row_id.get(uuid)
718 {
719 bm.insert(row_id.0);
720 }
721 }
722 }
723 Some(bm)
724 } else {
725 None
726 };
727
728 let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
729 db.accountant().try_allocate_for(
730 vector_bytes,
731 "vector_search",
732 "search",
733 "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
734 )?;
735 if let Some(sort_key) = sort_key {
736 let mut semantic_query = crate::database::SemanticQuery::new(
737 table.clone(),
738 column.clone(),
739 query_vec,
740 *k as usize,
741 );
742 semantic_query.sort_key = Some(sort_key.clone());
743 let res = db.semantic_search_with_candidates(semantic_query, candidate_bitmap);
744 db.accountant().release(vector_bytes);
745 let results = res?;
746 let schema_columns = db.table_meta(table).map(|meta| {
747 meta.columns
748 .into_iter()
749 .map(|column| column.name)
750 .collect::<Vec<_>>()
751 });
752 let keys = schema_columns.unwrap_or_else(|| {
753 let mut ks = BTreeSet::new();
754 for result in &results {
755 for key in result.values.keys() {
756 ks.insert(key.clone());
757 }
758 }
759 ks.into_iter().collect()
760 });
761 let mut columns = vec!["row_id".to_string()];
762 columns.extend(keys.iter().cloned());
763 columns.push("score".to_string());
764 let rows = results
765 .into_iter()
766 .map(|result| {
767 let mut out = vec![Value::Int64(result.row_id.0 as i64)];
768 for key in &keys {
769 out.push(result.values.get(key).cloned().unwrap_or(Value::Null));
770 }
771 out.push(Value::Float64(result.rank as f64));
772 out
773 })
774 .collect();
775 return Ok(QueryResult {
776 columns,
777 rows,
778 rows_affected: 0,
779 trace: vector_search_trace("VectorSearch", candidate_trace),
780 cascade: None,
781 });
782 }
783 let res = db.query_vector_strict(
784 contextdb_core::VectorIndexRef::new(table.clone(), column.clone()),
785 &query_vec,
786 *k as usize,
787 candidate_bitmap.as_ref(),
788 db.snapshot(),
789 );
790 db.accountant().release(vector_bytes);
791 let res = res?;
792
793 let result_row_ids = res.iter().map(|(rid, _)| *rid).collect::<Vec<_>>();
795 let result_rows = rows_by_row_id(db, table, &result_row_ids, snapshot)?;
796 let schema_columns = db.table_meta(table).map(|meta| {
797 meta.columns
798 .into_iter()
799 .map(|column| column.name)
800 .collect::<Vec<_>>()
801 });
802 let keys = if let Some(ref sc) = schema_columns {
803 sc.clone()
804 } else {
805 let mut ks = BTreeSet::new();
806 for r in &result_rows {
807 for k in r.values.keys() {
808 ks.insert(k.clone());
809 }
810 }
811 ks.into_iter().collect::<Vec<_>>()
812 };
813
814 let row_map: HashMap<RowId, &VersionedRow> =
815 result_rows.iter().map(|r| (r.row_id, r)).collect();
816
817 let mut columns = vec!["row_id".to_string()];
818 columns.extend(keys.iter().cloned());
819 columns.push("score".to_string());
820
821 let rows = res
822 .into_iter()
823 .filter_map(|(rid, score)| {
824 row_map.get(&rid).map(|row| {
825 let mut out = vec![Value::Int64(rid.0 as i64)];
826 for k in &keys {
827 out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
828 }
829 out.push(Value::Float64(score as f64));
830 out
831 })
832 })
833 .collect();
834
835 Ok(QueryResult {
836 columns,
837 rows,
838 rows_affected: 0,
839 trace: vector_search_trace(
840 if db
841 .__debug_vector_hnsw_len(contextdb_core::VectorIndexRef::new(
842 table.clone(),
843 column.clone(),
844 ))
845 .is_some()
846 {
847 "HNSWSearch"
848 } else {
849 "VectorSearch"
850 },
851 candidate_trace,
852 ),
853 cascade: None,
854 })
855 }
856 PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
857 PhysicalPlan::Project { input, columns } => {
858 let input_result = execute_plan(db, input, params, tx)?;
859 let has_aggregate = columns.iter().any(|column| {
860 matches!(
861 &column.expr,
862 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
863 )
864 });
865 if has_aggregate {
866 if columns.iter().any(|column| {
867 !matches!(
868 &column.expr,
869 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
870 )
871 }) {
872 return Err(Error::PlanError(
873 "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
874 ));
875 }
876
877 let output_columns = columns
878 .iter()
879 .map(|column| {
880 column.alias.clone().unwrap_or_else(|| match &column.expr {
881 Expr::FunctionCall { name, .. } => name.clone(),
882 _ => "expr".to_string(),
883 })
884 })
885 .collect::<Vec<_>>();
886
887 let aggregate_row = columns
888 .iter()
889 .map(|column| match &column.expr {
890 Expr::FunctionCall { name: _, args } => {
891 let count = if matches!(
892 args.as_slice(),
893 [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
894 if column == "*"
895 ) {
896 input_result.rows.len() as i64
897 } else {
898 input_result
899 .rows
900 .iter()
901 .filter_map(|row| {
902 args.first().map(|arg| {
903 eval_query_result_expr(
904 arg,
905 row,
906 &input_result.columns,
907 params,
908 )
909 })
910 })
911 .collect::<Result<Vec<_>>>()?
912 .into_iter()
913 .filter(|value| *value != Value::Null)
914 .count() as i64
915 };
916 Ok(Value::Int64(count))
917 }
918 _ => Err(Error::PlanError(
919 "mixed aggregate and non-aggregate columns without GROUP BY"
920 .to_string(),
921 )),
922 })
923 .collect::<Result<Vec<_>>>()?;
924
925 return Ok(QueryResult {
926 columns: output_columns,
927 rows: vec![aggregate_row],
928 rows_affected: 0,
929 trace: input_result.trace.clone(),
930 cascade: None,
931 });
932 }
933
934 let output_columns = columns
935 .iter()
936 .map(|c| {
937 c.alias.clone().unwrap_or_else(|| match &c.expr {
938 Expr::Column(col) => col.column.clone(),
939 _ => "expr".to_string(),
940 })
941 })
942 .collect::<Vec<_>>();
943
944 let mut output_rows = Vec::with_capacity(input_result.rows.len());
945 for row in &input_result.rows {
946 let mut projected = Vec::with_capacity(columns.len());
947 for col in columns {
948 projected.push(eval_project_expr(
949 &col.expr,
950 row,
951 &input_result.columns,
952 params,
953 )?);
954 }
955 output_rows.push(projected);
956 }
957
958 Ok(QueryResult {
959 columns: output_columns,
960 rows: output_rows,
961 rows_affected: 0,
962 trace: input_result.trace.clone(),
963 cascade: None,
964 })
965 }
966 PhysicalPlan::Sort { input, keys } => {
967 let elided = try_elide_sort(db, input, keys, params, tx)?;
971 if let Some(mut result) = elided {
972 result.trace.sort_elided = true;
973 return Ok(result);
974 }
975 let mut input_result = execute_plan(db, input, params, tx)?;
976
977 if input_result.trace.physical_plan == "IndexScan"
982 && let Some(idx_name) = &input_result.trace.index_used
983 && sort_keys_match_index_prefix(db, input, idx_name, keys)
984 {
985 input_result.trace.sort_elided = true;
986 return Ok(input_result);
987 }
988 input_result.rows.sort_by(|left, right| {
989 for key in keys {
990 let Expr::Column(column_ref) = &key.expr else {
991 return Ordering::Equal;
992 };
993 let left_value =
994 match lookup_query_result_column(left, &input_result.columns, column_ref) {
995 Ok(value) => value,
996 Err(_) => return Ordering::Equal,
997 };
998 let right_value = match lookup_query_result_column(
999 right,
1000 &input_result.columns,
1001 column_ref,
1002 ) {
1003 Ok(value) => value,
1004 Err(_) => return Ordering::Equal,
1005 };
1006 let ordering = compare_sort_values(&left_value, &right_value, key.direction);
1007 if ordering != Ordering::Equal {
1008 return ordering;
1009 }
1010 }
1011 Ordering::Equal
1012 });
1013 if input_result.trace.physical_plan != "IndexScan" {
1019 input_result.trace.physical_plan = "Sort";
1020 }
1021 input_result.trace.sort_elided = false;
1022 Ok(input_result)
1023 }
1024 PhysicalPlan::Limit { input, count } => {
1025 let mut input_result = execute_plan(db, input, params, tx)?;
1026 input_result.rows.truncate(*count as usize);
1027 Ok(input_result)
1028 }
1029 PhysicalPlan::Filter { input, predicate } => {
1030 let mut input_result = execute_plan(db, input, params, tx)?;
1031 input_result.rows.retain(|row| {
1032 query_result_row_matches(row, &input_result.columns, predicate, params)
1033 .unwrap_or(false)
1034 });
1035 Ok(input_result)
1036 }
1037 PhysicalPlan::Distinct { input } => {
1038 let input_result = execute_plan(db, input, params, tx)?;
1039 let mut seen = HashSet::<Vec<u8>>::new();
1040 let rows = input_result
1041 .rows
1042 .into_iter()
1043 .filter(|row| seen.insert(distinct_row_key(row)))
1044 .collect();
1045 Ok(QueryResult {
1046 columns: input_result.columns,
1047 rows,
1048 rows_affected: input_result.rows_affected,
1049 trace: input_result.trace,
1050 cascade: None,
1051 })
1052 }
1053 PhysicalPlan::Join {
1054 left,
1055 right,
1056 condition,
1057 join_type,
1058 left_alias,
1059 right_alias,
1060 } => {
1061 let left_result = execute_plan(db, left, params, tx)?;
1062 let right_result = execute_plan(db, right, params, tx)?;
1063 let right_duplicate_names =
1064 duplicate_column_names(&left_result.columns, &right_result.columns);
1065 let right_prefix = right_alias
1066 .clone()
1067 .unwrap_or_else(|| right_table_name(right));
1068 let right_columns = right_result
1069 .columns
1070 .iter()
1071 .map(|column| {
1072 if right_duplicate_names.contains(column) {
1073 format!("{right_prefix}.{column}")
1074 } else {
1075 column.clone()
1076 }
1077 })
1078 .collect::<Vec<_>>();
1079
1080 let mut columns = left_result.columns.clone();
1081 columns.extend(right_columns);
1082
1083 let mut rows = Vec::new();
1084 for left_row in &left_result.rows {
1085 let mut matched = false;
1086 for right_row in &right_result.rows {
1087 let combined = concatenate_rows(left_row, right_row);
1088 if query_result_row_matches(&combined, &columns, condition, params)? {
1089 matched = true;
1090 rows.push(combined);
1091 }
1092 }
1093
1094 if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
1095 let mut combined = left_row.clone();
1096 combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
1097 rows.push(combined);
1098 }
1099 }
1100
1101 let output_columns = qualify_join_columns(
1102 &columns,
1103 &left_result.columns,
1104 &right_result.columns,
1105 left_alias,
1106 &right_prefix,
1107 );
1108
1109 Ok(QueryResult {
1110 columns: output_columns,
1111 rows,
1112 rows_affected: 0,
1113 trace: crate::database::QueryTrace::scan(),
1114 cascade: None,
1115 })
1116 }
1117 PhysicalPlan::CreateIndex(p) => exec_create_index(db, p),
1118 PhysicalPlan::DropIndex(p) => exec_drop_index(db, p),
1119 PhysicalPlan::IndexScan {
1120 table,
1121 index,
1122 range: _,
1123 } => {
1124 let _ = (table, index);
1128 Ok(QueryResult {
1129 columns: vec![],
1130 rows: vec![],
1131 rows_affected: 0,
1132 trace: crate::database::QueryTrace {
1133 physical_plan: "IndexScan",
1134 index_used: None,
1135 ..crate::database::QueryTrace::default()
1136 },
1137 cascade: None,
1138 })
1139 }
1140 PhysicalPlan::SetMemoryLimit(val) => {
1141 let limit = match val {
1142 SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
1143 SetMemoryLimitValue::None => None,
1144 };
1145 db.accountant().set_budget(limit)?;
1146 db.persist_memory_limit(limit)?;
1147 Ok(QueryResult::empty())
1148 }
1149 PhysicalPlan::ShowMemoryLimit => {
1150 let usage = db.accountant().usage();
1151 Ok(QueryResult {
1152 columns: vec![
1153 "limit".to_string(),
1154 "used".to_string(),
1155 "available".to_string(),
1156 "startup_ceiling".to_string(),
1157 ],
1158 rows: vec![vec![
1159 usage
1160 .limit
1161 .map(|value| Value::Int64(value as i64))
1162 .unwrap_or_else(|| Value::Text("none".to_string())),
1163 Value::Int64(usage.used as i64),
1164 usage
1165 .available
1166 .map(|value| Value::Int64(value as i64))
1167 .unwrap_or_else(|| Value::Text("none".to_string())),
1168 usage
1169 .startup_ceiling
1170 .map(|value| Value::Int64(value as i64))
1171 .unwrap_or_else(|| Value::Text("none".to_string())),
1172 ]],
1173 rows_affected: 0,
1174 trace: crate::database::QueryTrace::scan(),
1175 cascade: None,
1176 })
1177 }
1178 PhysicalPlan::SetDiskLimit(val) => {
1179 let limit = match val {
1180 SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
1181 SetDiskLimitValue::None => None,
1182 };
1183 db.set_disk_limit(limit)?;
1184 db.persist_disk_limit(limit)?;
1185 Ok(QueryResult::empty())
1186 }
1187 PhysicalPlan::ShowDiskLimit => {
1188 let limit = db.disk_limit();
1189 let used = db.disk_file_size();
1190 let startup_ceiling = db.disk_limit_startup_ceiling();
1191 Ok(QueryResult {
1192 columns: vec![
1193 "limit".to_string(),
1194 "used".to_string(),
1195 "available".to_string(),
1196 "startup_ceiling".to_string(),
1197 ],
1198 rows: vec![vec![
1199 limit
1200 .map(|value| Value::Int64(value as i64))
1201 .unwrap_or_else(|| Value::Text("none".to_string())),
1202 used.map(|value| Value::Int64(value as i64))
1203 .unwrap_or(Value::Null),
1204 match (limit, used) {
1205 (Some(limit), Some(used)) => {
1206 Value::Int64(limit.saturating_sub(used) as i64)
1207 }
1208 _ => Value::Null,
1209 },
1210 startup_ceiling
1211 .map(|value| Value::Int64(value as i64))
1212 .unwrap_or_else(|| Value::Text("none".to_string())),
1213 ]],
1214 rows_affected: 0,
1215 trace: crate::database::QueryTrace::scan(),
1216 cascade: None,
1217 })
1218 }
1219 PhysicalPlan::SetSyncConflictPolicy(policy) => {
1220 let cp = parse_conflict_policy(policy)?;
1221 db.set_default_conflict_policy(cp);
1222 Ok(QueryResult::empty())
1223 }
1224 PhysicalPlan::ShowSyncConflictPolicy => {
1225 let policies = db.conflict_policies();
1226 let default_str = conflict_policy_to_string(policies.default);
1227 let mut rows = vec![vec![Value::Text(default_str)]];
1228 for (table, policy) in &policies.per_table {
1229 rows.push(vec![Value::Text(format!(
1230 "{}={}",
1231 table,
1232 conflict_policy_to_string(*policy)
1233 ))]);
1234 }
1235 Ok(QueryResult {
1236 columns: vec!["policy".to_string()],
1237 rows,
1238 rows_affected: 0,
1239 trace: crate::database::QueryTrace::scan(),
1240 cascade: None,
1241 })
1242 }
1243 PhysicalPlan::ShowVectorIndexes => {
1244 let rows = db
1245 .vector_index_infos()
1246 .into_iter()
1247 .map(|info| {
1248 vec![
1249 Value::Text(info.index.table),
1250 Value::Text(info.index.column),
1251 Value::Int64(info.dimension as i64),
1252 Value::Text(info.quantization.as_str().to_string()),
1253 Value::Int64(info.vector_count as i64),
1254 Value::Int64(info.bytes as i64),
1255 ]
1256 })
1257 .collect();
1258 Ok(QueryResult {
1259 columns: vec![
1260 "table".to_string(),
1261 "column".to_string(),
1262 "dimension".to_string(),
1263 "quantization".to_string(),
1264 "vector_count".to_string(),
1265 "bytes".to_string(),
1266 ],
1267 rows,
1268 rows_affected: 0,
1269 trace: crate::database::QueryTrace::scan(),
1270 cascade: None,
1271 })
1272 }
1273 PhysicalPlan::Pipeline(plans) => {
1274 let mut last = QueryResult::empty();
1275 for p in plans {
1276 last = execute_plan(db, p, params, tx)?;
1277 }
1278 Ok(last)
1279 }
1280 _ => Err(Error::PlanError(
1281 "unsupported plan node in executor".to_string(),
1282 )),
1283 }
1284}
1285
1286fn eval_project_expr(
1287 expr: &Expr,
1288 row: &[Value],
1289 input_columns: &[String],
1290 params: &HashMap<String, Value>,
1291) -> Result<Value> {
1292 match expr {
1293 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1294 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1295 Expr::Parameter(name) => params
1296 .get(name)
1297 .cloned()
1298 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1299 Expr::BinaryOp { left, op, right } => {
1300 let left = eval_query_result_expr(left, row, input_columns, params)?;
1301 let right = eval_query_result_expr(right, row, input_columns, params)?;
1302 eval_binary_op(op, &left, &right)
1303 }
1304 Expr::UnaryOp { op, operand } => {
1305 let value = eval_query_result_expr(operand, row, input_columns, params)?;
1306 match op {
1307 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1308 UnaryOp::Neg => match value {
1309 Value::Int64(v) => Ok(Value::Int64(-v)),
1310 Value::Float64(v) => Ok(Value::Float64(-v)),
1311 _ => Err(Error::PlanError(
1312 "cannot negate non-numeric value".to_string(),
1313 )),
1314 },
1315 }
1316 }
1317 Expr::FunctionCall { name, args } => {
1318 let values = args
1319 .iter()
1320 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1321 .collect::<Result<Vec<_>>>()?;
1322 eval_function(name, &values)
1323 }
1324 Expr::IsNull { expr, negated } => {
1325 let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
1326 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1327 }
1328 Expr::InList {
1329 expr,
1330 list,
1331 negated,
1332 } => {
1333 let needle = eval_query_result_expr(expr, row, input_columns, params)?;
1334 let matched = list.iter().try_fold(false, |found, item| {
1335 if found {
1336 Ok(true)
1337 } else {
1338 let candidate = eval_query_result_expr(item, row, input_columns, params)?;
1339 Ok(
1340 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1341 || (needle != Value::Null
1342 && candidate != Value::Null
1343 && needle == candidate),
1344 )
1345 }
1346 })?;
1347 Ok(Value::Bool(if *negated { !matched } else { matched }))
1348 }
1349 Expr::Like {
1350 expr,
1351 pattern,
1352 negated,
1353 } => {
1354 let matches = match (
1355 eval_query_result_expr(expr, row, input_columns, params)?,
1356 eval_query_result_expr(pattern, row, input_columns, params)?,
1357 ) {
1358 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1359 _ => false,
1360 };
1361 Ok(Value::Bool(if *negated { !matches } else { matches }))
1362 }
1363 _ => resolve_expr(expr, params),
1364 }
1365}
1366
1367fn eval_query_result_expr(
1368 expr: &Expr,
1369 row: &[Value],
1370 input_columns: &[String],
1371 params: &HashMap<String, Value>,
1372) -> Result<Value> {
1373 match expr {
1374 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1375 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1376 Expr::Parameter(name) => params
1377 .get(name)
1378 .cloned()
1379 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1380 Expr::FunctionCall { name, args } => {
1381 let values = args
1382 .iter()
1383 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1384 .collect::<Result<Vec<_>>>()?;
1385 eval_function(name, &values)
1386 }
1387 _ => resolve_expr(expr, params),
1388 }
1389}
1390
1391fn exec_insert(
1392 db: &Database,
1393 p: &InsertPlan,
1394 params: &HashMap<String, Value>,
1395 tx: Option<TxId>,
1396) -> Result<QueryResult> {
1397 db.check_disk_budget("INSERT")?;
1398 let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
1399
1400 let insert_meta = db
1401 .table_meta(&p.table)
1402 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
1403 let columns: Vec<String> = if p.columns.is_empty() {
1406 insert_meta.columns.iter().map(|c| c.name.clone()).collect()
1407 } else {
1408 p.columns.clone()
1409 };
1410
1411 let current_tx_max = Some(db.committed_watermark());
1413 let route_inserts_to_graph =
1414 p.table.eq_ignore_ascii_case("edges") || !insert_meta.dag_edge_types.is_empty();
1415 let vector_columns = vector_columns_for_meta(&insert_meta);
1416 let has_column_defaults = insert_meta
1417 .columns
1418 .iter()
1419 .any(|column| column.default.is_some());
1420
1421 if !vector_columns.is_empty() {
1422 for row in &p.values {
1423 let mut values = HashMap::new();
1424 for (idx, expr) in row.iter().enumerate() {
1425 let col = columns
1426 .get(idx)
1427 .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1428 let v = resolve_expr(expr, params)?;
1429 values.insert(
1430 col.clone(),
1431 coerce_value_for_column_with_meta(
1432 &p.table,
1433 &insert_meta,
1434 col,
1435 v,
1436 current_tx_max,
1437 Some(txid),
1438 )?,
1439 );
1440 }
1441 if has_column_defaults {
1442 apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1443 }
1444 validate_vector_columns(db, &p.table, &values)?;
1445 }
1446 }
1447
1448 let mut rows_affected = 0;
1449 for row in &p.values {
1450 let mut values = HashMap::new();
1451 for (idx, expr) in row.iter().enumerate() {
1452 let col = columns
1453 .get(idx)
1454 .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1455 let v = resolve_expr(expr, params)?;
1456 values.insert(
1457 col.clone(),
1458 coerce_value_for_column_with_meta(
1459 &p.table,
1460 &insert_meta,
1461 col,
1462 v,
1463 current_tx_max,
1464 Some(txid),
1465 )?,
1466 );
1467 }
1468
1469 if has_column_defaults {
1470 apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1471 }
1472
1473 if !vector_columns.is_empty() {
1474 validate_vector_columns(db, &p.table, &values)?;
1475 }
1476 let row_bytes = estimate_row_bytes_for_meta(&values, &insert_meta, false);
1477 db.accountant().try_allocate_for(
1478 row_bytes,
1479 "insert",
1480 "row_insert",
1481 "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
1482 )?;
1483 let checkpoint = db.write_set_checkpoint(txid)?;
1484 let mut vector_allocations = Vec::new();
1485 let graph_edge = if route_inserts_to_graph {
1486 match (
1487 values.get("source_id"),
1488 values.get("target_id"),
1489 values.get("edge_type"),
1490 ) {
1491 (
1492 Some(Value::Uuid(source)),
1493 Some(Value::Uuid(target)),
1494 Some(Value::Text(edge_type)),
1495 ) => Some((*source, *target, edge_type.clone())),
1496 _ => None,
1497 }
1498 } else {
1499 None
1500 };
1501 let vector_values = vector_values_for_table(db, &p.table, &values);
1502
1503 let row_id = if let Some(on_conflict) = &p.on_conflict {
1504 let conflict_col = &on_conflict.columns[0];
1505 let conflict_value = values
1506 .get(conflict_col)
1507 .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
1508 let existing =
1509 db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
1510 let existing_row_id = existing.as_ref().map(|row| row.row_id);
1511 let existing_has_vector = existing
1512 .as_ref()
1513 .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
1514 let upsert_values = if let Some(existing_row) = existing.as_ref() {
1515 match apply_on_conflict_updates(
1516 db,
1517 &p.table,
1518 values.clone(),
1519 existing_row,
1520 on_conflict,
1521 params,
1522 Some(txid),
1523 ) {
1524 Ok(v) => v,
1525 Err(err) => {
1526 db.accountant().release(row_bytes);
1527 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1528 return Err(err);
1529 }
1530 }
1531 } else {
1532 values.clone()
1533 };
1534
1535 match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
1536 Ok(UpsertResult::Inserted) => {
1537 db.point_lookup_in_tx(
1538 txid,
1539 &p.table,
1540 conflict_col,
1541 conflict_value,
1542 db.snapshot(),
1543 )?
1544 .ok_or_else(|| {
1545 Error::Other("inserted upsert row not visible in tx".to_string())
1546 })?
1547 .row_id
1548 }
1549 Ok(UpsertResult::Updated) => {
1550 if existing_has_vector && let Some(existing_row_id) = existing_row_id {
1551 for index in vector_indexes_for_table(db, &p.table) {
1552 if db
1553 .vector_store_live_entry_for_row(
1554 &index,
1555 existing_row_id,
1556 db.snapshot(),
1557 )
1558 .is_some()
1559 {
1560 db.delete_vector(txid, index, existing_row_id)?;
1561 }
1562 }
1563 }
1564 db.point_lookup_in_tx(
1565 txid,
1566 &p.table,
1567 conflict_col,
1568 conflict_value,
1569 db.snapshot(),
1570 )?
1571 .ok_or_else(|| {
1572 Error::Other("updated upsert row not visible in tx".to_string())
1573 })?
1574 .row_id
1575 }
1576 Ok(UpsertResult::NoOp) => {
1577 db.accountant().release(row_bytes);
1578 RowId(0)
1579 }
1580 Err(err) => {
1581 db.accountant().release(row_bytes);
1582 return Err(err);
1583 }
1584 }
1585 } else {
1586 match db.insert_row_with_unique_noop(txid, &p.table, values) {
1587 Ok(InsertRowResult::Inserted(row_id)) => row_id,
1588 Ok(InsertRowResult::NoOp) => {
1589 db.accountant().release(row_bytes);
1590 continue;
1591 }
1592 Err(err) => {
1593 db.accountant().release(row_bytes);
1594 return Err(err);
1595 }
1596 }
1597 };
1598
1599 if let Some((source, target, edge_type)) = graph_edge {
1600 match db.insert_edge(txid, source, target, edge_type, HashMap::new()) {
1601 Ok(true) => {}
1602 Ok(false) => {
1603 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1604 db.accountant().release(row_bytes);
1605 continue;
1606 }
1607 Err(err) => {
1608 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1609 db.accountant().release(row_bytes);
1610 return Err(err);
1611 }
1612 }
1613 }
1614
1615 if row_id != RowId(0) {
1616 for (column, v) in &vector_values {
1617 let index = contextdb_core::VectorIndexRef::new(&p.table, column.clone());
1618 let vector_bytes = db.vector_insert_accounted_bytes(&index, v.len());
1619 if let Err(err) = db.insert_vector_strict(txid, index, row_id, v.clone()) {
1620 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1621 db.accountant().release(row_bytes);
1622 release_accounted_bytes(db, &vector_allocations);
1623 return Err(err);
1624 }
1625 vector_allocations.push(vector_bytes);
1626 }
1627 }
1628
1629 rows_affected += 1;
1630 }
1631
1632 Ok(QueryResult::empty_with_affected(rows_affected))
1633}
1634
1635fn exec_delete(
1636 db: &Database,
1637 p: &DeletePlan,
1638 params: &HashMap<String, Value>,
1639 tx: Option<TxId>,
1640) -> Result<QueryResult> {
1641 let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1642 let snapshot = db.snapshot();
1643 let rows = db.scan(&p.table, snapshot)?;
1644 let resolved_where = p
1645 .where_clause
1646 .as_ref()
1647 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1648 .transpose()?;
1649 let matched: Vec<_> = rows
1650 .into_iter()
1651 .filter(|r| {
1652 resolved_where
1653 .as_ref()
1654 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1655 })
1656 .collect();
1657
1658 for row in &matched {
1659 for index in vector_indexes_for_table(db, &p.table) {
1660 if db
1661 .vector_store_live_entry_for_row(&index, row.row_id, snapshot)
1662 .is_some()
1663 {
1664 db.delete_vector(txid, index, row.row_id)?;
1665 }
1666 }
1667 db.delete_row(txid, &p.table, row.row_id)?;
1668 }
1669
1670 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1671}
1672
1673fn exec_update(
1674 db: &Database,
1675 p: &UpdatePlan,
1676 params: &HashMap<String, Value>,
1677 tx: Option<TxId>,
1678) -> Result<QueryResult> {
1679 db.check_disk_budget("UPDATE")?;
1680 let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1681 let snapshot = db.snapshot();
1682 let rows = db.scan_in_tx(txid, &p.table, snapshot)?;
1685 let resolved_where = p
1686 .where_clause
1687 .as_ref()
1688 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1689 .transpose()?;
1690 let matched: Vec<_> = rows
1691 .into_iter()
1692 .filter(|r| {
1693 resolved_where
1694 .as_ref()
1695 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1696 })
1697 .collect();
1698
1699 let current_tx_max = Some(db.committed_watermark());
1700
1701 for row in &matched {
1702 let mut values = row.values.clone();
1703 for (k, vexpr) in &p.assignments {
1704 let value = eval_assignment_expr(vexpr, &row.values, params)?;
1705 values.insert(
1706 k.clone(),
1707 coerce_value_for_column(db, &p.table, k, value, current_tx_max, Some(txid))?,
1708 );
1709 }
1710 validate_update_state_transition(db, &p.table, row, &values)?;
1711 let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1712 let new_state = db
1713 .table_meta(&p.table)
1714 .as_ref()
1715 .and_then(|meta| meta.state_machine.as_ref())
1716 .and_then(|sm| values.get(&sm.column))
1717 .and_then(Value::as_text)
1718 .map(std::borrow::ToOwned::to_owned);
1719
1720 validate_vector_columns(db, &p.table, &values)?;
1721 let assigned_vector_values: Vec<(String, Vec<f32>)> = p
1722 .assignments
1723 .iter()
1724 .filter_map(|(column, _)| match values.get(column) {
1725 Some(Value::Vector(vector)) => Some((column.clone(), vector.clone())),
1726 _ => None,
1727 })
1728 .collect();
1729 let assigned_vector_columns: HashSet<String> = assigned_vector_values
1730 .iter()
1731 .map(|(column, _)| column.clone())
1732 .collect();
1733 let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1734 db.accountant().try_allocate_for(
1735 new_row_bytes,
1736 "update",
1737 "row_replace",
1738 "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1739 )?;
1740 let checkpoint = db.write_set_checkpoint(txid)?;
1741 let mut vector_allocations = Vec::new();
1742
1743 if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1744 db.accountant().release(new_row_bytes);
1745 return Err(err);
1746 }
1747 for column in &assigned_vector_columns {
1748 if let Err(err) = db.delete_vector(
1749 txid,
1750 contextdb_core::VectorIndexRef::new(&p.table, column.clone()),
1751 row.row_id,
1752 ) {
1753 db.accountant().release(new_row_bytes);
1754 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1755 return Err(err);
1756 }
1757 }
1758
1759 let new_row_id = match db.insert_row_replacing(txid, &p.table, values, row.row_id) {
1760 Ok(row_id) => row_id,
1761 Err(err) => {
1762 db.accountant().release(new_row_bytes);
1763 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1764 return Err(err);
1765 }
1766 };
1767 for index in vector_indexes_for_table(db, &p.table) {
1768 if assigned_vector_columns.contains(&index.column) {
1769 continue;
1770 }
1771 if let Some(old_entry) =
1772 db.vector_store_live_entry_for_row(&index, row.row_id, snapshot)
1773 && old_entry.deleted_tx.is_none()
1774 && let Err(err) = db.move_vector(txid, index, row.row_id, new_row_id)
1775 {
1776 db.accountant().release(new_row_bytes);
1777 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1778 return Err(err);
1779 }
1780 }
1781 for (column, vector) in assigned_vector_values {
1782 let index = contextdb_core::VectorIndexRef::new(&p.table, column);
1783 let vector_bytes = db.vector_insert_accounted_bytes(&index, vector.len());
1784 if let Err(err) = db.insert_vector_strict(txid, index, new_row_id, vector) {
1785 db.accountant().release(new_row_bytes);
1786 release_accounted_bytes(db, &vector_allocations);
1787 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1788 return Err(err);
1789 }
1790 vector_allocations.push(vector_bytes);
1791 }
1792 if let Err(err) =
1793 db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1794 {
1795 db.accountant().release(new_row_bytes);
1796 release_accounted_bytes(db, &vector_allocations);
1797 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1798 return Err(err);
1799 }
1800 }
1801
1802 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1803}
1804
1805fn exec_create_index(
1806 db: &Database,
1807 plan: &contextdb_planner::CreateIndexPlan,
1808) -> Result<QueryResult> {
1809 for prefix in ["__pk_", "__unique_"] {
1812 if plan.name.starts_with(prefix) {
1813 return Err(Error::ReservedIndexName {
1814 table: plan.table.clone(),
1815 name: plan.name.clone(),
1816 prefix: prefix.to_string(),
1817 });
1818 }
1819 }
1820
1821 let meta = db
1825 .table_meta(&plan.table)
1826 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1827
1828 for (col_name, _) in &plan.columns {
1830 if !meta.columns.iter().any(|c| c.name == *col_name) {
1831 return Err(Error::ColumnNotFound {
1832 table: plan.table.clone(),
1833 column: col_name.clone(),
1834 });
1835 }
1836 }
1837
1838 for (col_name, _) in &plan.columns {
1840 let col = meta
1841 .columns
1842 .iter()
1843 .find(|c| c.name == *col_name)
1844 .expect("column existence verified above");
1845 if matches!(col.column_type, ColumnType::Json | ColumnType::Vector(_)) {
1846 return Err(Error::ColumnNotIndexable {
1847 table: plan.table.clone(),
1848 column: col_name.clone(),
1849 column_type: col.column_type.clone(),
1850 });
1851 }
1852 }
1853
1854 if meta.indexes.iter().any(|i| i.name == plan.name) {
1856 return Err(Error::DuplicateIndex {
1857 table: plan.table.clone(),
1858 index: plan.name.clone(),
1859 });
1860 }
1861
1862 {
1865 let store = db.relational_store();
1866 let mut metas = store.table_meta.write();
1867 let m = metas
1868 .get_mut(&plan.table)
1869 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1870 m.indexes.push(contextdb_core::IndexDecl {
1871 name: plan.name.clone(),
1872 columns: plan.columns.clone(),
1873 kind: contextdb_core::IndexKind::UserDeclared,
1874 });
1875 }
1876 db.relational_store()
1877 .create_index_storage(&plan.table, &plan.name, plan.columns.clone());
1878 db.relational_store().rebuild_index(&plan.table, &plan.name);
1879
1880 if let Some(table_meta) = db.table_meta(&plan.table) {
1881 db.persist_table_meta(&plan.table, &table_meta)?;
1882 }
1883
1884 db.allocate_ddl_lsn(|lsn| {
1885 db.log_create_index_ddl(&plan.table, &plan.name, &plan.columns, lsn)
1886 })?;
1887
1888 db.clear_statement_cache();
1889 Ok(QueryResult::empty_with_affected(0))
1890}
1891
1892fn exec_drop_index(db: &Database, plan: &contextdb_planner::DropIndexPlan) -> Result<QueryResult> {
1893 let meta = db
1894 .table_meta(&plan.table)
1895 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1896 let exists = meta.indexes.iter().any(|i| i.name == plan.name);
1897 if !exists {
1898 if plan.if_exists {
1899 return Ok(QueryResult::empty_with_affected(0));
1900 }
1901 return Err(Error::IndexNotFound {
1902 table: plan.table.clone(),
1903 index: plan.name.clone(),
1904 });
1905 }
1906 if let Some(block) = rank_policy_drop_index_blocker(db, &plan.table, &plan.name) {
1907 return Err(block);
1908 }
1909 {
1910 let store = db.relational_store();
1911 let mut metas = store.table_meta.write();
1912 if let Some(m) = metas.get_mut(&plan.table) {
1913 m.indexes.retain(|i| i.name != plan.name);
1914 }
1915 }
1916 db.relational_store()
1917 .drop_index_storage(&plan.table, &plan.name);
1918 if let Some(table_meta) = db.table_meta(&plan.table) {
1919 db.persist_table_meta(&plan.table, &table_meta)?;
1920 }
1921 db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&plan.table, &plan.name, lsn))?;
1922 db.clear_statement_cache();
1923 Ok(QueryResult::empty_with_affected(0))
1924}
1925
1926fn estimate_table_row_bytes(
1927 db: &Database,
1928 table: &str,
1929 values: &HashMap<String, Value>,
1930) -> Result<usize> {
1931 let meta = db
1932 .table_meta(table)
1933 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1934 Ok(estimate_row_bytes_for_meta(values, &meta, false))
1935}
1936
/// Shape of a single-column predicate, as produced by `classify_index_predicate` for
/// index selection.
#[derive(Debug, Clone)]
pub(crate) enum IndexPredicateShape {
    /// `column = value`.
    Equality(Value),
    /// `column != value`.
    NotEqual(Value),
    /// A range built from `<`, `<=`, `>` and `>=` comparisons; either side may be unbounded.
    Range {
        lower: std::ops::Bound<Value>,
        upper: std::ops::Bound<Value>,
    },
    /// `column IN (v1, v2, ...)` (non-negated).
    InList(Vec<Value>),
    /// `column IS NULL`.
    IsNull,
    /// `column IS NOT NULL`.
    IsNotNull,
}
1954
1955impl IndexPredicateShape {
1956 fn selectivity_tier(&self) -> u8 {
1958 match self {
1959 IndexPredicateShape::Equality(_) | IndexPredicateShape::InList(_) => 0,
1960 IndexPredicateShape::Range { .. } | IndexPredicateShape::NotEqual(_) => 1,
1961 IndexPredicateShape::IsNull | IndexPredicateShape::IsNotNull => 2,
1962 }
1963 }
1964}
1965
/// The index chosen for a filter together with the predicate shape pushed into it.
#[derive(Debug, Clone)]
struct IndexPick {
    /// Name of the chosen index declaration.
    name: String,
    /// Columns of the chosen index with their sort directions.
    columns: Vec<(String, contextdb_core::SortDirection)>,
    /// Predicate shape to evaluate against the index.
    shape: IndexPredicateShape,
    /// Column the predicate applies to (the index's first column).
    pushed_column: String,
}
1975
/// Result of `analyze_filter_for_index`.
struct IndexAnalysis {
    /// The selected index, or `None` when no index matched the filter.
    pick: Option<IndexPick>,
    /// Indexes that were rejected, each with its rejection reason.
    considered: Vec<crate::database::IndexCandidate>,
}
1982
1983fn coerce_pick_shape_to_column_type(
1990 db: &Database,
1991 table: &str,
1992 pick: &IndexPick,
1993) -> Result<IndexPick> {
1994 use std::ops::Bound;
1995 let col = &pick.pushed_column;
1996 let coerce = |v: Value| coerce_value_for_column(db, table, col, v, None, None);
1997 let new_shape = match &pick.shape {
1998 IndexPredicateShape::Equality(v) => IndexPredicateShape::Equality(coerce(v.clone())?),
1999 IndexPredicateShape::InList(vs) => IndexPredicateShape::InList(
2000 vs.iter()
2001 .cloned()
2002 .map(&coerce)
2003 .collect::<Result<Vec<_>>>()?,
2004 ),
2005 IndexPredicateShape::Range { lower, upper } => {
2006 let lower = match lower {
2007 Bound::Included(v) => Bound::Included(coerce(v.clone())?),
2008 Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
2009 Bound::Unbounded => Bound::Unbounded,
2010 };
2011 let upper = match upper {
2012 Bound::Included(v) => Bound::Included(coerce(v.clone())?),
2013 Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
2014 Bound::Unbounded => Bound::Unbounded,
2015 };
2016 IndexPredicateShape::Range { lower, upper }
2017 }
2018 IndexPredicateShape::NotEqual(v) => IndexPredicateShape::NotEqual(coerce(v.clone())?),
2019 IndexPredicateShape::IsNull => IndexPredicateShape::IsNull,
2020 IndexPredicateShape::IsNotNull => IndexPredicateShape::IsNotNull,
2021 };
2022 Ok(IndexPick {
2023 name: pick.name.clone(),
2024 columns: pick.columns.clone(),
2025 shape: new_shape,
2026 pushed_column: pick.pushed_column.clone(),
2027 })
2028}
2029
2030fn analyze_filter_for_index(
2033 filter: &Expr,
2034 indexes: &[contextdb_core::IndexDecl],
2035 params: &HashMap<String, Value>,
2036) -> IndexAnalysis {
2037 use std::borrow::Cow;
2038 let mut considered: Vec<crate::database::IndexCandidate> = Vec::new();
2039
2040 let conjuncts = split_conjuncts(filter);
2042 let mut conjunct_shapes: Vec<(String, IndexPredicateShape)> = Vec::new();
2043 for conjunct in &conjuncts {
2044 if let Some((col, shape)) = classify_index_predicate(conjunct, params) {
2045 conjunct_shapes.push((col, shape));
2046 }
2047 }
2048
2049 let mut candidates: Vec<(IndexPick, u8, usize)> = Vec::new();
2051 for (i_idx, decl) in indexes.iter().enumerate() {
2052 let first_col = match decl.columns.first() {
2053 Some((c, _)) => c.clone(),
2054 None => continue,
2055 };
2056 let matching: Vec<&(String, IndexPredicateShape)> = conjunct_shapes
2058 .iter()
2059 .filter(|(c, _)| c == &first_col)
2060 .collect();
2061 if matching.is_empty() {
2062 let reason = classify_rejection_reason(filter, &first_col);
2066 considered.push(crate::database::IndexCandidate {
2067 name: decl.name.clone(),
2068 rejected_reason: Cow::Borrowed(reason),
2069 });
2070 continue;
2071 }
2072 let shape = combine_shapes(matching.iter().map(|(_, s)| s.clone()).collect());
2074 let tier = shape.selectivity_tier();
2075 candidates.push((
2076 IndexPick {
2077 name: decl.name.clone(),
2078 columns: decl.columns.clone(),
2079 shape,
2080 pushed_column: first_col.clone(),
2081 },
2082 tier,
2083 i_idx,
2084 ));
2085 }
2086
2087 let pick = candidates
2089 .into_iter()
2090 .min_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)))
2091 .map(|(p, _, _)| p);
2092
2093 IndexAnalysis { pick, considered }
2094}
2095
2096fn combine_shapes(mut shapes: Vec<IndexPredicateShape>) -> IndexPredicateShape {
2099 shapes.sort_by_key(|s| s.selectivity_tier());
2101 let head = shapes.remove(0);
2102 if let IndexPredicateShape::Range {
2104 mut lower,
2105 mut upper,
2106 } = head.clone()
2107 {
2108 for s in shapes {
2109 if let IndexPredicateShape::Range { lower: l, upper: u } = s {
2110 lower = tighter_lower(&lower, &l);
2112 upper = tighter_upper(&upper, &u);
2113 }
2114 }
2115 return IndexPredicateShape::Range { lower, upper };
2116 }
2117 head
2118}
2119
2120fn tighter_lower(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
2121 use std::ops::Bound;
2122 match (a, b) {
2123 (Bound::Unbounded, _) => b.clone(),
2124 (_, Bound::Unbounded) => a.clone(),
2125 (Bound::Included(va), Bound::Included(vb)) => {
2126 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2127 a.clone()
2128 } else {
2129 b.clone()
2130 }
2131 }
2132 (Bound::Excluded(va), Bound::Excluded(vb)) => {
2133 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2134 a.clone()
2135 } else {
2136 b.clone()
2137 }
2138 }
2139 (Bound::Included(va), Bound::Excluded(vb)) => {
2140 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2141 a.clone()
2142 } else {
2143 b.clone()
2144 }
2145 }
2146 (Bound::Excluded(va), Bound::Included(vb)) => {
2147 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2148 b.clone()
2149 } else {
2150 a.clone()
2151 }
2152 }
2153 }
2154}
2155
2156fn tighter_upper(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
2157 use std::ops::Bound;
2158 match (a, b) {
2159 (Bound::Unbounded, _) => b.clone(),
2160 (_, Bound::Unbounded) => a.clone(),
2161 (Bound::Included(va), Bound::Included(vb)) => {
2162 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2163 a.clone()
2164 } else {
2165 b.clone()
2166 }
2167 }
2168 (Bound::Excluded(va), Bound::Excluded(vb)) => {
2169 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2170 a.clone()
2171 } else {
2172 b.clone()
2173 }
2174 }
2175 (Bound::Included(va), Bound::Excluded(vb)) => {
2176 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
2177 a.clone()
2178 } else {
2179 b.clone()
2180 }
2181 }
2182 (Bound::Excluded(va), Bound::Included(vb)) => {
2183 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
2184 b.clone()
2185 } else {
2186 a.clone()
2187 }
2188 }
2189 }
2190}
2191
2192fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
2194 match expr {
2195 Expr::BinaryOp {
2196 left,
2197 op: BinOp::And,
2198 right,
2199 } => {
2200 let mut out = split_conjuncts(left);
2201 out.extend(split_conjuncts(right));
2202 out
2203 }
2204 other => vec![other.clone()],
2205 }
2206}
2207
2208fn classify_index_predicate(
2211 expr: &Expr,
2212 params: &HashMap<String, Value>,
2213) -> Option<(String, IndexPredicateShape)> {
2214 match expr {
2215 Expr::BinaryOp { left, op, right } => {
2216 let col = extract_simple_col_ref(left)?;
2217 if !is_literal_or_param(right) {
2219 return None;
2220 }
2221 let rhs = resolve_simple_rhs(right, params)?;
2222 let shape = match op {
2223 BinOp::Eq => IndexPredicateShape::Equality(rhs),
2224 BinOp::Neq => IndexPredicateShape::NotEqual(rhs),
2225 BinOp::Lt => IndexPredicateShape::Range {
2226 lower: std::ops::Bound::Unbounded,
2227 upper: std::ops::Bound::Excluded(rhs),
2228 },
2229 BinOp::Lte => IndexPredicateShape::Range {
2230 lower: std::ops::Bound::Unbounded,
2231 upper: std::ops::Bound::Included(rhs),
2232 },
2233 BinOp::Gt => IndexPredicateShape::Range {
2234 lower: std::ops::Bound::Excluded(rhs),
2235 upper: std::ops::Bound::Unbounded,
2236 },
2237 BinOp::Gte => IndexPredicateShape::Range {
2238 lower: std::ops::Bound::Included(rhs),
2239 upper: std::ops::Bound::Unbounded,
2240 },
2241 _ => return None,
2242 };
2243 Some((col, shape))
2244 }
2245 Expr::InList {
2246 expr: e,
2247 list,
2248 negated: false,
2249 } => {
2250 let col = extract_simple_col_ref(e)?;
2251 let mut values = Vec::with_capacity(list.len());
2252 for v in list {
2253 if !is_literal_or_param(v) {
2254 return None;
2255 }
2256 values.push(resolve_simple_rhs(v, params)?);
2257 }
2258 Some((col, IndexPredicateShape::InList(values)))
2259 }
2260 Expr::IsNull { expr: e, negated } => {
2261 let col = extract_simple_col_ref(e)?;
2262 Some((
2263 col,
2264 if *negated {
2265 IndexPredicateShape::IsNotNull
2266 } else {
2267 IndexPredicateShape::IsNull
2268 },
2269 ))
2270 }
2271 _ => None,
2272 }
2273}
2274
2275fn classify_rejection_reason(filter: &Expr, column: &str) -> &'static str {
2278 fn walk(expr: &Expr, column: &str) -> Option<&'static str> {
2281 match expr {
2282 Expr::BinaryOp {
2283 left,
2284 op: BinOp::And | BinOp::Or,
2285 right,
2286 } => walk(left, column).or_else(|| walk(right, column)),
2287 Expr::BinaryOp { left, op, right } => {
2288 if expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column) {
2291 return Some("arithmetic in predicate");
2292 }
2293 if expr_uses_function_on(left, column) || expr_uses_function_on(right, column) {
2295 return Some("function call in predicate");
2296 }
2297 if mentions_column_ref(left, column) || mentions_column_ref(right, column) {
2299 let left_is_col = extract_simple_col_ref(left).as_deref() == Some(column);
2300 let right_is_col_ref = matches!(right.as_ref(), Expr::Column(_));
2301 if left_is_col && right_is_col_ref {
2302 return Some("non-literal rhs");
2303 }
2304 }
2305 let _ = op;
2306 None
2307 }
2308 Expr::Like { expr: e, .. } => {
2309 if mentions_column_ref(e, column) {
2310 Some("LIKE is residual-only")
2311 } else {
2312 None
2313 }
2314 }
2315 Expr::InSubquery { expr: e, .. } => {
2316 if mentions_column_ref(e, column) {
2317 Some("non-literal rhs")
2318 } else {
2319 None
2320 }
2321 }
2322 _ => None,
2323 }
2324 }
2325 walk(filter, column).unwrap_or("first column not in WHERE")
2326}
2327
2328fn extract_simple_col_ref(expr: &Expr) -> Option<String> {
2329 match expr {
2330 Expr::Column(r) => Some(r.column.clone()),
2331 _ => None,
2332 }
2333}
2334
2335fn is_literal_or_param(expr: &Expr) -> bool {
2336 match expr {
2337 Expr::Literal(_) | Expr::Parameter(_) => true,
2338 Expr::FunctionCall { name, args } => {
2339 matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2342 && args.iter().all(is_literal_or_param)
2343 }
2344 _ => false,
2345 }
2346}
2347
2348fn resolve_simple_rhs(expr: &Expr, params: &HashMap<String, Value>) -> Option<Value> {
2349 match expr {
2350 Expr::Literal(lit) => Some(match lit {
2351 Literal::Null => Value::Null,
2352 Literal::Bool(b) => Value::Bool(*b),
2353 Literal::Integer(i) => Value::Int64(*i),
2354 Literal::Real(f) => Value::Float64(*f),
2355 Literal::Text(s) => Value::Text(s.clone()),
2356 Literal::Vector(_) => return None,
2357 }),
2358 Expr::Parameter(name) => params.get(name).cloned(),
2359 Expr::FunctionCall { name, args }
2360 if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") =>
2361 {
2362 if args.len() != 2 {
2363 return None;
2364 }
2365 let a = resolve_simple_rhs(&args[0], params)?;
2366 let b = resolve_simple_rhs(&args[1], params)?;
2367 match (a, b, name.as_str()) {
2368 (Value::Int64(x), Value::Int64(y), "__add") => {
2369 Some(Value::Int64(x.wrapping_add(y)))
2370 }
2371 (Value::Int64(x), Value::Int64(y), "__sub") => {
2372 Some(Value::Int64(x.wrapping_sub(y)))
2373 }
2374 (Value::Int64(x), Value::Int64(y), "__mul") => {
2375 Some(Value::Int64(x.wrapping_mul(y)))
2376 }
2377 (Value::Int64(x), Value::Int64(y), "__div") if y != 0 => Some(Value::Int64(x / y)),
2378 (Value::Float64(x), Value::Float64(y), "__add") => Some(Value::Float64(x + y)),
2379 (Value::Float64(x), Value::Float64(y), "__sub") => Some(Value::Float64(x - y)),
2380 (Value::Float64(x), Value::Float64(y), "__mul") => Some(Value::Float64(x * y)),
2381 (Value::Float64(x), Value::Float64(y), "__div") => Some(Value::Float64(x / y)),
2382 _ => None,
2383 }
2384 }
2385 _ => None,
2386 }
2387}
2388
2389fn expr_uses_function_on(expr: &Expr, column: &str) -> bool {
2390 match expr {
2391 Expr::FunctionCall { name, args } => {
2392 if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") {
2395 return false;
2396 }
2397 args.iter().any(|a| mentions_column_ref(a, column))
2398 }
2399 Expr::BinaryOp { left, right, .. } => {
2400 expr_uses_function_on(left, column) || expr_uses_function_on(right, column)
2401 }
2402 _ => false,
2403 }
2404}
2405
2406fn expr_uses_arithmetic_on(expr: &Expr, column: &str) -> bool {
2407 match expr {
2410 Expr::FunctionCall { name, args } => {
2411 matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2412 && args.iter().any(|a| mentions_column_ref(a, column))
2413 }
2414 Expr::BinaryOp { left, right, .. } => {
2415 expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column)
2416 }
2417 _ => false,
2418 }
2419}
2420
2421fn mentions_column_ref(expr: &Expr, column: &str) -> bool {
2422 match expr {
2423 Expr::Column(r) => r.column == column,
2424 Expr::FunctionCall { args, .. } => args.iter().any(|a| mentions_column_ref(a, column)),
2425 Expr::BinaryOp { left, right, .. } => {
2426 mentions_column_ref(left, column) || mentions_column_ref(right, column)
2427 }
2428 Expr::UnaryOp { operand, .. } => mentions_column_ref(operand, column),
2429 Expr::IsNull { expr: e, .. } => mentions_column_ref(e, column),
2430 Expr::Like { expr: e, .. } => mentions_column_ref(e, column),
2431 Expr::InList { expr: e, .. } => mentions_column_ref(e, column),
2432 Expr::InSubquery { expr: e, .. } => mentions_column_ref(e, column),
2433 _ => false,
2434 }
2435}
2436
2437#[allow(clippy::too_many_arguments)]
2440fn execute_index_scan(
2441 db: &Database,
2442 table: &str,
2443 pick: &IndexPick,
2444 snapshot: contextdb_core::SnapshotId,
2445 tx: Option<TxId>,
2446) -> Result<(Vec<VersionedRow>, u64)> {
2447 use contextdb_core::{DirectedValue, SortDirection, TotalOrdAsc, TotalOrdDesc};
2448 use std::ops::Bound;
2449
2450 if let IndexPredicateShape::Equality(rhs) = &pick.shape
2452 && let Value::Float64(f) = rhs
2453 && f.is_nan()
2454 {
2455 return Ok((Vec::new(), 0));
2456 }
2457 if let IndexPredicateShape::Equality(Value::Null) = &pick.shape {
2460 return Ok((Vec::new(), 0));
2461 }
2462
2463 let pick = match coerce_pick_shape_to_column_type(db, table, pick) {
2471 Ok(coerced) => coerced,
2472 Err(_) => return Ok((Vec::new(), 0)),
2473 };
2474 let pick = &pick;
2475
2476 let indexes = db.relational_store().indexes.read();
2477 let storage = match indexes.get(&(table.to_string(), pick.name.clone())) {
2478 Some(s) => s,
2479 None => return Ok((Vec::new(), 0)),
2480 };
2481
2482 let first_dir = pick
2486 .columns
2487 .first()
2488 .map(|(_, d)| *d)
2489 .unwrap_or(SortDirection::Asc);
2490
2491 let wrap = |v: Value| -> DirectedValue {
2492 match first_dir {
2493 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
2494 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
2495 }
2496 };
2497
2498 let mut postings: Vec<contextdb_relational::IndexEntry> = Vec::new();
2500 let mut rows_examined: u64 = 0;
2501
2502 let collect_range = |postings: &mut Vec<contextdb_relational::IndexEntry>,
2503 examined: &mut u64,
2504 lower: Bound<Vec<DirectedValue>>,
2505 upper: Bound<Vec<DirectedValue>>| {
2506 for (_k, entries) in storage.tree.range((lower, upper)) {
2507 for e in entries {
2508 *examined += 1;
2509 if e.visible_at(snapshot) {
2510 postings.push(e.clone());
2511 }
2512 }
2513 }
2514 };
2515
2516 let is_composite = pick.columns.len() > 1;
2521
2522 match &pick.shape {
2523 IndexPredicateShape::Equality(v) => {
2524 if is_composite {
2525 let want = wrap(v.clone());
2527 for (key, entries) in storage.tree.iter() {
2528 if key.first() != Some(&want) {
2529 continue;
2530 }
2531 for e in entries {
2532 rows_examined += 1;
2533 if e.visible_at(snapshot) {
2534 postings.push(e.clone());
2535 }
2536 }
2537 }
2538 } else {
2539 let lower = vec![wrap(v.clone())];
2540 let upper = lower.clone();
2541 collect_range(
2542 &mut postings,
2543 &mut rows_examined,
2544 Bound::Included(lower),
2545 Bound::Included(upper),
2546 );
2547 }
2548 }
2549 IndexPredicateShape::InList(vs) => {
2550 for v in vs {
2551 if is_composite {
2552 let want = wrap(v.clone());
2553 for (key, entries) in storage.tree.iter() {
2554 if key.first() != Some(&want) {
2555 continue;
2556 }
2557 for e in entries {
2558 rows_examined += 1;
2559 if e.visible_at(snapshot) {
2560 postings.push(e.clone());
2561 }
2562 }
2563 }
2564 } else {
2565 let k = vec![wrap(v.clone())];
2566 collect_range(
2567 &mut postings,
2568 &mut rows_examined,
2569 Bound::Included(k.clone()),
2570 Bound::Included(k),
2571 );
2572 }
2573 }
2574 }
2575 IndexPredicateShape::Range { lower, upper } => {
2576 if is_composite {
2577 for (key, entries) in storage.tree.iter() {
2580 let Some(first) = key.first() else { continue };
2581 let in_lower = match lower {
2582 Bound::Unbounded => true,
2583 Bound::Included(v) => first >= &wrap(v.clone()),
2584 Bound::Excluded(v) => first > &wrap(v.clone()),
2585 };
2586 let in_upper = match upper {
2587 Bound::Unbounded => true,
2588 Bound::Included(v) => first <= &wrap(v.clone()),
2589 Bound::Excluded(v) => first < &wrap(v.clone()),
2590 };
2591 if !(in_lower && in_upper) {
2592 continue;
2593 }
2594 for e in entries {
2595 rows_examined += 1;
2596 if e.visible_at(snapshot) {
2597 postings.push(e.clone());
2598 }
2599 }
2600 }
2601 } else {
2602 let l = match lower {
2603 Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2604 Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2605 Bound::Unbounded => Bound::Unbounded,
2606 };
2607 let u = match upper {
2608 Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2609 Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2610 Bound::Unbounded => Bound::Unbounded,
2611 };
2612 collect_range(&mut postings, &mut rows_examined, l, u);
2613 }
2614 }
2615 IndexPredicateShape::NotEqual(v) => {
2616 let except_key = vec![wrap(v.clone())];
2619 for (k, entries) in storage.tree.iter() {
2620 if *k == except_key {
2621 continue;
2622 }
2623 for e in entries {
2624 rows_examined += 1;
2625 if e.visible_at(snapshot) {
2626 postings.push(e.clone());
2627 }
2628 }
2629 }
2630 }
2631 IndexPredicateShape::IsNull => {
2632 let k = vec![wrap(Value::Null)];
2633 collect_range(
2634 &mut postings,
2635 &mut rows_examined,
2636 Bound::Included(k.clone()),
2637 Bound::Included(k),
2638 );
2639 }
2640 IndexPredicateShape::IsNotNull => {
2641 let null_key = vec![wrap(Value::Null)];
2643 for (k, entries) in storage.tree.iter() {
2644 if *k == null_key {
2645 continue;
2646 }
2647 for e in entries {
2648 rows_examined += 1;
2649 if e.visible_at(snapshot) {
2650 postings.push(e.clone());
2651 }
2652 }
2653 }
2654 }
2655 }
2656
2657 drop(indexes);
2660 let row_ids: Vec<RowId> = postings.iter().map(|p| p.row_id).collect();
2661 if row_ids.is_empty() {
2662 return Ok((Vec::new(), rows_examined));
2663 }
2664 let tables = db.relational_store().tables.read();
2665 let rows_slice = tables.get(table);
2666 let mut out: Vec<VersionedRow> = Vec::with_capacity(row_ids.len());
2667 if let Some(rows) = rows_slice {
2668 let by_id: HashMap<RowId, &VersionedRow> = rows.iter().map(|r| (r.row_id, r)).collect();
2669 for rid in &row_ids {
2670 if let Some(r) = by_id.get(rid) {
2671 if (*r).visible_at(snapshot) {
2674 out.push((**r).clone());
2675 }
2676 }
2677 }
2678 }
2679 drop(tables);
2682 if let Some(tx_id) = tx {
2683 let overlay = db.index_scan_tx_overlay(tx_id, table, &pick.pushed_column, &pick.shape)?;
2684 let deleted_row_ids = overlay.deleted_row_ids;
2685 out.retain(|row| !deleted_row_ids.contains(&row.row_id));
2686 out.extend(overlay.matching_inserts);
2687 }
2688 Ok((out, rows_examined))
2689}
2690
2691pub(crate) fn range_includes(
2692 v: &Value,
2693 lower: &std::ops::Bound<Value>,
2694 upper: &std::ops::Bound<Value>,
2695) -> bool {
2696 use std::ops::Bound;
2697 let ok_lower = match lower {
2698 Bound::Unbounded => true,
2699 Bound::Included(b) => compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Less),
2700 Bound::Excluded(b) => {
2701 compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Greater)
2702 }
2703 };
2704 let ok_upper = match upper {
2705 Bound::Unbounded => true,
2706 Bound::Included(b) => {
2707 compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Greater)
2708 }
2709 Bound::Excluded(b) => compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Less),
2710 };
2711 ok_lower && ok_upper
2712}
2713
2714fn try_elide_sort(
2721 db: &Database,
2722 input: &PhysicalPlan,
2723 keys: &[contextdb_planner::SortKey],
2724 params: &HashMap<String, Value>,
2725 tx: Option<TxId>,
2726) -> Result<Option<QueryResult>> {
2727 fn find_scan(plan: &PhysicalPlan) -> Option<(&String, &Option<String>, &Option<Expr>)> {
2728 match plan {
2729 PhysicalPlan::Scan {
2730 table,
2731 alias,
2732 filter,
2733 } => Some((table, alias, filter)),
2734 PhysicalPlan::Project { input, .. }
2735 | PhysicalPlan::Filter { input, .. }
2736 | PhysicalPlan::Distinct { input }
2737 | PhysicalPlan::Limit { input, .. } => find_scan(input),
2738 _ => None,
2739 }
2740 }
2741 let Some((table, _alias, filter)) = find_scan(input) else {
2742 return Ok(None);
2743 };
2744 if filter.is_some() {
2749 return Ok(None);
2750 }
2751 let key_cols: Option<Vec<(&str, &contextdb_parser::ast::SortDirection)>> = keys
2753 .iter()
2754 .map(|k| match &k.expr {
2755 Expr::Column(r) => Some((r.column.as_str(), &k.direction)),
2756 _ => None,
2757 })
2758 .collect();
2759 let Some(key_cols) = key_cols else {
2760 return Ok(None);
2761 };
2762 let meta = match db.table_meta(table) {
2763 Some(m) => m,
2764 None => return Ok(None),
2765 };
2766 let matching_index = meta.indexes.iter().find(|decl| {
2767 if decl.columns.len() < key_cols.len() {
2768 return false;
2769 }
2770 decl.columns
2771 .iter()
2772 .zip(key_cols.iter())
2773 .all(|((col, dir), (kcol, kdir))| col == kcol && core_dir_matches_ast(*dir, **kdir))
2774 });
2775 let Some(matching) = matching_index else {
2776 return Ok(None);
2777 };
2778 run_index_scan_with_order(db, table, matching, filter.as_ref(), params, tx)
2779}
2780
2781fn run_index_scan_with_order(
2785 db: &Database,
2786 table: &str,
2787 decl: &contextdb_core::IndexDecl,
2788 filter: Option<&Expr>,
2789 params: &HashMap<String, Value>,
2790 tx: Option<TxId>,
2791) -> Result<Option<QueryResult>> {
2792 use std::borrow::Cow;
2793 let snapshot = db.snapshot_for_read();
2794 let schema_columns = db.table_meta(table).map(|meta| {
2795 meta.columns
2796 .into_iter()
2797 .map(|column| column.name)
2798 .collect::<Vec<_>>()
2799 });
2800 let resolved_filter = filter
2801 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
2802 .transpose()?;
2803
2804 let pick = IndexPick {
2807 name: decl.name.clone(),
2808 columns: decl.columns.clone(),
2809 shape: IndexPredicateShape::Range {
2810 lower: std::ops::Bound::Unbounded,
2811 upper: std::ops::Bound::Unbounded,
2812 },
2813 pushed_column: decl.columns[0].0.clone(),
2814 };
2815 let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
2816 db.__bump_rows_examined(examined);
2817 let mut result = materialize_rows(
2818 rows,
2819 resolved_filter.as_ref(),
2820 params,
2821 schema_columns.as_deref(),
2822 )?;
2823 let mut pushed: smallvec::SmallVec<[Cow<'static, str>; 4]> = smallvec::SmallVec::new();
2824 pushed.push(Cow::Owned(decl.columns[0].0.clone()));
2825 result.trace = crate::database::QueryTrace {
2826 physical_plan: "IndexScan",
2827 index_used: Some(decl.name.clone()),
2828 predicates_pushed: pushed,
2829 indexes_considered: Default::default(),
2830 sort_elided: true,
2831 };
2832 Ok(Some(result))
2833}
2834
2835fn sort_keys_match_index_prefix(
2836 db: &Database,
2837 input: &PhysicalPlan,
2838 index_name: &str,
2839 keys: &[contextdb_planner::SortKey],
2840) -> bool {
2841 fn find_scan_and_filter(plan: &PhysicalPlan) -> Option<(&String, &Option<Expr>)> {
2842 match plan {
2843 PhysicalPlan::Scan { table, filter, .. } => Some((table, filter)),
2844 PhysicalPlan::Project { input, .. }
2845 | PhysicalPlan::Filter { input, .. }
2846 | PhysicalPlan::Distinct { input }
2847 | PhysicalPlan::Limit { input, .. } => find_scan_and_filter(input),
2848 _ => None,
2849 }
2850 }
2851 let (table, filter) = match find_scan_and_filter(input) {
2852 Some(t) => t,
2853 None => return false,
2854 };
2855 let meta = match db.table_meta(table) {
2856 Some(m) => m,
2857 None => return false,
2858 };
2859 let decl = meta.indexes.iter().find(|i| i.name == index_name);
2860 let Some(decl) = decl else {
2861 return false;
2862 };
2863 if let Some(filter_expr) = filter.as_ref()
2868 && let Some(leading_col) = decl.columns.first().map(|(c, _)| c.as_str())
2869 {
2870 let conjuncts = split_conjuncts(filter_expr);
2871 let empty_params = HashMap::new();
2872 for conjunct in &conjuncts {
2873 if let Some((col, shape)) = classify_index_predicate(conjunct, &empty_params)
2874 && col == leading_col
2875 && matches!(
2876 shape,
2877 IndexPredicateShape::InList(_) | IndexPredicateShape::NotEqual(_)
2878 )
2879 {
2880 return false;
2881 }
2882 }
2883 }
2884 let pinned_prefix_len = count_equality_prefix(filter.as_ref(), &decl.columns);
2889 let remaining_index_cols = &decl.columns[pinned_prefix_len..];
2890 if remaining_index_cols.len() < keys.len() {
2891 return false;
2892 }
2893 remaining_index_cols
2894 .iter()
2895 .zip(keys.iter())
2896 .all(|((col, dir), k)| match &k.expr {
2897 Expr::Column(r) => r.column == *col && core_dir_matches_ast(*dir, k.direction),
2898 _ => false,
2899 })
2900}
2901
2902fn count_equality_prefix(
2903 filter: Option<&Expr>,
2904 columns: &[(String, contextdb_core::SortDirection)],
2905) -> usize {
2906 let Some(filter) = filter else {
2907 return 0;
2908 };
2909 let conjuncts = split_conjuncts(filter);
2910 let mut pinned = 0usize;
2911 for (col, _) in columns {
2912 let has_eq = conjuncts.iter().any(|c| match c {
2913 Expr::BinaryOp {
2914 left,
2915 op: BinOp::Eq,
2916 right,
2917 } => {
2918 let left_is_col = matches!(left.as_ref(), Expr::Column(r) if r.column == *col);
2919 let right_is_simple =
2920 matches!(right.as_ref(), Expr::Literal(_) | Expr::Parameter(_));
2921 left_is_col && right_is_simple
2922 }
2923 _ => false,
2924 });
2925 if has_eq {
2926 pinned += 1;
2927 } else {
2928 break;
2929 }
2930 }
2931 pinned
2932}
2933
2934fn core_dir_matches_ast(
2935 core: contextdb_core::SortDirection,
2936 ast: contextdb_parser::ast::SortDirection,
2937) -> bool {
2938 matches!(
2939 (core, ast),
2940 (
2941 contextdb_core::SortDirection::Asc,
2942 contextdb_parser::ast::SortDirection::Asc
2943 ) | (
2944 contextdb_core::SortDirection::Desc,
2945 contextdb_parser::ast::SortDirection::Desc
2946 )
2947 )
2948}
2949
2950fn validate_update_state_transition(
2953 db: &Database,
2954 table: &str,
2955 existing: &VersionedRow,
2956 next_values: &HashMap<String, Value>,
2957) -> Result<()> {
2958 let Some(meta) = db.table_meta(table) else {
2959 return Ok(());
2960 };
2961 let Some(state_machine) = meta.state_machine else {
2962 return Ok(());
2963 };
2964
2965 let old_state = existing
2966 .values
2967 .get(&state_machine.column)
2968 .and_then(Value::as_text);
2969 let new_state = next_values
2970 .get(&state_machine.column)
2971 .and_then(Value::as_text);
2972
2973 let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
2974 return Ok(());
2975 };
2976
2977 if old_state == new_state
2978 || db.relational_store().validate_state_transition(
2979 table,
2980 &state_machine.column,
2981 old_state,
2982 new_state,
2983 )
2984 {
2985 return Ok(());
2986 }
2987
2988 Err(Error::InvalidStateTransition(format!(
2989 "{old_state} -> {new_state}"
2990 )))
2991}
2992
2993fn estimate_row_bytes_for_meta(
2994 values: &HashMap<String, Value>,
2995 meta: &TableMeta,
2996 include_vectors: bool,
2997) -> usize {
2998 let mut bytes = 96usize;
2999 for column in &meta.columns {
3000 let Some(value) = values.get(&column.name) else {
3001 continue;
3002 };
3003 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
3004 continue;
3005 }
3006 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
3007 }
3008 bytes
3009}
3010
/// Estimates the working memory of a top-`k` vector search.
///
/// Computes `k * 3 * dimension * size_of::<f32>()`, saturating at
/// `usize::MAX` instead of overflowing.
fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
    let factors = [3, dimension, std::mem::size_of::<f32>()];
    factors
        .iter()
        .fold(k, |bytes, factor| bytes.saturating_mul(*factor))
}
3016
3017fn estimate_bfs_working_bytes<T>(
3018 frontier: &[T],
3019 steps: &[contextdb_planner::GraphStepPlan],
3020) -> usize {
3021 let max_hops = steps.iter().fold(0usize, |acc, step| {
3022 acc.saturating_add(step.max_depth as usize)
3023 });
3024 frontier
3025 .len()
3026 .saturating_mul(2048)
3027 .saturating_mul(max_hops.max(1))
3028}
3029
3030fn dedupe_graph_frontier(
3031 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
3032 steps: &[contextdb_planner::GraphStepPlan],
3033) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
3034 let mut best =
3035 HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
3036
3037 for (bindings, current_id, depth) in frontier {
3038 let mut key = Vec::with_capacity(steps.len());
3039 for step in steps {
3040 if let Some(id) = bindings.get(&step.target_alias) {
3041 key.push(*id);
3042 }
3043 }
3044
3045 best.entry(key)
3046 .and_modify(|existing| {
3047 if depth < existing.2 {
3048 *existing = (bindings.clone(), current_id, depth);
3049 }
3050 })
3051 .or_insert((bindings, current_id, depth));
3052 }
3053
3054 best.into_values().collect()
3055}
3056
3057fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
3058 let meta = db.table_meta(table);
3059 let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
3060 let snapshot = db.snapshot();
3061 let rows = db.scan(table, snapshot).unwrap_or_default();
3062 let row_bytes = rows.iter().fold(0usize, |acc, row| {
3063 acc.saturating_add(meta.as_ref().map_or_else(
3064 || row.estimated_bytes(),
3065 |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
3066 ))
3067 });
3068 let vector_bytes = rows
3069 .iter()
3070 .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
3071 .fold(0usize, |acc, entry| {
3072 acc.saturating_add(entry.estimated_bytes())
3073 });
3074 let edge_bytes = rows.iter().fold(0usize, |acc, row| {
3075 match (
3076 row.values.get("source_id").and_then(Value::as_uuid),
3077 row.values.get("target_id").and_then(Value::as_uuid),
3078 row.values.get("edge_type").and_then(Value::as_text),
3079 ) {
3080 (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
3081 96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
3082 ),
3083 _ => acc,
3084 }
3085 });
3086 metadata_bytes
3087 .saturating_add(row_bytes)
3088 .saturating_add(vector_bytes)
3089 .saturating_add(edge_bytes)
3090}
3091
3092fn materialize_rows(
3093 rows: Vec<VersionedRow>,
3094 filter: Option<&Expr>,
3095 params: &HashMap<String, Value>,
3096 schema_columns: Option<&[String]>,
3097) -> Result<QueryResult> {
3098 let filtered: Vec<VersionedRow> = rows
3099 .into_iter()
3100 .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
3101 .collect();
3102
3103 let keys = if let Some(schema_columns) = schema_columns {
3104 schema_columns.to_vec()
3105 } else {
3106 let mut keys = BTreeSet::new();
3107 for r in &filtered {
3108 for k in r.values.keys() {
3109 keys.insert(k.clone());
3110 }
3111 }
3112 keys.into_iter().collect::<Vec<_>>()
3113 };
3114
3115 let mut columns = vec!["row_id".to_string()];
3116 columns.extend(keys.iter().cloned());
3117
3118 let rows = filtered
3119 .into_iter()
3120 .map(|r| {
3121 let mut out = vec![Value::Int64(r.row_id.0 as i64)];
3122 for k in &keys {
3123 out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
3124 }
3125 out
3126 })
3127 .collect();
3128
3129 Ok(QueryResult {
3130 columns,
3131 rows,
3132 rows_affected: 0,
3133 trace: crate::database::QueryTrace::scan(),
3134 cascade: None,
3135 })
3136}
3137
3138fn rows_by_row_id(
3139 db: &Database,
3140 table: &str,
3141 row_ids: &[RowId],
3142 snapshot: SnapshotId,
3143) -> Result<Vec<VersionedRow>> {
3144 if row_ids.is_empty() {
3145 return Ok(Vec::new());
3146 }
3147
3148 let wanted = row_ids.iter().copied().collect::<HashSet<_>>();
3149 let tables = db.relational_store().tables.read();
3150 let rows = tables
3151 .get(table)
3152 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
3153 let mut found = HashMap::with_capacity(wanted.len());
3154 for row in rows {
3155 if wanted.contains(&row.row_id) && row.visible_at(snapshot) {
3156 found.insert(row.row_id, row.clone());
3157 if found.len() == wanted.len() {
3158 break;
3159 }
3160 }
3161 }
3162
3163 Ok(row_ids
3164 .iter()
3165 .filter_map(|row_id| found.remove(row_id))
3166 .collect())
3167}
3168
3169fn uuid_to_row_id_map(
3170 db: &Database,
3171 table: &str,
3172 snapshot: SnapshotId,
3173) -> Result<HashMap<uuid::Uuid, RowId>> {
3174 let tables = db.relational_store().tables.read();
3175 let rows = tables
3176 .get(table)
3177 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
3178 Ok(rows
3179 .iter()
3180 .filter(|row| row.visible_at(snapshot))
3181 .filter_map(|row| match row.values.get("id") {
3182 Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
3183 _ => None,
3184 })
3185 .collect())
3186}
3187
3188fn vector_search_trace(operator: &'static str, candidate_trace: Option<QueryTrace>) -> QueryTrace {
3189 let Some(mut trace) = candidate_trace else {
3190 return QueryTrace {
3191 physical_plan: operator,
3192 ..Default::default()
3193 };
3194 };
3195
3196 trace.physical_plan = match (trace.physical_plan, operator) {
3197 ("IndexScan", "HNSWSearch") => "IndexScan -> HNSWSearch",
3198 ("IndexScan", _) => "IndexScan -> VectorSearch",
3199 ("Scan", "HNSWSearch") => "Scan -> HNSWSearch",
3200 ("Scan", _) => "Scan -> VectorSearch",
3201 (_, "HNSWSearch") => "HNSWSearch",
3202 _ => "VectorSearch",
3203 };
3204 trace.sort_elided = false;
3205 trace
3206}
3207
3208pub(crate) fn row_matches(
3209 row: &VersionedRow,
3210 expr: &Expr,
3211 params: &HashMap<String, Value>,
3212) -> Result<bool> {
3213 Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
3214}
3215
3216fn eval_expr_value(
3217 row: &VersionedRow,
3218 expr: &Expr,
3219 params: &HashMap<String, Value>,
3220) -> Result<Value> {
3221 match expr {
3222 Expr::Column(c) => {
3223 if c.column == "row_id" {
3224 Ok(Value::Int64(row.row_id.0 as i64))
3225 } else {
3226 Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
3227 }
3228 }
3229 Expr::BinaryOp { left, op, right } => {
3230 let left = eval_expr_value(row, left, params)?;
3231 let right = eval_expr_value(row, right, params)?;
3232 eval_binary_op(op, &left, &right)
3233 }
3234 Expr::UnaryOp { op, operand } => {
3235 let value = eval_expr_value(row, operand, params)?;
3236 match op {
3237 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
3238 UnaryOp::Neg => match value {
3239 Value::Int64(v) => Ok(Value::Int64(-v)),
3240 Value::Float64(v) => Ok(Value::Float64(-v)),
3241 _ => Err(Error::PlanError(
3242 "cannot negate non-numeric value".to_string(),
3243 )),
3244 },
3245 }
3246 }
3247 Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
3248 Expr::IsNull { expr, negated } => {
3249 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
3250 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
3251 }
3252 Expr::InList {
3253 expr,
3254 list,
3255 negated,
3256 } => {
3257 let needle = eval_expr_value(row, expr, params)?;
3258 let matched = list.iter().try_fold(false, |found, item| {
3259 if found {
3260 Ok(true)
3261 } else {
3262 let candidate = eval_expr_value(row, item, params)?;
3263 Ok(
3264 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3265 || (needle != Value::Null
3266 && candidate != Value::Null
3267 && needle == candidate),
3268 )
3269 }
3270 })?;
3271 Ok(Value::Bool(if *negated { !matched } else { matched }))
3272 }
3273 Expr::Like {
3274 expr,
3275 pattern,
3276 negated,
3277 } => {
3278 let matches = match (
3279 eval_expr_value(row, expr, params)?,
3280 eval_expr_value(row, pattern, params)?,
3281 ) {
3282 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3283 _ => false,
3284 };
3285 Ok(Value::Bool(if *negated { !matches } else { matches }))
3286 }
3287 _ => resolve_expr(expr, params),
3288 }
3289}
3290
3291pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
3292 match expr {
3293 Expr::Literal(l) => Ok(match l {
3294 Literal::Null => Value::Null,
3295 Literal::Bool(v) => Value::Bool(*v),
3296 Literal::Integer(v) => Value::Int64(*v),
3297 Literal::Real(v) => Value::Float64(*v),
3298 Literal::Text(v) => Value::Text(v.clone()),
3299 Literal::Vector(v) => Value::Vector(v.clone()),
3300 }),
3301 Expr::Parameter(p) => params
3302 .get(p)
3303 .cloned()
3304 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
3305 Expr::Column(c) => Ok(Value::Text(c.column.clone())),
3306 Expr::UnaryOp { op, operand } => match op {
3307 UnaryOp::Neg => match resolve_expr(operand, params)? {
3308 Value::Int64(v) => Ok(Value::Int64(-v)),
3309 Value::Float64(v) => Ok(Value::Float64(-v)),
3310 _ => Err(Error::PlanError(
3311 "cannot negate non-numeric value".to_string(),
3312 )),
3313 },
3314 UnaryOp::Not => Err(Error::PlanError(
3315 "boolean NOT requires row context".to_string(),
3316 )),
3317 },
3318 Expr::FunctionCall { name, args } => {
3319 let values = args
3320 .iter()
3321 .map(|arg| resolve_expr(arg, params))
3322 .collect::<Result<Vec<_>>>()?;
3323 eval_function(name, &values)
3324 }
3325 Expr::CosineDistance { right, .. } => resolve_expr(right, params),
3326 _ => Err(Error::PlanError("unsupported expression".to_string())),
3327 }
3328}
3329
3330fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
3331 match (a, b) {
3332 (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
3333 (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
3334 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
3335 (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3336 (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
3337 (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
3338 (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
3339 (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3340 (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
3341 (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
3342 (Value::Uuid(u), Value::Text(t)) => {
3343 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3344 Some(u.cmp(&parsed))
3345 } else {
3346 None
3347 }
3348 }
3349 (Value::Text(t), Value::Uuid(u)) => {
3350 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3351 Some(parsed.cmp(u))
3352 } else {
3353 None
3354 }
3355 }
3356 (Value::TxId(a), Value::TxId(b)) => Some(a.0.cmp(&b.0)),
3357 (Value::TxId(a), Value::Int64(b)) => {
3358 if *b < 0 {
3359 Some(Ordering::Greater)
3360 } else {
3361 Some(a.0.cmp(&(*b as u64)))
3362 }
3363 }
3364 (Value::Int64(a), Value::TxId(b)) => {
3365 if *a < 0 {
3366 Some(Ordering::Less)
3367 } else {
3368 Some((*a as u64).cmp(&b.0))
3369 }
3370 }
3371 (Value::TxId(_), Value::Timestamp(_)) | (Value::Timestamp(_), Value::TxId(_)) => None,
3372 (Value::Null, _) | (_, Value::Null) => None,
3373 _ => None,
3374 }
3375}
3376
3377fn eval_bool_expr(
3378 row: &VersionedRow,
3379 expr: &Expr,
3380 params: &HashMap<String, Value>,
3381) -> Result<Option<bool>> {
3382 match expr {
3383 Expr::BinaryOp { left, op, right } => match op {
3384 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3385 let left = eval_expr_value(row, left, params)?;
3386 let right = eval_expr_value(row, right, params)?;
3387 if left == Value::Null || right == Value::Null {
3388 return Ok(None);
3389 }
3390
3391 let result = match op {
3392 BinOp::Eq => {
3393 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3394 }
3395 BinOp::Neq => {
3396 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3397 }
3398 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3399 BinOp::Lte => matches!(
3400 compare_values(&left, &right),
3401 Some(Ordering::Less | Ordering::Equal)
3402 ),
3403 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3404 BinOp::Gte => matches!(
3405 compare_values(&left, &right),
3406 Some(Ordering::Greater | Ordering::Equal)
3407 ),
3408 BinOp::And | BinOp::Or => unreachable!(),
3409 };
3410 Ok(Some(result))
3411 }
3412 BinOp::And => {
3413 let left = eval_bool_expr(row, left, params)?;
3414 if left == Some(false) {
3415 return Ok(Some(false));
3416 }
3417 let right = eval_bool_expr(row, right, params)?;
3418 Ok(match (left, right) {
3419 (Some(true), Some(true)) => Some(true),
3420 (Some(true), other) => other,
3421 (None, Some(false)) => Some(false),
3422 (None, Some(true)) | (None, None) => None,
3423 (Some(false), _) => Some(false),
3424 })
3425 }
3426 BinOp::Or => {
3427 let left = eval_bool_expr(row, left, params)?;
3428 if left == Some(true) {
3429 return Ok(Some(true));
3430 }
3431 let right = eval_bool_expr(row, right, params)?;
3432 Ok(match (left, right) {
3433 (Some(false), Some(false)) => Some(false),
3434 (Some(false), other) => other,
3435 (None, Some(true)) => Some(true),
3436 (None, Some(false)) | (None, None) => None,
3437 (Some(true), _) => Some(true),
3438 })
3439 }
3440 },
3441 Expr::UnaryOp {
3442 op: UnaryOp::Not,
3443 operand,
3444 } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
3445 Expr::InList {
3446 expr,
3447 list,
3448 negated,
3449 } => {
3450 let needle = eval_expr_value(row, expr, params)?;
3451 if needle == Value::Null {
3452 return Ok(None);
3453 }
3454
3455 let matched = list.iter().try_fold(false, |found, item| {
3456 if found {
3457 Ok(true)
3458 } else {
3459 let candidate = eval_expr_value(row, item, params)?;
3460 Ok(
3461 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3462 || (candidate != Value::Null && needle == candidate),
3463 )
3464 }
3465 })?;
3466 Ok(Some(if *negated { !matched } else { matched }))
3467 }
3468 Expr::InSubquery { .. } => Err(Error::PlanError(
3469 "IN (subquery) must be resolved before execution".to_string(),
3470 )),
3471 Expr::Like {
3472 expr,
3473 pattern,
3474 negated,
3475 } => {
3476 let left = eval_expr_value(row, expr, params)?;
3477 let right = eval_expr_value(row, pattern, params)?;
3478 let matched = match (left, right) {
3479 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3480 _ => false,
3481 };
3482 Ok(Some(if *negated { !matched } else { matched }))
3483 }
3484 Expr::IsNull { expr, negated } => {
3485 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
3486 Ok(Some(if *negated { !is_null } else { is_null }))
3487 }
3488 Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
3489 Value::Bool(value) => Ok(Some(value)),
3490 Value::Null => Ok(None),
3491 _ => Err(Error::PlanError(format!(
3492 "unsupported WHERE expression: {:?}",
3493 expr
3494 ))),
3495 },
3496 _ => Err(Error::PlanError(format!(
3497 "unsupported WHERE expression: {:?}",
3498 expr
3499 ))),
3500 }
3501}
3502
3503fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
3504 let bool_value = match op {
3505 BinOp::Eq => {
3506 if left == &Value::Null || right == &Value::Null {
3507 false
3508 } else {
3509 compare_values(left, right) == Some(Ordering::Equal) || left == right
3510 }
3511 }
3512 BinOp::Neq => {
3513 if left == &Value::Null || right == &Value::Null {
3514 false
3515 } else {
3516 !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
3517 }
3518 }
3519 BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
3520 BinOp::Lte => matches!(
3521 compare_values(left, right),
3522 Some(Ordering::Less | Ordering::Equal)
3523 ),
3524 BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
3525 BinOp::Gte => matches!(
3526 compare_values(left, right),
3527 Some(Ordering::Greater | Ordering::Equal)
3528 ),
3529 BinOp::And => value_to_bool(left) && value_to_bool(right),
3530 BinOp::Or => value_to_bool(left) || value_to_bool(right),
3531 };
3532 Ok(Value::Bool(bool_value))
3533}
3534
3535fn value_to_bool(value: &Value) -> bool {
3536 matches!(value, Value::Bool(true))
3537}
3538
3539fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
3540 match (left, right) {
3541 (Value::Null, Value::Null) => Ordering::Equal,
3542 (Value::Null, _) => match direction {
3543 SortDirection::Asc => Ordering::Greater,
3544 SortDirection::Desc => Ordering::Less,
3545 SortDirection::CosineDistance => Ordering::Equal,
3546 },
3547 (_, Value::Null) => match direction {
3548 SortDirection::Asc => Ordering::Less,
3549 SortDirection::Desc => Ordering::Greater,
3550 SortDirection::CosineDistance => Ordering::Equal,
3551 },
3552 _ => {
3553 let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
3554 match direction {
3555 SortDirection::Asc => ordering,
3556 SortDirection::Desc => ordering.reverse(),
3557 SortDirection::CosineDistance => ordering,
3558 }
3559 }
3560 }
3561}
3562
3563fn eval_assignment_expr(
3564 expr: &Expr,
3565 row_values: &HashMap<String, Value>,
3566 params: &HashMap<String, Value>,
3567) -> Result<Value> {
3568 match expr {
3569 Expr::Literal(lit) => literal_to_value(lit),
3570 Expr::Parameter(name) => params
3571 .get(name)
3572 .cloned()
3573 .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
3574 Expr::Column(col_ref) => row_values
3575 .get(&col_ref.column)
3576 .cloned()
3577 .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
3578 Expr::BinaryOp { left, op, right } => {
3579 let left = eval_assignment_expr(left, row_values, params)?;
3580 let right = eval_assignment_expr(right, row_values, params)?;
3581 eval_binary_op(op, &left, &right)
3582 }
3583 Expr::UnaryOp { op, operand } => match op {
3584 UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
3585 Value::Int64(value) => Ok(Value::Int64(-value)),
3586 Value::Float64(value) => Ok(Value::Float64(-value)),
3587 _ => Err(Error::Other(format!(
3588 "unsupported expression in UPDATE SET: {:?}",
3589 expr
3590 ))),
3591 },
3592 UnaryOp::Not => Err(Error::Other(format!(
3593 "unsupported expression in UPDATE SET: {:?}",
3594 expr
3595 ))),
3596 },
3597 Expr::FunctionCall { name, args } => {
3598 let evaluated = args
3599 .iter()
3600 .map(|arg| eval_assignment_expr(arg, row_values, params))
3601 .collect::<Result<Vec<_>>>()?;
3602 eval_function(name, &evaluated)
3603 }
3604 _ => Err(Error::Other(format!(
3605 "unsupported expression in UPDATE SET: {:?}",
3606 expr
3607 ))),
3608 }
3609}
3610
3611fn apply_on_conflict_updates(
3612 db: &Database,
3613 table: &str,
3614 mut insert_values: HashMap<String, Value>,
3615 existing_row: &VersionedRow,
3616 on_conflict: &OnConflictPlan,
3617 params: &HashMap<String, Value>,
3618 active_tx: Option<TxId>,
3619) -> Result<HashMap<String, Value>> {
3620 if on_conflict.update_columns.is_empty() {
3621 return Ok(insert_values);
3622 }
3623
3624 if let Some(meta) = db.table_meta(table) {
3629 for (column, _) in &on_conflict.update_columns {
3630 if let Some(col_def) = meta.columns.iter().find(|c| c.name == *column)
3631 && col_def.immutable
3632 {
3633 return Err(Error::ImmutableColumn {
3634 table: table.to_string(),
3635 column: column.clone(),
3636 });
3637 }
3638 }
3639 }
3640
3641 let current_tx_max = Some(db.committed_watermark());
3642
3643 let mut merged = existing_row.values.clone();
3644 for (column, expr) in &on_conflict.update_columns {
3645 let value = eval_assignment_expr(expr, &existing_row.values, params)?;
3646 merged.insert(
3647 column.clone(),
3648 coerce_value_for_column(db, table, column, value, current_tx_max, active_tx)?,
3649 );
3650 }
3651
3652 for (column, value) in insert_values.drain() {
3653 merged.entry(column).or_insert(value);
3654 }
3655
3656 Ok(merged)
3657}
3658
3659fn literal_to_value(lit: &Literal) -> Result<Value> {
3660 Ok(match lit {
3661 Literal::Null => Value::Null,
3662 Literal::Bool(v) => Value::Bool(*v),
3663 Literal::Integer(v) => Value::Int64(*v),
3664 Literal::Real(v) => Value::Float64(*v),
3665 Literal::Text(v) => Value::Text(v.clone()),
3666 Literal::Vector(v) => Value::Vector(v.clone()),
3667 })
3668}
3669
3670fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
3671 let [left, right] = args else {
3672 return Err(Error::PlanError(format!(
3673 "function {} expects 2 arguments",
3674 name
3675 )));
3676 };
3677
3678 match (left, right) {
3679 (Value::Int64(left), Value::Int64(right)) => match name {
3680 "__add" => Ok(Value::Int64(left + right)),
3681 "__sub" => Ok(Value::Int64(left - right)),
3682 "__mul" => Ok(Value::Int64(left * right)),
3683 "__div" => Ok(Value::Int64(left / right)),
3684 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3685 },
3686 (Value::Float64(left), Value::Float64(right)) => match name {
3687 "__add" => Ok(Value::Float64(left + right)),
3688 "__sub" => Ok(Value::Float64(left - right)),
3689 "__mul" => Ok(Value::Float64(left * right)),
3690 "__div" => Ok(Value::Float64(left / right)),
3691 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3692 },
3693 (Value::Int64(left), Value::Float64(right)) => match name {
3694 "__add" => Ok(Value::Float64(*left as f64 + right)),
3695 "__sub" => Ok(Value::Float64(*left as f64 - right)),
3696 "__mul" => Ok(Value::Float64(*left as f64 * right)),
3697 "__div" => Ok(Value::Float64(*left as f64 / right)),
3698 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3699 },
3700 (Value::Float64(left), Value::Int64(right)) => match name {
3701 "__add" => Ok(Value::Float64(left + *right as f64)),
3702 "__sub" => Ok(Value::Float64(left - *right as f64)),
3703 "__mul" => Ok(Value::Float64(left * *right as f64)),
3704 "__div" => Ok(Value::Float64(left / *right as f64)),
3705 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3706 },
3707 _ => Err(Error::PlanError(format!(
3708 "function {} expects numeric arguments",
3709 name
3710 ))),
3711 }
3712}
3713
3714fn eval_function_in_row_context(
3715 row: &VersionedRow,
3716 name: &str,
3717 args: &[Expr],
3718 params: &HashMap<String, Value>,
3719) -> Result<Value> {
3720 let values = args
3721 .iter()
3722 .map(|arg| eval_expr_value(row, arg, params))
3723 .collect::<Result<Vec<_>>>()?;
3724 eval_function(name, &values)
3725}
3726
3727fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
3728 match name.to_ascii_lowercase().as_str() {
3729 "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
3730 "coalesce" => Ok(args
3731 .iter()
3732 .find(|value| **value != Value::Null)
3733 .cloned()
3734 .unwrap_or(Value::Null)),
3735 "now" => Ok(Value::Timestamp(
3736 SystemTime::now()
3737 .duration_since(UNIX_EPOCH)
3738 .map_err(|err| Error::PlanError(err.to_string()))?
3739 .as_secs() as i64,
3740 )),
3741 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3742 }
3743}
3744
/// Reports whether `value` matches the SQL `LIKE` `pattern`.
///
/// `%` matches any run of characters (including none), `_` matches exactly one character, and
/// every other pattern character must match itself. Matching is case-sensitive, works on
/// `char`s, and has no escape syntax.
fn like_matches(value: &str, pattern: &str) -> bool {
    let value_chars: Vec<char> = value.chars().collect();
    let pattern_chars: Vec<char> = pattern.chars().collect();
    let (mut vi, mut pi) = (0usize, 0usize);
    // Most recent `%` as (pattern index, value index it currently stops before).
    let mut backtrack: Option<(usize, usize)> = None;

    while vi < value_chars.len() {
        match pattern_chars.get(pi) {
            // The wildcard must be tested before the literal comparison; otherwise a `%`
            // character in the value would consume the pattern's `%` as a plain character
            // and, for example, "%abc" would fail to match "%c".
            Some('%') => {
                backtrack = Some((pi, vi));
                pi += 1;
            }
            Some(&expected) if expected == '_' || expected == value_chars[vi] => {
                vi += 1;
                pi += 1;
            }
            _ => match backtrack {
                // Let the last `%` absorb one more character and retry after it.
                Some((star, resume)) => {
                    pi = star + 1;
                    vi = resume + 1;
                    backtrack = Some((star, resume + 1));
                }
                None => return false,
            },
        }
    }

    // Only trailing `%` wildcards may remain once the value is exhausted.
    pattern_chars[pi..].iter().all(|&c| c == '%')
}
3776
3777fn resolve_in_subqueries(
3778 db: &Database,
3779 expr: &Expr,
3780 params: &HashMap<String, Value>,
3781 tx: Option<TxId>,
3782) -> Result<Expr> {
3783 resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
3784}
3785
3786pub(crate) fn resolve_in_subqueries_with_ctes(
3787 db: &Database,
3788 expr: &Expr,
3789 params: &HashMap<String, Value>,
3790 tx: Option<TxId>,
3791 ctes: &[Cte],
3792) -> Result<Expr> {
3793 match expr {
3794 Expr::InSubquery {
3795 expr,
3796 subquery,
3797 negated,
3798 } => {
3799 let mut subquery_tables: std::collections::HashSet<String> = subquery
3801 .from
3802 .iter()
3803 .filter_map(|item| match item {
3804 contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
3805 _ => None,
3806 })
3807 .collect();
3808 for cte in ctes {
3810 match cte {
3811 Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
3812 subquery_tables.insert(name.clone());
3813 }
3814 }
3815 }
3816 if let Some(where_clause) = &subquery.where_clause
3817 && has_outer_table_ref(where_clause, &subquery_tables)
3818 {
3819 return Err(Error::Other(
3820 "correlated subqueries are not supported".to_string(),
3821 ));
3822 }
3823
3824 let query_plan = plan(&Statement::Select(SelectStatement {
3825 ctes: ctes.to_vec(),
3826 body: (**subquery).clone(),
3827 }))?;
3828 let result = execute_plan(db, &query_plan, params, tx)?;
3829 let select_expr = subquery
3830 .columns
3831 .first()
3832 .map(|column| column.expr.clone())
3833 .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
3834 let list = result
3835 .rows
3836 .iter()
3837 .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
3838 .collect::<Result<Vec<_>>>()?
3839 .into_iter()
3840 .map(value_to_literal)
3841 .collect::<Result<Vec<_>>>()?;
3842 Ok(Expr::InList {
3843 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3844 list,
3845 negated: *negated,
3846 })
3847 }
3848 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
3849 left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
3850 op: *op,
3851 right: Box::new(resolve_in_subqueries_with_ctes(
3852 db, right, params, tx, ctes,
3853 )?),
3854 }),
3855 Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
3856 op: *op,
3857 operand: Box::new(resolve_in_subqueries_with_ctes(
3858 db, operand, params, tx, ctes,
3859 )?),
3860 }),
3861 Expr::InList {
3862 expr,
3863 list,
3864 negated,
3865 } => Ok(Expr::InList {
3866 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3867 list: list
3868 .iter()
3869 .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
3870 .collect::<Result<Vec<_>>>()?,
3871 negated: *negated,
3872 }),
3873 Expr::Like {
3874 expr,
3875 pattern,
3876 negated,
3877 } => Ok(Expr::Like {
3878 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3879 pattern: Box::new(resolve_in_subqueries_with_ctes(
3880 db, pattern, params, tx, ctes,
3881 )?),
3882 negated: *negated,
3883 }),
3884 Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
3885 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3886 negated: *negated,
3887 }),
3888 Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
3889 name: name.clone(),
3890 args: args
3891 .iter()
3892 .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
3893 .collect::<Result<Vec<_>>>()?,
3894 }),
3895 _ => Ok(expr.clone()),
3896 }
3897}
3898
3899fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
3900 match expr {
3901 Expr::Column(ColumnRef {
3902 table: Some(table), ..
3903 }) => !subquery_tables.contains(table),
3904 Expr::BinaryOp { left, right, .. } => {
3905 has_outer_table_ref(left, subquery_tables)
3906 || has_outer_table_ref(right, subquery_tables)
3907 }
3908 Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
3909 Expr::InList { expr, list, .. } => {
3910 has_outer_table_ref(expr, subquery_tables)
3911 || list
3912 .iter()
3913 .any(|item| has_outer_table_ref(item, subquery_tables))
3914 }
3915 Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
3916 Expr::Like { expr, pattern, .. } => {
3917 has_outer_table_ref(expr, subquery_tables)
3918 || has_outer_table_ref(pattern, subquery_tables)
3919 }
3920 Expr::FunctionCall { args, .. } => args
3921 .iter()
3922 .any(|arg| has_outer_table_ref(arg, subquery_tables)),
3923 _ => false,
3924 }
3925}
3926
3927fn value_to_literal(value: Value) -> Result<Expr> {
3928 Ok(Expr::Literal(match value {
3929 Value::Null => Literal::Null,
3930 Value::Bool(v) => Literal::Bool(v),
3931 Value::Int64(v) => Literal::Integer(v),
3932 Value::Float64(v) => Literal::Real(v),
3933 Value::Text(v) => Literal::Text(v),
3934 Value::Uuid(v) => Literal::Text(v.to_string()),
3935 Value::Timestamp(v) => Literal::Integer(v),
3936 other => {
3937 return Err(Error::PlanError(format!(
3938 "unsupported subquery result value: {:?}",
3939 other
3940 )));
3941 }
3942 }))
3943}
3944
3945fn query_result_row_matches(
3946 row: &[Value],
3947 columns: &[String],
3948 expr: &Expr,
3949 params: &HashMap<String, Value>,
3950) -> Result<bool> {
3951 Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
3952}
3953
3954fn eval_query_result_bool_expr(
3955 row: &[Value],
3956 columns: &[String],
3957 expr: &Expr,
3958 params: &HashMap<String, Value>,
3959) -> Result<Option<bool>> {
3960 match expr {
3961 Expr::BinaryOp { left, op, right } => match op {
3962 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3963 let left = eval_query_result_expr(left, row, columns, params)?;
3964 let right = eval_query_result_expr(right, row, columns, params)?;
3965 if left == Value::Null || right == Value::Null {
3966 return Ok(None);
3967 }
3968
3969 let result = match op {
3970 BinOp::Eq => {
3971 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3972 }
3973 BinOp::Neq => {
3974 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3975 }
3976 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3977 BinOp::Lte => matches!(
3978 compare_values(&left, &right),
3979 Some(Ordering::Less | Ordering::Equal)
3980 ),
3981 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3982 BinOp::Gte => matches!(
3983 compare_values(&left, &right),
3984 Some(Ordering::Greater | Ordering::Equal)
3985 ),
3986 BinOp::And | BinOp::Or => unreachable!(),
3987 };
3988 Ok(Some(result))
3989 }
3990 BinOp::And => {
3991 let left = eval_query_result_bool_expr(row, columns, left, params)?;
3992 if left == Some(false) {
3993 return Ok(Some(false));
3994 }
3995 let right = eval_query_result_bool_expr(row, columns, right, params)?;
3996 Ok(match (left, right) {
3997 (Some(true), Some(true)) => Some(true),
3998 (Some(true), other) => other,
3999 (None, Some(false)) => Some(false),
4000 (None, Some(true)) | (None, None) => None,
4001 (Some(false), _) => Some(false),
4002 })
4003 }
4004 BinOp::Or => {
4005 let left = eval_query_result_bool_expr(row, columns, left, params)?;
4006 if left == Some(true) {
4007 return Ok(Some(true));
4008 }
4009 let right = eval_query_result_bool_expr(row, columns, right, params)?;
4010 Ok(match (left, right) {
4011 (Some(false), Some(false)) => Some(false),
4012 (Some(false), other) => other,
4013 (None, Some(true)) => Some(true),
4014 (None, Some(false)) | (None, None) => None,
4015 (Some(true), _) => Some(true),
4016 })
4017 }
4018 },
4019 Expr::UnaryOp {
4020 op: UnaryOp::Not,
4021 operand,
4022 } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
4023 Expr::InList {
4024 expr,
4025 list,
4026 negated,
4027 } => {
4028 let needle = eval_query_result_expr(expr, row, columns, params)?;
4029 if needle == Value::Null {
4030 return Ok(None);
4031 }
4032
4033 let matched = list.iter().try_fold(false, |found, item| {
4034 if found {
4035 Ok(true)
4036 } else {
4037 let candidate = eval_query_result_expr(item, row, columns, params)?;
4038 Ok(
4039 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
4040 || (candidate != Value::Null && needle == candidate),
4041 )
4042 }
4043 })?;
4044 Ok(Some(if *negated { !matched } else { matched }))
4045 }
4046 Expr::InSubquery { .. } => Err(Error::PlanError(
4047 "IN (subquery) must be resolved before execution".to_string(),
4048 )),
4049 Expr::Like {
4050 expr,
4051 pattern,
4052 negated,
4053 } => {
4054 let left = eval_query_result_expr(expr, row, columns, params)?;
4055 let right = eval_query_result_expr(pattern, row, columns, params)?;
4056 let matched = match (left, right) {
4057 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
4058 _ => false,
4059 };
4060 Ok(Some(if *negated { !matched } else { matched }))
4061 }
4062 Expr::IsNull { expr, negated } => {
4063 let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
4064 Ok(Some(if *negated { !is_null } else { is_null }))
4065 }
4066 Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
4067 Value::Bool(value) => Ok(Some(value)),
4068 Value::Null => Ok(None),
4069 _ => Err(Error::PlanError(format!(
4070 "unsupported WHERE expression: {:?}",
4071 expr
4072 ))),
4073 },
4074 _ => Err(Error::PlanError(format!(
4075 "unsupported WHERE expression: {:?}",
4076 expr
4077 ))),
4078 }
4079}
4080
4081fn lookup_query_result_column(
4082 row: &[Value],
4083 input_columns: &[String],
4084 column_ref: &ColumnRef,
4085) -> Result<Value> {
4086 if let Some(table) = &column_ref.table {
4087 let qualified = format!("{table}.{}", column_ref.column);
4088 let idx = input_columns
4091 .iter()
4092 .position(|name| name == &qualified)
4093 .or_else(|| {
4094 input_columns
4095 .iter()
4096 .position(|name| name == &column_ref.column)
4097 })
4098 .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
4099 return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
4100 }
4101
4102 let matches = input_columns
4103 .iter()
4104 .enumerate()
4105 .filter_map(|(idx, name)| {
4106 (name == &column_ref.column
4107 || name.rsplit('.').next() == Some(column_ref.column.as_str()))
4108 .then_some(idx)
4109 })
4110 .collect::<Vec<_>>();
4111
4112 match matches.as_slice() {
4113 [] => Err(Error::PlanError(format!(
4114 "project column not found: {}",
4115 column_ref.column
4116 ))),
4117 [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
4118 _ => Err(Error::PlanError(format!(
4119 "ambiguous column reference: {}",
4120 column_ref.column
4121 ))),
4122 }
4123}
4124
4125fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
4126 let mut combined = Vec::with_capacity(left.len() + right.len());
4127 combined.extend_from_slice(left);
4128 combined.extend_from_slice(right);
4129 combined
4130}
4131
/// Returns the bare column names that occur on both sides of a join.
///
/// A "bare" name is the part of a column name after its last `.` (or the whole name when
/// there is no `.`). The result is sorted and free of duplicates.
fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
    /// Strips everything up to and including the last `.`.
    fn bare_name(column: &str) -> &str {
        column.rsplit_once('.').map_or(column, |(_, tail)| tail)
    }

    let mut seen_on_left = BTreeSet::new();
    for column in left {
        seen_on_left.insert(bare_name(column));
    }

    let mut duplicates = BTreeSet::new();
    for column in right {
        let bare = bare_name(column);
        if seen_on_left.contains(bare) {
            duplicates.insert(bare.to_string());
        }
    }
    duplicates
}
4145
/// Produces the output column names of a join, adding table prefixes where required.
///
/// `columns` is expected to be the left columns followed by the right columns. For positions
/// that belong to the left side: with an alias, the name becomes `alias.bare` (bare = part of
/// the left column after its last `.`); without one, the left column name is kept as is. For
/// positions on the right side: a name that is still bare becomes `right_prefix.bare`, any
/// other name is kept unchanged.
///
/// # Panics
///
/// Panics if `columns` has more entries than `left_columns` and `right_columns` together.
fn qualify_join_columns(
    columns: &[String],
    left_columns: &[String],
    right_columns: &[String],
    left_alias: &Option<String>,
    right_prefix: &str,
) -> Vec<String> {
    let left_len = left_columns.len();
    let mut qualified = Vec::with_capacity(columns.len());

    for (idx, column) in columns.iter().enumerate() {
        let name = if idx < left_len {
            let source = &left_columns[idx];
            match left_alias {
                Some(prefix) => {
                    let bare = source.rsplit('.').next().unwrap_or(column);
                    format!("{prefix}.{}", bare)
                }
                None => source.clone(),
            }
        } else {
            let source = right_columns[idx - left_len].as_str();
            let bare = source.rsplit('.').next().unwrap_or(source);
            if column == bare {
                format!("{right_prefix}.{bare}")
            } else {
                column.clone()
            }
        };
        qualified.push(name);
    }

    qualified
}
4182
4183fn right_table_name(plan: &PhysicalPlan) -> String {
4184 match plan {
4185 PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
4186 _ => "right".to_string(),
4187 }
4188}
4189
4190fn distinct_row_key(row: &[Value]) -> Vec<u8> {
4191 bincode::serde::encode_to_vec(row, bincode::config::standard())
4192 .expect("query rows should serialize for DISTINCT")
4193}
4194
4195fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
4196 match resolve_expr(expr, params)? {
4197 Value::Uuid(u) => Ok(u),
4198 Value::Text(t) => uuid::Uuid::parse_str(&t)
4199 .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
4200 _ => Err(Error::PlanError(
4201 "graph start node must be UUID".to_string(),
4202 )),
4203 }
4204}
4205
4206fn resolve_graph_start_nodes_from_filter(
4209 db: &Database,
4210 filter: &Expr,
4211 params: &HashMap<String, Value>,
4212) -> Result<Vec<uuid::Uuid>> {
4213 if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
4214 return Ok(ids);
4215 }
4216
4217 let (col_name, expected_value) = match filter {
4219 Expr::BinaryOp {
4220 left,
4221 op: BinOp::Eq,
4222 right,
4223 } => {
4224 if let Some(col) = extract_column_name(left) {
4225 (col, resolve_expr(right, params)?)
4226 } else if let Some(col) = extract_column_name(right) {
4227 (col, resolve_expr(left, params)?)
4228 } else {
4229 return Ok(vec![]);
4230 }
4231 }
4232 _ => return Ok(vec![]),
4233 };
4234
4235 let snapshot = db.snapshot();
4236 let mut uuids = Vec::new();
4237 for table_name in db.table_names() {
4238 let meta = match db.table_meta(&table_name) {
4239 Some(m) => m,
4240 None => continue,
4241 };
4242 let has_col = meta.columns.iter().any(|c| c.name == col_name);
4244 let has_id = meta.columns.iter().any(|c| c.name == "id");
4245 if !has_col || !has_id {
4246 continue;
4247 }
4248 let rows = db.scan_filter(&table_name, snapshot, &|row| {
4249 row.values.get(&col_name) == Some(&expected_value)
4250 })?;
4251 for row in rows {
4252 if let Some(Value::Uuid(id)) = row.values.get("id") {
4253 uuids.push(*id);
4254 }
4255 }
4256 }
4257 Ok(uuids)
4258}
4259
4260fn resolve_graph_start_nodes_from_plan(
4261 db: &Database,
4262 plan: &PhysicalPlan,
4263 params: &HashMap<String, Value>,
4264 tx: Option<TxId>,
4265) -> Result<Vec<uuid::Uuid>> {
4266 let result = execute_plan(db, plan, params, tx)?;
4267 result
4268 .rows
4269 .into_iter()
4270 .filter_map(|row| row.into_iter().next())
4271 .map(|value| match value {
4272 Value::Uuid(id) => Ok(id),
4273 Value::Text(text) => uuid::Uuid::parse_str(&text)
4274 .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
4275 other => Err(Error::PlanError(format!(
4276 "invalid graph start identifier from plan: {other:?}"
4277 ))),
4278 })
4279 .collect()
4280}
4281
4282fn resolve_graph_start_ids_from_filter(
4283 filter: &Expr,
4284 params: &HashMap<String, Value>,
4285) -> Result<Option<Vec<uuid::Uuid>>> {
4286 match filter {
4287 Expr::BinaryOp {
4288 left,
4289 op: BinOp::Eq,
4290 right,
4291 } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
4292 let value = if is_graph_id_ref(left) {
4293 resolve_expr(right, params)?
4294 } else {
4295 resolve_expr(left, params)?
4296 };
4297 let id = match value {
4298 Value::Uuid(id) => id,
4299 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4300 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4301 })?,
4302 other => {
4303 return Err(Error::PlanError(format!(
4304 "invalid graph start identifier in filter: {other:?}"
4305 )));
4306 }
4307 };
4308 Ok(Some(vec![id]))
4309 }
4310 Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
4311 let ids = list
4312 .iter()
4313 .map(|item| resolve_expr(item, params))
4314 .map(|value| match value? {
4315 Value::Uuid(id) => Ok(id),
4316 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4317 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4318 }),
4319 other => Err(Error::PlanError(format!(
4320 "invalid graph start identifier in filter: {other:?}"
4321 ))),
4322 })
4323 .collect::<Result<Vec<_>>>()?;
4324 Ok(Some(ids))
4325 }
4326 Expr::BinaryOp { left, right, .. } => {
4327 if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
4328 return Ok(Some(ids));
4329 }
4330 resolve_graph_start_ids_from_filter(right, params)
4331 }
4332 Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
4333 _ => Ok(None),
4334 }
4335}
4336
4337fn is_graph_id_ref(expr: &Expr) -> bool {
4338 matches!(
4339 expr,
4340 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
4341 )
4342}
4343
4344fn extract_column_name(expr: &Expr) -> Option<String> {
4346 match expr {
4347 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
4348 _ => None,
4349 }
4350}
4351
4352fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
4353 match resolve_expr(expr, params)? {
4354 Value::Vector(v) => Ok(v),
4355 Value::Text(text) if text.trim_start().starts_with('[') => parse_text_vector_literal(&text),
4356 Value::Text(name) => match params.get(&name) {
4357 Some(Value::Vector(v)) => Ok(v.clone()),
4358 _ => Err(Error::PlanError("vector parameter missing".to_string())),
4359 },
4360 _ => Err(Error::PlanError(
4361 "invalid vector query expression".to_string(),
4362 )),
4363 }
4364}
4365
4366fn validate_vector_columns(
4367 db: &Database,
4368 table: &str,
4369 values: &HashMap<String, Value>,
4370) -> Result<()> {
4371 let Some(meta) = db.table_meta(table) else {
4372 return Ok(());
4373 };
4374
4375 for column in &meta.columns {
4376 if let contextdb_core::ColumnType::Vector(expected) = column.column_type
4377 && let Some(Value::Vector(vector)) = values.get(&column.name)
4378 {
4379 let got = vector.len();
4380 if got != expected {
4381 return Err(vector_dimension_error(table, &column.name, expected, got));
4382 }
4383 }
4384 }
4385
4386 Ok(())
4387}
4388
4389fn vector_columns_for_meta(meta: &TableMeta) -> Vec<String> {
4390 meta.columns
4391 .iter()
4392 .filter(|column| matches!(column.column_type, contextdb_core::ColumnType::Vector(_)))
4393 .map(|column| column.name.clone())
4394 .collect()
4395}
4396
4397fn vector_values_for_table(
4398 db: &Database,
4399 table: &str,
4400 values: &HashMap<String, Value>,
4401) -> Vec<(String, Vec<f32>)> {
4402 db.table_meta(table)
4403 .map(|meta| {
4404 meta.columns
4405 .iter()
4406 .filter_map(|column| match column.column_type {
4407 contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
4408 Some(Value::Vector(vector)) => Some((column.name.clone(), vector.clone())),
4409 _ => None,
4410 },
4411 _ => None,
4412 })
4413 .collect()
4414 })
4415 .unwrap_or_default()
4416}
4417
4418fn vector_indexes_for_table(db: &Database, table: &str) -> Vec<contextdb_core::VectorIndexRef> {
4419 db.table_meta(table)
4420 .map(|meta| {
4421 meta.columns
4422 .iter()
4423 .filter(|column| {
4424 matches!(column.column_type, contextdb_core::ColumnType::Vector(_))
4425 })
4426 .map(|column| contextdb_core::VectorIndexRef::new(table, column.name.clone()))
4427 .collect()
4428 })
4429 .unwrap_or_default()
4430}
4431
4432pub(crate) fn coerce_into_column(
4433 db: &Database,
4434 table: &str,
4435 col: &str,
4436 v: Value,
4437 current_tx_max: Option<TxId>,
4438 active_tx: Option<TxId>,
4439) -> Result<Value> {
4440 coerce_value_for_column(db, table, col, v, current_tx_max, active_tx)
4441}
4442
4443fn coerce_value_for_column(
4444 db: &Database,
4445 table: &str,
4446 col_name: &str,
4447 v: Value,
4448 current_tx_max: Option<TxId>,
4449 active_tx: Option<TxId>,
4450) -> Result<Value> {
4451 let Some(meta) = db.table_meta(table) else {
4452 if let Value::TxId(_) = &v {
4454 return Err(Error::ColumnTypeMismatch {
4455 table: table.to_string(),
4456 column: col_name.to_string(),
4457 expected: "UNKNOWN",
4458 actual: "TxId",
4459 });
4460 }
4461 return Ok(coerce_uuid_if_needed(col_name, v));
4462 };
4463 coerce_value_for_column_with_meta(table, &meta, col_name, v, current_tx_max, active_tx)
4464}
4465
4466fn coerce_value_for_column_with_meta(
4467 table: &str,
4468 meta: &TableMeta,
4469 col_name: &str,
4470 v: Value,
4471 current_tx_max: Option<TxId>,
4472 active_tx: Option<TxId>,
4473) -> Result<Value> {
4474 let Some(col) = meta.columns.iter().find(|c| c.name == col_name) else {
4475 if let Value::TxId(_) = &v {
4476 return Err(Error::ColumnTypeMismatch {
4477 table: table.to_string(),
4478 column: col_name.to_string(),
4479 expected: "UNKNOWN",
4480 actual: "TxId",
4481 });
4482 }
4483 return Ok(coerce_uuid_if_needed(col_name, v));
4484 };
4485
4486 match col.column_type {
4487 contextdb_core::ColumnType::Uuid => match v {
4488 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4489 table: table.to_string(),
4490 column: col_name.to_string(),
4491 expected: "UUID",
4492 actual: "TxId",
4493 }),
4494 other => coerce_uuid_value(other),
4495 },
4496 contextdb_core::ColumnType::Timestamp => match v {
4497 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4498 table: table.to_string(),
4499 column: col_name.to_string(),
4500 expected: "TIMESTAMP",
4501 actual: "TxId",
4502 }),
4503 other => coerce_timestamp_value(other),
4504 },
4505 contextdb_core::ColumnType::Vector(dim) => match v {
4506 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4507 table: table.to_string(),
4508 column: col_name.to_string(),
4509 expected: format_vector_type(dim),
4510 actual: "TxId",
4511 }),
4512 other => coerce_vector_value(table, col_name, other, dim),
4513 },
4514 contextdb_core::ColumnType::Integer => match v {
4515 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4516 table: table.to_string(),
4517 column: col_name.to_string(),
4518 expected: "INTEGER",
4519 actual: "TxId",
4520 }),
4521 other => Ok(coerce_uuid_if_needed(col_name, other)),
4522 },
4523 contextdb_core::ColumnType::Real => match v {
4524 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4525 table: table.to_string(),
4526 column: col_name.to_string(),
4527 expected: "REAL",
4528 actual: "TxId",
4529 }),
4530 other => Ok(coerce_uuid_if_needed(col_name, other)),
4531 },
4532 contextdb_core::ColumnType::Text => match v {
4533 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4534 table: table.to_string(),
4535 column: col_name.to_string(),
4536 expected: "TEXT",
4537 actual: "TxId",
4538 }),
4539 other => Ok(coerce_uuid_if_needed(col_name, other)),
4540 },
4541 contextdb_core::ColumnType::Boolean => match v {
4542 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4543 table: table.to_string(),
4544 column: col_name.to_string(),
4545 expected: "BOOLEAN",
4546 actual: "TxId",
4547 }),
4548 other => Ok(coerce_uuid_if_needed(col_name, other)),
4549 },
4550 contextdb_core::ColumnType::Json => match v {
4551 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4552 table: table.to_string(),
4553 column: col_name.to_string(),
4554 expected: "JSON",
4555 actual: "TxId",
4556 }),
4557 other => Ok(coerce_uuid_if_needed(col_name, other)),
4558 },
4559 contextdb_core::ColumnType::TxId => {
4560 coerce_txid_value(table, col_name, v, col.nullable, current_tx_max, active_tx)
4561 }
4562 }
4563}
4564
/// Returns a static type label for a vector column of dimension `dim`.
///
/// Only the dimensions listed in the table below get a `VECTOR(n)` label; every other
/// dimension falls back to the plain `"VECTOR"` label.
fn format_vector_type(dim: usize) -> &'static str {
    const KNOWN_LABELS: [(usize, &str); 15] = [
        (1, "VECTOR(1)"),
        (2, "VECTOR(2)"),
        (3, "VECTOR(3)"),
        (4, "VECTOR(4)"),
        (8, "VECTOR(8)"),
        (16, "VECTOR(16)"),
        (32, "VECTOR(32)"),
        (64, "VECTOR(64)"),
        (128, "VECTOR(128)"),
        (256, "VECTOR(256)"),
        (512, "VECTOR(512)"),
        (768, "VECTOR(768)"),
        (1024, "VECTOR(1024)"),
        (1536, "VECTOR(1536)"),
        (3072, "VECTOR(3072)"),
    ];

    KNOWN_LABELS
        .iter()
        .find(|entry| entry.0 == dim)
        .map_or("VECTOR", |entry| entry.1)
}
4586
4587fn coerce_txid_value(
4588 table: &str,
4589 col: &str,
4590 v: Value,
4591 nullable: bool,
4592 current_tx_max: Option<TxId>,
4593 active_tx: Option<TxId>,
4594) -> Result<Value> {
4595 match v {
4596 Value::Null => {
4597 if nullable {
4598 Ok(Value::Null)
4599 } else {
4600 Err(Error::ColumnNotNullable {
4601 table: table.to_string(),
4602 column: col.to_string(),
4603 })
4604 }
4605 }
4606 Value::TxId(tx) => {
4607 if let Some(max) = current_tx_max {
4616 let ceiling = max.0.max(active_tx.map(|t| t.0).unwrap_or(0));
4617 if tx.0 > ceiling {
4618 return Err(Error::TxIdOutOfRange {
4619 table: table.to_string(),
4620 column: col.to_string(),
4621 value: tx.0,
4622 max: max.0,
4623 });
4624 }
4625 }
4626 Ok(Value::TxId(tx))
4627 }
4628 other => Err(Error::ColumnTypeMismatch {
4629 table: table.to_string(),
4630 column: col.to_string(),
4631 expected: "TXID",
4632 actual: value_variant_name(&other),
4633 }),
4634 }
4635}
4636
4637fn value_variant_name(v: &Value) -> &'static str {
4638 match v {
4639 Value::Null => "Null",
4640 Value::Bool(_) => "Bool",
4641 Value::Int64(_) => "Int64",
4642 Value::Float64(_) => "Float64",
4643 Value::Text(_) => "Text",
4644 Value::Uuid(_) => "Uuid",
4645 Value::Timestamp(_) => "Timestamp",
4646 Value::Json(_) => "Json",
4647 Value::Vector(_) => "Vector",
4648 Value::TxId(_) => "TxId",
4649 }
4650}
4651
4652fn coerce_uuid_value(v: Value) -> Result<Value> {
4653 match v {
4654 Value::Null => Ok(Value::Null),
4655 Value::Uuid(id) => Ok(Value::Uuid(id)),
4656 Value::Text(text) => uuid::Uuid::parse_str(&text)
4657 .map(Value::Uuid)
4658 .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
4659 other => Err(Error::Other(format!(
4660 "UUID column requires UUID or text literal, got {other:?}"
4661 ))),
4662 }
4663}
4664
4665fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
4666 if (col == "id" || col.ends_with("_id"))
4667 && let Value::Text(s) = &v
4668 && let Ok(u) = uuid::Uuid::parse_str(s)
4669 {
4670 return Value::Uuid(u);
4671 }
4672 v
4673}
4674
4675fn coerce_timestamp_value(v: Value) -> Result<Value> {
4676 match v {
4677 Value::Null => Ok(Value::Null),
4678 Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
4679 Ok(Value::Timestamp(i64::MAX))
4680 }
4681 Value::Text(text) => {
4682 let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
4683 Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
4684 })?;
4685 Ok(Value::Timestamp(
4686 parsed.unix_timestamp_nanos() as i64 / 1_000_000,
4687 ))
4688 }
4689 other => Ok(other),
4690 }
4691}
4692
4693fn coerce_vector_value(table: &str, column: &str, v: Value, expected_dim: usize) -> Result<Value> {
4694 let vector = match v {
4695 Value::Null => return Ok(Value::Null),
4696 Value::Vector(vector) => vector,
4697 Value::Text(text) => parse_text_vector_literal(&text)?,
4698 other => return Ok(other),
4699 };
4700
4701 if vector.len() != expected_dim {
4702 return Err(vector_dimension_error(
4703 table,
4704 column,
4705 expected_dim,
4706 vector.len(),
4707 ));
4708 }
4709
4710 Ok(Value::Vector(vector))
4711}
4712
4713fn vector_dimension_error(table: &str, column: &str, expected: usize, got: usize) -> Error {
4714 Error::VectorIndexDimensionMismatch {
4715 index: contextdb_core::VectorIndexRef::new(table, column),
4716 expected,
4717 actual: got,
4718 }
4719}
4720
4721fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
4722 let trimmed = text.trim();
4723 let inner = trimmed
4724 .strip_prefix('[')
4725 .and_then(|s| s.strip_suffix(']'))
4726 .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
4727
4728 if inner.trim().is_empty() {
4729 return Ok(Vec::new());
4730 }
4731
4732 inner
4733 .split(',')
4734 .map(|part| {
4735 part.trim().parse::<f32>().map_err(|err| {
4736 Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
4737 })
4738 })
4739 .collect()
4740}
4741
4742fn apply_missing_column_defaults(
4743 db: &Database,
4744 table: &str,
4745 values: &mut HashMap<String, Value>,
4746 active_tx: Option<TxId>,
4747) -> Result<()> {
4748 let Some(meta) = db.table_meta(table) else {
4749 return Ok(());
4750 };
4751
4752 let current_tx_max = Some(db.committed_watermark());
4753
4754 for column in &meta.columns {
4755 if values.contains_key(&column.name) {
4756 continue;
4757 }
4758 let Some(default) = &column.default else {
4759 continue;
4760 };
4761 let value = evaluate_stored_default_expr(default)?;
4762 values.insert(
4763 column.name.clone(),
4764 coerce_value_for_column(db, table, &column.name, value, current_tx_max, active_tx)?,
4765 );
4766 }
4767
4768 Ok(())
4769}
4770
4771fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
4772 if default.eq_ignore_ascii_case("NOW()") {
4773 return eval_function("now", &[]);
4774 }
4775 if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
4776 return eval_function("now", &[]);
4777 }
4778 if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
4779 return Ok(Value::Null);
4780 }
4781 if default.eq_ignore_ascii_case("TRUE") {
4782 return Ok(Value::Bool(true));
4783 }
4784 if default.eq_ignore_ascii_case("FALSE") {
4785 return Ok(Value::Bool(false));
4786 }
4787 if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
4788 return Ok(Value::Text(
4789 default[1..default.len() - 1].replace("''", "'"),
4790 ));
4791 }
4792 if let Some(text) = default
4793 .strip_prefix("Literal(Text(\"")
4794 .and_then(|value| value.strip_suffix("\"))"))
4795 {
4796 return Ok(Value::Text(text.to_string()));
4797 }
4798 if let Some(value) = default
4799 .strip_prefix("Literal(Integer(")
4800 .and_then(|value| value.strip_suffix("))"))
4801 {
4802 let parsed = value.parse::<i64>().map_err(|err| {
4803 Error::Other(format!("invalid stored integer default '{value}': {err}"))
4804 })?;
4805 return Ok(Value::Int64(parsed));
4806 }
4807 if let Some(value) = default
4808 .strip_prefix("Literal(Real(")
4809 .and_then(|value| value.strip_suffix("))"))
4810 {
4811 let parsed = value
4812 .parse::<f64>()
4813 .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
4814 return Ok(Value::Float64(parsed));
4815 }
4816 if let Some(value) = default
4817 .strip_prefix("Literal(Bool(")
4818 .and_then(|value| value.strip_suffix("))"))
4819 {
4820 let parsed = value
4821 .parse::<bool>()
4822 .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
4823 return Ok(Value::Bool(parsed));
4824 }
4825
4826 Err(Error::Other(format!(
4827 "unsupported stored DEFAULT expression: {default}"
4828 )))
4829}
4830
4831pub(crate) fn stored_default_expr(expr: &Expr) -> String {
4832 match expr {
4833 Expr::Literal(Literal::Null) => "NULL".to_string(),
4834 Expr::Literal(Literal::Bool(value)) => {
4835 if *value {
4836 "TRUE".to_string()
4837 } else {
4838 "FALSE".to_string()
4839 }
4840 }
4841 Expr::Literal(Literal::Integer(value)) => value.to_string(),
4842 Expr::Literal(Literal::Real(value)) => value.to_string(),
4843 Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
4844 Expr::FunctionCall { name, args }
4845 if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
4846 {
4847 "NOW()".to_string()
4848 }
4849 _ => format!("{expr:?}"),
4850 }
4851}
4852
4853fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
4854 if col.expires && !matches!(col.data_type, DataType::Timestamp) {
4855 return Err(Error::Other(
4856 "EXPIRES is only valid on TIMESTAMP columns".to_string(),
4857 ));
4858 }
4859 Ok(())
4860}
4861
4862fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
4863 let mut expires_column = None;
4864 for col in columns {
4865 validate_expires_column(col)?;
4866 if col.expires {
4867 if expires_column.is_some() {
4868 return Err(Error::Other(
4869 "only one EXPIRES column is supported per table".to_string(),
4870 ));
4871 }
4872 expires_column = Some(col.name.clone());
4873 }
4874 }
4875 Ok(expires_column)
4876}
4877
4878pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
4879 match dtype {
4880 DataType::Uuid => contextdb_core::ColumnType::Uuid,
4881 DataType::Text => contextdb_core::ColumnType::Text,
4882 DataType::Integer => contextdb_core::ColumnType::Integer,
4883 DataType::Real => contextdb_core::ColumnType::Real,
4884 DataType::Boolean => contextdb_core::ColumnType::Boolean,
4885 DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
4886 DataType::Json => contextdb_core::ColumnType::Json,
4887 DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
4888 DataType::TxId => contextdb_core::ColumnType::TxId,
4889 }
4890}
4891
4892pub(crate) fn map_rank_policy(
4893 policy: &contextdb_parser::ast::RankPolicyAst,
4894) -> contextdb_core::RankPolicy {
4895 contextdb_core::RankPolicy {
4896 joined_table: policy.joined_table.clone(),
4897 joined_column: policy.joined_column.clone(),
4898 anchor_column: String::new(),
4899 sort_key: policy.sort_key.clone(),
4900 formula: policy.formula.clone(),
4901 protected_index: String::new(),
4902 }
4903}
4904
/// Result of validating a column's rank policy: the resolved policy together with its
/// compiled formula.
struct ResolvedRankPolicy {
    /// Core policy record, including the resolved anchor column and protected index.
    policy: contextdb_core::RankPolicy,
    /// Compiled form of the policy's formula.
    formula: Arc<RankFormula>,
}
4909
4910fn core_column_from_ast(
4911 col: &contextdb_parser::ast::ColumnDef,
4912 rank_policy: Option<contextdb_core::RankPolicy>,
4913) -> contextdb_core::ColumnDef {
4914 contextdb_core::ColumnDef {
4915 name: col.name.clone(),
4916 column_type: map_column_type(&col.data_type),
4917 nullable: col.nullable,
4918 primary_key: col.primary_key,
4919 unique: col.unique,
4920 default: col.default.as_ref().map(stored_default_expr),
4921 references: col
4922 .references
4923 .as_ref()
4924 .map(|reference| contextdb_core::ForeignKeyReference {
4925 table: reference.table.clone(),
4926 column: reference.column.clone(),
4927 }),
4928 expires: col.expires,
4929 immutable: col.immutable,
4930 quantization: map_vector_quantization(col.quantization),
4931 rank_policy,
4932 }
4933}
4934
4935fn ast_column_from_core(col: contextdb_core::ColumnDef) -> contextdb_parser::ast::ColumnDef {
4936 contextdb_parser::ast::ColumnDef {
4937 name: col.name,
4938 data_type: match col.column_type {
4939 ColumnType::Uuid => DataType::Uuid,
4940 ColumnType::Text => DataType::Text,
4941 ColumnType::Integer => DataType::Integer,
4942 ColumnType::Real => DataType::Real,
4943 ColumnType::Boolean => DataType::Boolean,
4944 ColumnType::Timestamp => DataType::Timestamp,
4945 ColumnType::Json => DataType::Json,
4946 ColumnType::Vector(dim) => DataType::Vector(dim as u32),
4947 ColumnType::TxId => DataType::TxId,
4948 },
4949 nullable: col.nullable,
4950 primary_key: col.primary_key,
4951 unique: col.unique,
4952 default: None,
4953 references: None,
4954 expires: col.expires,
4955 immutable: col.immutable,
4956 quantization: match col.quantization {
4957 contextdb_core::VectorQuantization::F32 => {
4958 contextdb_parser::ast::VectorQuantization::F32
4959 }
4960 contextdb_core::VectorQuantization::SQ8 => {
4961 contextdb_parser::ast::VectorQuantization::SQ8
4962 }
4963 contextdb_core::VectorQuantization::SQ4 => {
4964 contextdb_parser::ast::VectorQuantization::SQ4
4965 }
4966 },
4967 rank_policy: None,
4968 }
4969}
4970
4971fn validate_rank_policy_for_column(
4972 db: &Database,
4973 table: &str,
4974 column: &contextdb_parser::ast::ColumnDef,
4975 all_columns: &[contextdb_parser::ast::ColumnDef],
4976) -> Result<Option<ResolvedRankPolicy>> {
4977 let Some(policy_ast) = column.rank_policy.as_deref() else {
4978 return Ok(None);
4979 };
4980 let index = rank_index_name(table, &column.name);
4981 if !matches!(column.data_type, DataType::Vector(_)) {
4982 return Err(Error::RankPolicyColumnType {
4983 index,
4984 column: column.name.clone(),
4985 expected: "VECTOR(N)".to_string(),
4986 actual: data_type_name(&column.data_type).to_string(),
4987 });
4988 }
4989 let joined_meta = db.table_meta(&policy_ast.joined_table).ok_or_else(|| {
4990 Error::RankPolicyJoinTableUnknown {
4991 index: index.clone(),
4992 table: policy_ast.joined_table.clone(),
4993 }
4994 })?;
4995 if !joined_meta
4996 .columns
4997 .iter()
4998 .any(|col| col.name == policy_ast.joined_column)
4999 {
5000 return Err(Error::RankPolicyJoinColumnUnknown {
5001 index: index.clone(),
5002 table: policy_ast.joined_table.clone(),
5003 column: policy_ast.joined_column.clone(),
5004 });
5005 }
5006 let protected_index = protected_rank_policy_index(&joined_meta, &policy_ast.joined_column)
5007 .ok_or_else(|| Error::RankPolicyJoinColumnUnindexed {
5008 index: index.clone(),
5009 joined_table: policy_ast.joined_table.clone(),
5010 column: policy_ast.joined_column.clone(),
5011 })?;
5012 let anchor_column =
5013 resolve_rank_policy_anchor_column(&index, policy_ast, all_columns, &joined_meta)?;
5014 let formula = Arc::new(RankFormula::compile_for_index(&index, &policy_ast.formula)?);
5015 validate_rank_formula_columns(
5016 &index,
5017 &column.name,
5018 all_columns,
5019 &joined_meta,
5020 formula.column_refs(),
5021 )?;
5022 Ok(Some(ResolvedRankPolicy {
5023 policy: contextdb_core::RankPolicy {
5024 joined_table: policy_ast.joined_table.clone(),
5025 joined_column: policy_ast.joined_column.clone(),
5026 anchor_column,
5027 sort_key: policy_ast.sort_key.clone(),
5028 formula: policy_ast.formula.clone(),
5029 protected_index,
5030 },
5031 formula,
5032 }))
5033}
5034
5035fn protected_rank_policy_index(meta: &TableMeta, joined_column: &str) -> Option<String> {
5036 meta.indexes
5037 .iter()
5038 .filter(|index| index.kind == contextdb_core::IndexKind::UserDeclared)
5039 .chain(meta.indexes.iter())
5040 .find(|index| {
5041 index
5042 .columns
5043 .first()
5044 .is_some_and(|(column, _)| column == joined_column)
5045 })
5046 .map(|index| index.name.clone())
5047}
5048
5049fn resolve_rank_policy_anchor_column(
5050 index: &str,
5051 policy: &contextdb_parser::ast::RankPolicyAst,
5052 anchor_columns: &[contextdb_parser::ast::ColumnDef],
5053 joined_meta: &TableMeta,
5054) -> Result<String> {
5055 let joined_column = joined_meta
5056 .columns
5057 .iter()
5058 .find(|col| col.name == policy.joined_column)
5059 .ok_or_else(|| Error::RankPolicyJoinColumnUnknown {
5060 index: index.to_string(),
5061 table: policy.joined_table.clone(),
5062 column: policy.joined_column.clone(),
5063 })?;
5064
5065 let anchor_by_name = |name: &str| anchor_columns.iter().find(|col| col.name == name);
5066 let mut candidates = Vec::new();
5067 if joined_column.primary_key {
5068 let singular = singular_table_name(&policy.joined_table);
5069 for name in [
5070 format!("{singular}_id"),
5071 format!("{}_id", policy.joined_table),
5072 ] {
5073 if anchor_by_name(&name).is_some() && !candidates.contains(&name) {
5074 candidates.push(name);
5075 }
5076 }
5077 }
5078 if candidates.is_empty() && anchor_by_name(&policy.joined_column).is_some() {
5079 candidates.push(policy.joined_column.clone());
5080 }
5081 if candidates.is_empty()
5082 && let Some(primary_key) = anchor_columns.iter().find(|col| col.primary_key)
5083 {
5084 candidates.push(primary_key.name.clone());
5085 }
5086 if candidates.is_empty() && anchor_by_name("id").is_some() {
5087 candidates.push("id".to_string());
5088 }
5089
5090 let anchor_column = match candidates.as_slice() {
5091 [single] => single.clone(),
5092 [] => {
5093 return Err(Error::RankPolicyColumnUnknown {
5094 index: index.to_string(),
5095 column: policy.joined_column.clone(),
5096 });
5097 }
5098 _ => {
5099 return Err(Error::RankPolicyColumnAmbiguous {
5100 index: index.to_string(),
5101 column: candidates.join(","),
5102 });
5103 }
5104 };
5105 let anchor_def =
5106 anchor_by_name(&anchor_column).ok_or_else(|| Error::RankPolicyColumnUnknown {
5107 index: index.to_string(),
5108 column: anchor_column.clone(),
5109 })?;
5110 let anchor_type = map_column_type(&anchor_def.data_type);
5111 if anchor_type != joined_column.column_type {
5112 return Err(Error::RankPolicyColumnType {
5113 index: index.to_string(),
5114 column: anchor_column,
5115 expected: column_type_name(&joined_column.column_type).to_string(),
5116 actual: data_type_name(&anchor_def.data_type).to_string(),
5117 });
5118 }
5119 Ok(anchor_column)
5120}
5121
/// Derives a singular form of a table name with two suffix rules.
///
/// A trailing `ies` becomes `y`; otherwise a single trailing `s` is dropped; names matching
/// neither rule are returned unchanged.
fn singular_table_name(table: &str) -> String {
    match (table.strip_suffix("ies"), table.strip_suffix('s')) {
        (Some(stem), _) => format!("{stem}y"),
        (None, Some(stem)) => stem.to_owned(),
        (None, None) => table.to_owned(),
    }
}
5131
5132fn validate_rank_formula_columns(
5133 index: &str,
5134 anchor_vector_column: &str,
5135 anchor_columns: &[contextdb_parser::ast::ColumnDef],
5136 joined_meta: &TableMeta,
5137 refs: &[String],
5138) -> Result<()> {
5139 let vector_score_column_exists = anchor_columns.iter().any(|col| col.name == "vector_score")
5140 || joined_meta
5141 .columns
5142 .iter()
5143 .any(|col| col.name == "vector_score");
5144 for column in refs {
5145 if column == "vector_score" {
5146 if vector_score_column_exists {
5147 return Err(Error::RankPolicyColumnAmbiguous {
5148 index: index.to_string(),
5149 column: column.clone(),
5150 });
5151 }
5152 continue;
5153 }
5154 let anchor = anchor_columns.iter().find(|col| col.name == *column);
5155 let joined = joined_meta.columns.iter().find(|col| col.name == *column);
5156 if anchor.is_none() && joined.is_none() {
5157 return Err(Error::RankPolicyColumnUnknown {
5158 index: index.to_string(),
5159 column: column.clone(),
5160 });
5161 }
5162 if column == "id" && anchor.is_some() && joined.is_some() {
5163 return Err(Error::RankPolicyColumnAmbiguous {
5164 index: index.to_string(),
5165 column: column.clone(),
5166 });
5167 }
5168 if let Some(anchor) = anchor {
5169 validate_rank_formula_type(
5170 index,
5171 column,
5172 data_type_name(&anchor.data_type),
5173 &map_column_type(&anchor.data_type),
5174 )?;
5175 } else if let Some(joined) = joined {
5176 validate_rank_formula_type(
5177 index,
5178 column,
5179 column_type_name(&joined.column_type),
5180 &joined.column_type,
5181 )?;
5182 }
5183 }
5184 if refs.iter().any(|column| column == anchor_vector_column) {
5185 return Err(Error::RankPolicyColumnType {
5186 index: index.to_string(),
5187 column: anchor_vector_column.to_string(),
5188 expected: "number-or-bool".to_string(),
5189 actual: "VECTOR".to_string(),
5190 });
5191 }
5192 Ok(())
5193}
5194
5195fn validate_rank_formula_type(
5196 index: &str,
5197 column: &str,
5198 actual_name: &str,
5199 column_type: &ColumnType,
5200) -> Result<()> {
5201 if matches!(
5202 column_type,
5203 ColumnType::Real | ColumnType::Integer | ColumnType::Boolean
5204 ) {
5205 return Ok(());
5206 }
5207 Err(Error::RankPolicyColumnType {
5208 index: index.to_string(),
5209 column: column.to_string(),
5210 expected: "number-or-bool".to_string(),
5211 actual: actual_name.to_string(),
5212 })
5213}
5214
5215fn data_type_name(data_type: &DataType) -> &'static str {
5216 match data_type {
5217 DataType::Uuid => "UUID",
5218 DataType::Text => "TEXT",
5219 DataType::Integer => "INTEGER",
5220 DataType::Real => "REAL",
5221 DataType::Boolean => "BOOLEAN",
5222 DataType::Timestamp => "TIMESTAMP",
5223 DataType::Json => "JSON",
5224 DataType::Vector(_) => "VECTOR",
5225 DataType::TxId => "TXID",
5226 }
5227}
5228
5229fn column_type_name(column_type: &ColumnType) -> &'static str {
5230 match column_type {
5231 ColumnType::Uuid => "UUID",
5232 ColumnType::Text => "TEXT",
5233 ColumnType::Integer => "INTEGER",
5234 ColumnType::Real => "REAL",
5235 ColumnType::Boolean => "BOOLEAN",
5236 ColumnType::Timestamp => "TIMESTAMP",
5237 ColumnType::Json => "JSON",
5238 ColumnType::Vector(_) => "VECTOR",
5239 ColumnType::TxId => "TXID",
5240 }
5241}
5242
5243pub(crate) fn rank_policy_drop_table_blocker(db: &Database, table: &str) -> Option<Error> {
5244 for (policy_table, policy_column, policy) in all_rank_policies(db) {
5245 if policy.joined_table == table && policy_table != table {
5246 return Some(Error::DropBlockedByRankPolicy {
5247 table: table.into(),
5248 column: None,
5249 dropped_index: None,
5250 policy_table: policy_table.into_boxed_str(),
5251 policy_column: policy_column.into_boxed_str(),
5252 sort_key: policy.sort_key.into_boxed_str(),
5253 });
5254 }
5255 }
5256 None
5257}
5258
5259pub(crate) fn rank_policy_drop_index_blocker(
5260 db: &Database,
5261 table: &str,
5262 index: &str,
5263) -> Option<Error> {
5264 for (policy_table, policy_column, policy) in all_rank_policies(db) {
5265 if policy.joined_table == table && policy.protected_index == index {
5266 return Some(Error::DropBlockedByRankPolicy {
5267 table: table.into(),
5268 column: None,
5269 dropped_index: Some(index.into()),
5270 policy_table: policy_table.into_boxed_str(),
5271 policy_column: policy_column.into_boxed_str(),
5272 sort_key: policy.sort_key.into_boxed_str(),
5273 });
5274 }
5275 }
5276 None
5277}
5278
5279fn rank_policy_drop_column_blocker(db: &Database, table: &str, column: &str) -> Option<Error> {
5280 let metas = db
5281 .table_names()
5282 .into_iter()
5283 .filter_map(|name| db.table_meta(&name).map(|meta| (name, meta)))
5284 .collect::<HashMap<_, _>>();
5285 for (policy_table, meta) in &metas {
5286 for policy_col in &meta.columns {
5287 let Some(policy) = &policy_col.rank_policy else {
5288 continue;
5289 };
5290 if policy_table == table && policy_col.name == column {
5291 return Some(drop_column_rank_error(
5292 table,
5293 column,
5294 policy_table,
5295 &policy_col.name,
5296 policy,
5297 ));
5298 }
5299 if policy.joined_table == table && policy.joined_column == column {
5300 return Some(drop_column_rank_error(
5301 table,
5302 column,
5303 policy_table,
5304 &policy_col.name,
5305 policy,
5306 ));
5307 }
5308 if policy_table == table && policy.anchor_column == column {
5309 return Some(drop_column_rank_error(
5310 table,
5311 column,
5312 policy_table,
5313 &policy_col.name,
5314 policy,
5315 ));
5316 }
5317 let Ok(formula) = RankFormula::compile_for_index(
5318 &rank_index_name(policy_table, &policy_col.name),
5319 &policy.formula,
5320 ) else {
5321 continue;
5322 };
5323 let joined_meta = metas.get(&policy.joined_table);
5324 for reference in formula.column_refs() {
5325 if reference == "vector_score" {
5326 continue;
5327 }
5328 let anchor_has = meta.columns.iter().any(|col| col.name == *reference);
5329 let joined_has = joined_meta
5330 .is_some_and(|joined| joined.columns.iter().any(|col| col.name == *reference));
5331 if anchor_has && policy_table == table && reference == column {
5332 return Some(drop_column_rank_error(
5333 table,
5334 column,
5335 policy_table,
5336 &policy_col.name,
5337 policy,
5338 ));
5339 }
5340 if !anchor_has && joined_has && policy.joined_table == table && reference == column
5341 {
5342 return Some(drop_column_rank_error(
5343 table,
5344 column,
5345 policy_table,
5346 &policy_col.name,
5347 policy,
5348 ));
5349 }
5350 }
5351 }
5352 }
5353 None
5354}
5355
5356fn drop_column_rank_error(
5357 table: &str,
5358 column: &str,
5359 policy_table: &str,
5360 policy_column: &str,
5361 policy: &contextdb_core::RankPolicy,
5362) -> Error {
5363 Error::DropBlockedByRankPolicy {
5364 table: table.into(),
5365 column: Some(column.into()),
5366 dropped_index: None,
5367 policy_table: policy_table.into(),
5368 policy_column: policy_column.into(),
5369 sort_key: policy.sort_key.clone().into_boxed_str(),
5370 }
5371}
5372
5373fn all_rank_policies(db: &Database) -> Vec<(String, String, contextdb_core::RankPolicy)> {
5374 db.table_names()
5375 .into_iter()
5376 .filter_map(|table| db.table_meta(&table).map(|meta| (table, meta)))
5377 .flat_map(|(table, meta)| {
5378 meta.columns.into_iter().filter_map(move |column| {
5379 column
5380 .rank_policy
5381 .map(|policy| (table.clone(), column.name, policy))
5382 })
5383 })
5384 .collect()
5385}
5386
5387fn map_vector_quantization(
5388 quantization: contextdb_parser::ast::VectorQuantization,
5389) -> contextdb_core::VectorQuantization {
5390 match quantization {
5391 contextdb_parser::ast::VectorQuantization::F32 => contextdb_core::VectorQuantization::F32,
5392 contextdb_parser::ast::VectorQuantization::SQ8 => contextdb_core::VectorQuantization::SQ8,
5393 contextdb_parser::ast::VectorQuantization::SQ4 => contextdb_core::VectorQuantization::SQ4,
5394 }
5395}
5396
5397fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
5398 match s {
5399 "latest_wins" => Ok(ConflictPolicy::LatestWins),
5400 "server_wins" => Ok(ConflictPolicy::ServerWins),
5401 "edge_wins" => Ok(ConflictPolicy::EdgeWins),
5402 "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
5403 _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
5404 }
5405}
5406
5407fn conflict_policy_to_string(p: ConflictPolicy) -> String {
5408 match p {
5409 ConflictPolicy::LatestWins => "latest_wins".to_string(),
5410 ConflictPolicy::ServerWins => "server_wins".to_string(),
5411 ConflictPolicy::EdgeWins => "edge_wins".to_string(),
5412 ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
5413 }
5414}
5415
5416fn project_graph_frontier_rows(
5417 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
5418 start_alias: &str,
5419 steps: &[GraphStepPlan],
5420) -> Result<Vec<Vec<Value>>> {
5421 frontier
5422 .into_iter()
5423 .map(|(bindings, id, depth)| {
5424 let mut row = Vec::with_capacity(steps.len() + 3);
5425 let start_id = bindings.get(start_alias).ok_or_else(|| {
5426 Error::PlanError(format!(
5427 "graph frontier missing required start alias binding '{start_alias}'"
5428 ))
5429 })?;
5430 row.push(Value::Uuid(*start_id));
5431 for step in steps {
5432 let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
5433 Error::PlanError(format!(
5434 "graph frontier missing required target alias binding '{}'",
5435 step.target_alias
5436 ))
5437 })?;
5438 row.push(Value::Uuid(*target_id));
5439 }
5440 row.push(Value::Uuid(id));
5441 row.push(Value::Int64(depth as i64));
5442 Ok(row)
5443 })
5444 .collect()
5445}
5446
5447fn release_accounted_bytes(db: &Database, bytes: &[usize]) {
5448 for bytes in bytes {
5449 db.accountant().release(*bytes);
5450 }
5451}
5452
#[cfg(test)]
mod tests {
    use super::*;
    use contextdb_planner::GraphStepPlan;
    use uuid::Uuid;

    /// Frontier projection must fail with a plan error when either the start alias or a
    /// step's target alias has no binding.
    #[test]
    fn graph_01_frontier_projection_requires_complete_bindings() {
        let single_hop = [GraphStepPlan {
            edge_types: vec!["EDGE".to_string()],
            direction: Direction::Outgoing,
            min_depth: 1,
            max_depth: 1,
            target_alias: "b".to_string(),
        }];

        // No bindings at all: the start alias "a" cannot be resolved.
        let unbound_frontier = vec![(HashMap::new(), Uuid::new_v4(), 0)];
        let start_result = project_graph_frontier_rows(unbound_frontier, "a", &single_hop);
        assert!(
            matches!(start_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
        );

        // Start alias bound, but the step's target alias "b" is not.
        let mut start_only = HashMap::new();
        start_only.insert("a".to_string(), Uuid::new_v4());
        let half_bound_frontier = vec![(start_only, Uuid::new_v4(), 0)];
        let target_result = project_graph_frontier_rows(half_bound_frontier, "a", &single_hop);
        assert!(
            matches!(target_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
        );
    }
}