1use crate::database::{Database, InsertRowResult, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5 AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6 SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9 DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19 db: &Database,
20 plan: &PhysicalPlan,
21 params: &HashMap<String, Value>,
22 tx: Option<TxId>,
23) -> Result<QueryResult> {
24 match plan {
25 PhysicalPlan::CreateTable(p) => {
26 db.check_disk_budget("CREATE TABLE")?;
27 let expires_column = expires_column_name(&p.columns)?;
28 let mut auto_indexes: Vec<contextdb_core::IndexDecl> = Vec::new();
32 for c in &p.columns {
33 if c.primary_key
34 && !matches!(
35 map_column_type(&c.data_type),
36 ColumnType::Json | ColumnType::Vector(_)
37 )
38 {
39 auto_indexes.push(contextdb_core::IndexDecl {
40 name: format!("__pk_{}", c.name),
41 columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
42 kind: contextdb_core::IndexKind::Auto,
43 });
44 }
45 if c.unique
46 && !c.primary_key
47 && !matches!(
48 map_column_type(&c.data_type),
49 ColumnType::Json | ColumnType::Vector(_)
50 )
51 {
52 auto_indexes.push(contextdb_core::IndexDecl {
53 name: format!("__unique_{}", c.name),
54 columns: vec![(c.name.clone(), contextdb_core::SortDirection::Asc)],
55 kind: contextdb_core::IndexKind::Auto,
56 });
57 }
58 }
59 for uc in &p.unique_constraints {
60 let all_indexable = uc.iter().all(|col_name| {
63 p.columns
64 .iter()
65 .find(|c| c.name == *col_name)
66 .map(|c| {
67 !matches!(
68 map_column_type(&c.data_type),
69 ColumnType::Json | ColumnType::Vector(_)
70 )
71 })
72 .unwrap_or(false)
73 });
74 if !all_indexable || uc.is_empty() {
75 continue;
76 }
77 let name = format!("__unique_{}", uc.join("_"));
78 let cols: Vec<(String, contextdb_core::SortDirection)> = uc
79 .iter()
80 .map(|c| (c.clone(), contextdb_core::SortDirection::Asc))
81 .collect();
82 auto_indexes.push(contextdb_core::IndexDecl {
83 name,
84 columns: cols,
85 kind: contextdb_core::IndexKind::Auto,
86 });
87 }
88 let meta = TableMeta {
89 columns: p
90 .columns
91 .iter()
92 .map(|c| contextdb_core::ColumnDef {
93 name: c.name.clone(),
94 column_type: map_column_type(&c.data_type),
95 nullable: c.nullable,
96 primary_key: c.primary_key,
97 unique: c.unique,
98 default: c.default.as_ref().map(stored_default_expr),
99 references: c.references.as_ref().map(|reference| {
100 contextdb_core::ForeignKeyReference {
101 table: reference.table.clone(),
102 column: reference.column.clone(),
103 }
104 }),
105 expires: c.expires,
106 immutable: c.immutable,
107 })
108 .collect(),
109 immutable: p.immutable,
110 state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
111 column: sm.column.clone(),
112 transitions: sm
113 .transitions
114 .iter()
115 .map(|(from, tos)| (from.clone(), tos.clone()))
116 .collect(),
117 }),
118 dag_edge_types: p.dag_edge_types.clone(),
119 unique_constraints: p.unique_constraints.clone(),
120 natural_key_column: None,
121 propagation_rules: p.propagation_rules.clone(),
122 default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
123 sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
124 expires_column,
125 indexes: auto_indexes.clone(),
132 };
133 let metadata_bytes = meta.estimated_bytes();
134 db.accountant().try_allocate_for(
135 metadata_bytes,
136 "ddl",
137 "create_table",
138 "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
139 )?;
140 db.relational_store().create_table(&p.name, meta);
141 for idx in &auto_indexes {
142 db.relational_store()
143 .create_index_storage(&p.name, &idx.name, idx.columns.clone());
144 }
145 if let Some(table_meta) = db.table_meta(&p.name) {
146 db.persist_table_meta(&p.name, &table_meta)?;
147 db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn))?;
148 }
149 db.clear_statement_cache();
150 Ok(QueryResult::empty_with_affected(0))
151 }
152 PhysicalPlan::DropTable(name) => {
153 let bytes_to_release = estimate_drop_table_bytes(db, name);
154 db.drop_table_aux_state(name);
155 db.relational_store().drop_table(name);
156 db.remove_persisted_table(name)?;
157 db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn))?;
158 db.accountant().release(bytes_to_release);
159 db.clear_statement_cache();
160 Ok(QueryResult::empty_with_affected(0))
161 }
162 PhysicalPlan::AlterTable(p) => {
163 db.check_disk_budget("ALTER TABLE")?;
164 let store = db.relational_store();
165 match &p.action {
166 AlterAction::AddColumn(col) => {
167 if col.primary_key {
168 return Err(Error::Other(
169 "adding a primary key column via ALTER TABLE is not supported"
170 .to_string(),
171 ));
172 }
173 validate_expires_column(col)?;
174 if col.immutable
178 && let Some(existing_meta) = db.table_meta(&p.table)
179 {
180 let targets_col =
181 existing_meta
182 .propagation_rules
183 .iter()
184 .any(|rule| {
185 match rule {
186 contextdb_core::table_meta::PropagationRule::ForeignKey {
187 target_state,
188 ..
189 } => *target_state == col.name,
190 contextdb_core::table_meta::PropagationRule::Edge {
191 target_state,
192 ..
193 } => *target_state == col.name,
194 contextdb_core::table_meta::PropagationRule::VectorExclusion {
195 ..
196 } => false,
197 }
198 });
199 if targets_col {
200 return Err(Error::ImmutableColumn {
201 table: p.table.clone(),
202 column: col.name.clone(),
203 });
204 }
205 }
206 let core_col = contextdb_core::ColumnDef {
207 name: col.name.clone(),
208 column_type: map_column_type(&col.data_type),
209 nullable: col.nullable,
210 primary_key: col.primary_key,
211 unique: col.unique,
212 default: col.default.as_ref().map(stored_default_expr),
213 references: col.references.as_ref().map(|reference| {
214 contextdb_core::ForeignKeyReference {
215 table: reference.table.clone(),
216 column: reference.column.clone(),
217 }
218 }),
219 expires: col.expires,
220 immutable: col.immutable,
221 };
222 store
223 .alter_table_add_column(&p.table, core_col)
224 .map_err(Error::Other)?;
225 if col.expires {
226 let mut meta = store.table_meta.write();
227 let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
228 Error::Other(format!("table '{}' not found", p.table))
229 })?;
230 table_meta.expires_column = Some(col.name.clone());
231 }
232 }
233 AlterAction::DropColumn {
234 column: name,
235 cascade,
236 } => {
237 if let Some(existing_meta) = db.table_meta(&p.table)
238 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
239 && col.immutable
240 {
241 return Err(Error::ImmutableColumn {
242 table: p.table.clone(),
243 column: name.clone(),
244 });
245 }
246 if let Some(existing_meta) = db.table_meta(&p.table)
251 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *name)
252 && col.primary_key
253 {
254 return Err(Error::Other(format!(
255 "cannot drop primary key column {}.{}",
256 p.table, name
257 )));
258 }
259 let dependent_user_indexes: Vec<String> = db
263 .table_meta(&p.table)
264 .map(|m| {
265 m.indexes
266 .iter()
267 .filter(|i| {
268 i.kind == contextdb_core::IndexKind::UserDeclared
269 && i.columns.iter().any(|(c, _)| c == name)
270 })
271 .map(|i| i.name.clone())
272 .collect()
273 })
274 .unwrap_or_default();
275 let dependent_indexes: Vec<String> = db
276 .table_meta(&p.table)
277 .map(|m| {
278 m.indexes
279 .iter()
280 .filter(|i| i.columns.iter().any(|(c, _)| c == name))
281 .map(|i| i.name.clone())
282 .collect()
283 })
284 .unwrap_or_default();
285 if !*cascade && !dependent_user_indexes.is_empty() {
286 return Err(Error::ColumnInIndex {
287 table: p.table.clone(),
288 column: name.clone(),
289 index: dependent_user_indexes[0].clone(),
290 });
291 }
292 store
293 .alter_table_drop_column(&p.table, name)
294 .map_err(Error::Other)?;
295 if *cascade {
296 {
298 let mut metas = store.table_meta.write();
299 if let Some(m) = metas.get_mut(&p.table) {
300 m.indexes
301 .retain(|i| !i.columns.iter().any(|(c, _)| c == name));
302 }
303 }
304 for idx in &dependent_indexes {
305 store.drop_index_storage(&p.table, idx);
306 db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&p.table, idx, lsn))?;
307 }
308 }
309 let mut meta = store.table_meta.write();
310 if let Some(table_meta) = meta.get_mut(&p.table)
311 && table_meta.expires_column.as_deref() == Some(name.as_str())
312 {
313 table_meta.expires_column = None;
314 }
315 drop(meta);
316 if let Some(table_meta) = db.table_meta(&p.table) {
317 db.persist_table_meta(&p.table, &table_meta)?;
318 db.persist_table_rows(&p.table)?;
319 db.allocate_ddl_lsn(|lsn| {
320 db.log_alter_table_ddl(&p.table, &table_meta, lsn)
321 })?;
322 }
323 db.clear_statement_cache();
324 return Ok(QueryResult {
325 columns: vec![],
326 rows: vec![],
327 rows_affected: 0,
328 trace: crate::database::QueryTrace::scan(),
329 cascade: if *cascade {
330 Some(crate::database::CascadeReport {
331 dropped_indexes: dependent_indexes,
332 })
333 } else {
334 None
335 },
336 });
337 }
338 AlterAction::RenameColumn { from, to } => {
339 if let Some(existing_meta) = db.table_meta(&p.table)
340 && let Some(col) = existing_meta.columns.iter().find(|c| c.name == *from)
341 && col.immutable
342 {
343 return Err(Error::ImmutableColumn {
344 table: p.table.clone(),
345 column: from.clone(),
346 });
347 }
348 store
349 .alter_table_rename_column(&p.table, from, to)
350 .map_err(Error::Other)?;
351 let mut meta = store.table_meta.write();
352 if let Some(table_meta) = meta.get_mut(&p.table)
353 && table_meta.expires_column.as_deref() == Some(from.as_str())
354 {
355 table_meta.expires_column = Some(to.clone());
356 }
357 }
358 AlterAction::SetRetain {
359 duration_seconds,
360 sync_safe,
361 } => {
362 let mut meta = store.table_meta.write();
363 let table_meta = meta
364 .get_mut(&p.table)
365 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
366 if table_meta.immutable {
367 return Err(Error::Other(
368 "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
369 ));
370 }
371 table_meta.default_ttl_seconds = Some(*duration_seconds);
372 table_meta.sync_safe = *sync_safe;
373 }
374 AlterAction::DropRetain => {
375 let mut meta = store.table_meta.write();
376 let table_meta = meta
377 .get_mut(&p.table)
378 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
379 table_meta.default_ttl_seconds = None;
380 table_meta.sync_safe = false;
381 }
382 AlterAction::SetSyncConflictPolicy(policy) => {
383 let cp = parse_conflict_policy(policy)?;
384 db.set_table_conflict_policy(&p.table, cp);
385 }
386 AlterAction::DropSyncConflictPolicy => {
387 db.drop_table_conflict_policy(&p.table);
388 }
389 }
390 if let Some(table_meta) = db.table_meta(&p.table) {
391 db.persist_table_meta(&p.table, &table_meta)?;
392 if !matches!(
393 p.action,
394 AlterAction::AddColumn(_)
395 | AlterAction::SetRetain { .. }
396 | AlterAction::DropRetain
397 | AlterAction::SetSyncConflictPolicy(_)
398 | AlterAction::DropSyncConflictPolicy
399 ) {
400 db.persist_table_rows(&p.table)?;
401 }
402 db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn))?;
403 }
404 db.clear_statement_cache();
405 Ok(QueryResult::empty_with_affected(0))
406 }
407 PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
408 PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
409 PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
410 PhysicalPlan::Scan { table, filter, .. } => {
411 if table == "dual" {
412 return Ok(QueryResult {
413 columns: vec![],
414 rows: vec![vec![]],
415 rows_affected: 0,
416 trace: crate::database::QueryTrace::scan(),
417 cascade: None,
418 });
419 }
420 let snapshot = db.snapshot_for_read();
421 let schema_columns = db.table_meta(table).map(|meta| {
422 meta.columns
423 .into_iter()
424 .map(|column| column.name)
425 .collect::<Vec<_>>()
426 });
427 let resolved_filter = filter
428 .as_ref()
429 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
430 .transpose()?;
431
432 let meta_for_indexes = db.table_meta(table);
436 let indexes: Vec<contextdb_core::IndexDecl> = meta_for_indexes
437 .as_ref()
438 .map(|m| m.indexes.clone())
439 .unwrap_or_default();
440 let analysis = filter
441 .as_ref()
442 .filter(|_| !indexes.is_empty())
443 .map(|f| analyze_filter_for_index(f, &indexes, params));
444
445 if let Some(a) = analysis {
446 if let Some(pick) = a.pick {
447 let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
449 db.__bump_rows_examined(examined);
450 let mut result = materialize_rows(
451 rows,
452 resolved_filter.as_ref(),
453 params,
454 schema_columns.as_deref(),
455 )?;
456 let mut pushed: smallvec::SmallVec<[std::borrow::Cow<'static, str>; 4]> =
457 smallvec::SmallVec::new();
458 pushed.push(std::borrow::Cow::Owned(pick.pushed_column.clone()));
459 let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> = a
460 .considered
461 .iter()
462 .filter(|c| c.name != pick.name)
463 .cloned()
464 .collect();
465 result.trace = crate::database::QueryTrace {
466 physical_plan: "IndexScan",
467 index_used: Some(pick.name.clone()),
468 predicates_pushed: pushed,
469 indexes_considered: considered,
470 sort_elided: false,
471 };
472 return Ok(result);
473 } else {
474 let rows = db.scan(table, snapshot)?;
476 db.__bump_rows_examined(rows.len() as u64);
477 let mut result = materialize_rows(
478 rows,
479 resolved_filter.as_ref(),
480 params,
481 schema_columns.as_deref(),
482 )?;
483 let considered: smallvec::SmallVec<[crate::database::IndexCandidate; 4]> =
484 a.considered.into_iter().collect();
485 result.trace = crate::database::QueryTrace {
486 physical_plan: "Scan",
487 index_used: None,
488 predicates_pushed: Default::default(),
489 indexes_considered: considered,
490 sort_elided: false,
491 };
492 return Ok(result);
493 }
494 }
495
496 let rows = db.scan(table, snapshot)?;
497 db.__bump_rows_examined(rows.len() as u64);
498 let mut result = materialize_rows(
499 rows,
500 resolved_filter.as_ref(),
501 params,
502 schema_columns.as_deref(),
503 )?;
504 result.trace = crate::database::QueryTrace::scan();
505 Ok(result)
506 }
507 PhysicalPlan::GraphBfs {
508 start_alias,
509 start_expr,
510 start_candidates,
511 steps,
512 filter,
513 } => {
514 let start_uuids = match resolve_uuid(start_expr, params) {
515 Ok(start) => vec![start],
516 Err(Error::PlanError(_))
517 if matches!(
518 start_expr,
519 Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
520 ) =>
521 {
522 if let Some(candidate_plan) = start_candidates {
524 resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
525 } else if let Some(filter_expr) = filter {
526 let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
527 resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
528 } else {
529 vec![]
530 }
531 }
532 Err(err) => return Err(err),
533 };
534 if start_uuids.is_empty() {
535 return Ok(QueryResult {
536 columns: vec!["id".to_string(), "depth".to_string()],
537 rows: vec![],
538 rows_affected: 0,
539 trace: crate::database::QueryTrace::scan(),
540 cascade: None,
541 });
542 }
543 let snapshot = db.snapshot();
544 let mut frontier = start_uuids
545 .into_iter()
546 .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
547 .collect::<Vec<_>>();
548 let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
549 db.accountant().try_allocate_for(
550 bfs_bytes,
551 "bfs_frontier",
552 "graph_bfs",
553 "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
554 )?;
555
556 let result = (|| {
557 for step in steps {
558 let edge_types_ref = if step.edge_types.is_empty() {
559 None
560 } else {
561 Some(step.edge_types.as_slice())
562 };
563 let mut next = Vec::new();
564
565 for (bindings, start, base_depth) in &frontier {
566 let res = db.graph().bfs(
567 *start,
568 edge_types_ref,
569 step.direction,
570 step.min_depth,
571 step.max_depth,
572 snapshot,
573 )?;
574 for node in res.nodes {
575 let total_depth = base_depth.saturating_add(node.depth);
576 let mut next_bindings = bindings.clone();
577 next_bindings.insert(step.target_alias.clone(), node.id);
578 next.push((next_bindings, node.id, total_depth));
579 }
580 }
581
582 frontier = dedupe_graph_frontier(next, steps);
583 if frontier.is_empty() {
584 break;
585 }
586 }
587
588 let mut columns =
589 steps
590 .iter()
591 .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
592 cols.push(format!("{}.id", step.target_alias));
593 cols
594 });
595 columns.push("id".to_string());
596 columns.push("depth".to_string());
597
598 Ok(QueryResult {
599 columns,
600 rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
601 rows_affected: 0,
602 trace: crate::database::QueryTrace::scan(),
603 cascade: None,
604 })
605 })();
606 db.accountant().release(bfs_bytes);
607
608 result
609 }
610 PhysicalPlan::VectorSearch {
611 table,
612 query_expr,
613 k,
614 candidates,
615 ..
616 }
617 | PhysicalPlan::HnswSearch {
618 table,
619 query_expr,
620 k,
621 candidates,
622 ..
623 } => {
624 let query_vec = resolve_vector_from_expr(query_expr, params)?;
625 let snapshot = db.snapshot();
626 let all_rows = db.scan(table, snapshot)?;
627 let candidate_bitmap = if let Some(cands_plan) = candidates {
628 let qr = execute_plan(db, cands_plan, params, tx)?;
629 let mut bm = RoaringTreemap::new();
630 let row_id_idx = qr.columns.iter().position(|column| {
631 column == "row_id" || column.rsplit('.').next() == Some("row_id")
632 });
633 let id_idx = qr
634 .columns
635 .iter()
636 .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
637
638 if let Some(idx) = row_id_idx {
639 for row in qr.rows {
640 if let Some(Value::Int64(id)) = row.get(idx) {
641 bm.insert(*id as u64);
642 }
643 }
644 } else if let Some(idx) = id_idx {
645 let uuid_to_row_id: HashMap<uuid::Uuid, RowId> = all_rows
646 .iter()
647 .filter_map(|row| match row.values.get("id") {
648 Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
649 _ => None,
650 })
651 .collect();
652 for row in qr.rows {
653 if let Some(Value::Uuid(uuid)) = row.get(idx)
654 && let Some(row_id) = uuid_to_row_id.get(uuid)
655 {
656 bm.insert(row_id.0);
657 }
658 }
659 }
660 Some(bm)
661 } else {
662 None
663 };
664
665 let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
666 db.accountant().try_allocate_for(
667 vector_bytes,
668 "vector_search",
669 "search",
670 "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
671 )?;
672 let res = db.query_vector(
673 &query_vec,
674 *k as usize,
675 candidate_bitmap.as_ref(),
676 db.snapshot(),
677 );
678 db.accountant().release(vector_bytes);
679 let res = res?;
680
681 let schema_columns = db.table_meta(table).map(|meta| {
683 meta.columns
684 .into_iter()
685 .map(|column| column.name)
686 .collect::<Vec<_>>()
687 });
688 let keys = if let Some(ref sc) = schema_columns {
689 sc.clone()
690 } else {
691 let mut ks = BTreeSet::new();
692 for r in &all_rows {
693 for k in r.values.keys() {
694 ks.insert(k.clone());
695 }
696 }
697 ks.into_iter().collect::<Vec<_>>()
698 };
699
700 let row_map: HashMap<RowId, &VersionedRow> =
701 all_rows.iter().map(|r| (r.row_id, r)).collect();
702
703 let mut columns = vec!["row_id".to_string()];
704 columns.extend(keys.iter().cloned());
705 columns.push("score".to_string());
706
707 let rows = res
708 .into_iter()
709 .filter_map(|(rid, score)| {
710 row_map.get(&rid).map(|row| {
711 let mut out = vec![Value::Int64(rid.0 as i64)];
712 for k in &keys {
713 out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
714 }
715 out.push(Value::Float64(score as f64));
716 out
717 })
718 })
719 .collect();
720
721 Ok(QueryResult {
722 columns,
723 rows,
724 rows_affected: 0,
725 trace: crate::database::QueryTrace::scan(),
726 cascade: None,
727 })
728 }
729 PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
730 PhysicalPlan::Project { input, columns } => {
731 let input_result = execute_plan(db, input, params, tx)?;
732 let has_aggregate = columns.iter().any(|column| {
733 matches!(
734 &column.expr,
735 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
736 )
737 });
738 if has_aggregate {
739 if columns.iter().any(|column| {
740 !matches!(
741 &column.expr,
742 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
743 )
744 }) {
745 return Err(Error::PlanError(
746 "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
747 ));
748 }
749
750 let output_columns = columns
751 .iter()
752 .map(|column| {
753 column.alias.clone().unwrap_or_else(|| match &column.expr {
754 Expr::FunctionCall { name, .. } => name.clone(),
755 _ => "expr".to_string(),
756 })
757 })
758 .collect::<Vec<_>>();
759
760 let aggregate_row = columns
761 .iter()
762 .map(|column| match &column.expr {
763 Expr::FunctionCall { name: _, args } => {
764 let count = if matches!(
765 args.as_slice(),
766 [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
767 if column == "*"
768 ) {
769 input_result.rows.len() as i64
770 } else {
771 input_result
772 .rows
773 .iter()
774 .filter_map(|row| {
775 args.first().map(|arg| {
776 eval_query_result_expr(
777 arg,
778 row,
779 &input_result.columns,
780 params,
781 )
782 })
783 })
784 .collect::<Result<Vec<_>>>()?
785 .into_iter()
786 .filter(|value| *value != Value::Null)
787 .count() as i64
788 };
789 Ok(Value::Int64(count))
790 }
791 _ => Err(Error::PlanError(
792 "mixed aggregate and non-aggregate columns without GROUP BY"
793 .to_string(),
794 )),
795 })
796 .collect::<Result<Vec<_>>>()?;
797
798 return Ok(QueryResult {
799 columns: output_columns,
800 rows: vec![aggregate_row],
801 rows_affected: 0,
802 trace: input_result.trace.clone(),
803 cascade: None,
804 });
805 }
806
807 let output_columns = columns
808 .iter()
809 .map(|c| {
810 c.alias.clone().unwrap_or_else(|| match &c.expr {
811 Expr::Column(col) => col.column.clone(),
812 _ => "expr".to_string(),
813 })
814 })
815 .collect::<Vec<_>>();
816
817 let mut output_rows = Vec::with_capacity(input_result.rows.len());
818 for row in &input_result.rows {
819 let mut projected = Vec::with_capacity(columns.len());
820 for col in columns {
821 projected.push(eval_project_expr(
822 &col.expr,
823 row,
824 &input_result.columns,
825 params,
826 )?);
827 }
828 output_rows.push(projected);
829 }
830
831 Ok(QueryResult {
832 columns: output_columns,
833 rows: output_rows,
834 rows_affected: 0,
835 trace: input_result.trace.clone(),
836 cascade: None,
837 })
838 }
839 PhysicalPlan::Sort { input, keys } => {
840 let elided = try_elide_sort(db, input, keys, params, tx)?;
844 if let Some(mut result) = elided {
845 result.trace.sort_elided = true;
846 return Ok(result);
847 }
848 let mut input_result = execute_plan(db, input, params, tx)?;
849
850 if input_result.trace.physical_plan == "IndexScan"
855 && let Some(idx_name) = &input_result.trace.index_used
856 && sort_keys_match_index_prefix(db, input, idx_name, keys)
857 {
858 input_result.trace.sort_elided = true;
859 return Ok(input_result);
860 }
861 input_result.rows.sort_by(|left, right| {
862 for key in keys {
863 let Expr::Column(column_ref) = &key.expr else {
864 return Ordering::Equal;
865 };
866 let left_value =
867 match lookup_query_result_column(left, &input_result.columns, column_ref) {
868 Ok(value) => value,
869 Err(_) => return Ordering::Equal,
870 };
871 let right_value = match lookup_query_result_column(
872 right,
873 &input_result.columns,
874 column_ref,
875 ) {
876 Ok(value) => value,
877 Err(_) => return Ordering::Equal,
878 };
879 let ordering = compare_sort_values(&left_value, &right_value, key.direction);
880 if ordering != Ordering::Equal {
881 return ordering;
882 }
883 }
884 Ordering::Equal
885 });
886 if input_result.trace.physical_plan != "IndexScan" {
892 input_result.trace.physical_plan = "Sort";
893 }
894 input_result.trace.sort_elided = false;
895 Ok(input_result)
896 }
897 PhysicalPlan::Limit { input, count } => {
898 let mut input_result = execute_plan(db, input, params, tx)?;
899 input_result.rows.truncate(*count as usize);
900 Ok(input_result)
901 }
902 PhysicalPlan::Filter { input, predicate } => {
903 let mut input_result = execute_plan(db, input, params, tx)?;
904 input_result.rows.retain(|row| {
905 query_result_row_matches(row, &input_result.columns, predicate, params)
906 .unwrap_or(false)
907 });
908 Ok(input_result)
909 }
910 PhysicalPlan::Distinct { input } => {
911 let input_result = execute_plan(db, input, params, tx)?;
912 let mut seen = HashSet::<Vec<u8>>::new();
913 let rows = input_result
914 .rows
915 .into_iter()
916 .filter(|row| seen.insert(distinct_row_key(row)))
917 .collect();
918 Ok(QueryResult {
919 columns: input_result.columns,
920 rows,
921 rows_affected: input_result.rows_affected,
922 trace: input_result.trace,
923 cascade: None,
924 })
925 }
926 PhysicalPlan::Join {
927 left,
928 right,
929 condition,
930 join_type,
931 left_alias,
932 right_alias,
933 } => {
934 let left_result = execute_plan(db, left, params, tx)?;
935 let right_result = execute_plan(db, right, params, tx)?;
936 let right_duplicate_names =
937 duplicate_column_names(&left_result.columns, &right_result.columns);
938 let right_prefix = right_alias
939 .clone()
940 .unwrap_or_else(|| right_table_name(right));
941 let right_columns = right_result
942 .columns
943 .iter()
944 .map(|column| {
945 if right_duplicate_names.contains(column) {
946 format!("{right_prefix}.{column}")
947 } else {
948 column.clone()
949 }
950 })
951 .collect::<Vec<_>>();
952
953 let mut columns = left_result.columns.clone();
954 columns.extend(right_columns);
955
956 let mut rows = Vec::new();
957 for left_row in &left_result.rows {
958 let mut matched = false;
959 for right_row in &right_result.rows {
960 let combined = concatenate_rows(left_row, right_row);
961 if query_result_row_matches(&combined, &columns, condition, params)? {
962 matched = true;
963 rows.push(combined);
964 }
965 }
966
967 if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
968 let mut combined = left_row.clone();
969 combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
970 rows.push(combined);
971 }
972 }
973
974 let output_columns = qualify_join_columns(
975 &columns,
976 &left_result.columns,
977 &right_result.columns,
978 left_alias,
979 &right_prefix,
980 );
981
982 Ok(QueryResult {
983 columns: output_columns,
984 rows,
985 rows_affected: 0,
986 trace: crate::database::QueryTrace::scan(),
987 cascade: None,
988 })
989 }
990 PhysicalPlan::CreateIndex(p) => exec_create_index(db, p),
991 PhysicalPlan::DropIndex(p) => exec_drop_index(db, p),
992 PhysicalPlan::IndexScan {
993 table,
994 index,
995 range: _,
996 } => {
997 let _ = (table, index);
1001 Ok(QueryResult {
1002 columns: vec![],
1003 rows: vec![],
1004 rows_affected: 0,
1005 trace: crate::database::QueryTrace {
1006 physical_plan: "IndexScan",
1007 index_used: None,
1008 ..crate::database::QueryTrace::default()
1009 },
1010 cascade: None,
1011 })
1012 }
1013 PhysicalPlan::SetMemoryLimit(val) => {
1014 let limit = match val {
1015 SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
1016 SetMemoryLimitValue::None => None,
1017 };
1018 db.accountant().set_budget(limit)?;
1019 db.persist_memory_limit(limit)?;
1020 Ok(QueryResult::empty())
1021 }
1022 PhysicalPlan::ShowMemoryLimit => {
1023 let usage = db.accountant().usage();
1024 Ok(QueryResult {
1025 columns: vec![
1026 "limit".to_string(),
1027 "used".to_string(),
1028 "available".to_string(),
1029 "startup_ceiling".to_string(),
1030 ],
1031 rows: vec![vec![
1032 usage
1033 .limit
1034 .map(|value| Value::Int64(value as i64))
1035 .unwrap_or_else(|| Value::Text("none".to_string())),
1036 Value::Int64(usage.used as i64),
1037 usage
1038 .available
1039 .map(|value| Value::Int64(value as i64))
1040 .unwrap_or_else(|| Value::Text("none".to_string())),
1041 usage
1042 .startup_ceiling
1043 .map(|value| Value::Int64(value as i64))
1044 .unwrap_or_else(|| Value::Text("none".to_string())),
1045 ]],
1046 rows_affected: 0,
1047 trace: crate::database::QueryTrace::scan(),
1048 cascade: None,
1049 })
1050 }
1051 PhysicalPlan::SetDiskLimit(val) => {
1052 let limit = match val {
1053 SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
1054 SetDiskLimitValue::None => None,
1055 };
1056 db.set_disk_limit(limit)?;
1057 db.persist_disk_limit(limit)?;
1058 Ok(QueryResult::empty())
1059 }
1060 PhysicalPlan::ShowDiskLimit => {
1061 let limit = db.disk_limit();
1062 let used = db.disk_file_size();
1063 let startup_ceiling = db.disk_limit_startup_ceiling();
1064 Ok(QueryResult {
1065 columns: vec![
1066 "limit".to_string(),
1067 "used".to_string(),
1068 "available".to_string(),
1069 "startup_ceiling".to_string(),
1070 ],
1071 rows: vec![vec![
1072 limit
1073 .map(|value| Value::Int64(value as i64))
1074 .unwrap_or_else(|| Value::Text("none".to_string())),
1075 used.map(|value| Value::Int64(value as i64))
1076 .unwrap_or(Value::Null),
1077 match (limit, used) {
1078 (Some(limit), Some(used)) => {
1079 Value::Int64(limit.saturating_sub(used) as i64)
1080 }
1081 _ => Value::Null,
1082 },
1083 startup_ceiling
1084 .map(|value| Value::Int64(value as i64))
1085 .unwrap_or_else(|| Value::Text("none".to_string())),
1086 ]],
1087 rows_affected: 0,
1088 trace: crate::database::QueryTrace::scan(),
1089 cascade: None,
1090 })
1091 }
1092 PhysicalPlan::SetSyncConflictPolicy(policy) => {
1093 let cp = parse_conflict_policy(policy)?;
1094 db.set_default_conflict_policy(cp);
1095 Ok(QueryResult::empty())
1096 }
1097 PhysicalPlan::ShowSyncConflictPolicy => {
1098 let policies = db.conflict_policies();
1099 let default_str = conflict_policy_to_string(policies.default);
1100 let mut rows = vec![vec![Value::Text(default_str)]];
1101 for (table, policy) in &policies.per_table {
1102 rows.push(vec![Value::Text(format!(
1103 "{}={}",
1104 table,
1105 conflict_policy_to_string(*policy)
1106 ))]);
1107 }
1108 Ok(QueryResult {
1109 columns: vec!["policy".to_string()],
1110 rows,
1111 rows_affected: 0,
1112 trace: crate::database::QueryTrace::scan(),
1113 cascade: None,
1114 })
1115 }
1116 PhysicalPlan::Pipeline(plans) => {
1117 let mut last = QueryResult::empty();
1118 for p in plans {
1119 last = execute_plan(db, p, params, tx)?;
1120 }
1121 Ok(last)
1122 }
1123 _ => Err(Error::PlanError(
1124 "unsupported plan node in executor".to_string(),
1125 )),
1126 }
1127}
1128
1129fn eval_project_expr(
1130 expr: &Expr,
1131 row: &[Value],
1132 input_columns: &[String],
1133 params: &HashMap<String, Value>,
1134) -> Result<Value> {
1135 match expr {
1136 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1137 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1138 Expr::Parameter(name) => params
1139 .get(name)
1140 .cloned()
1141 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1142 Expr::BinaryOp { left, op, right } => {
1143 let left = eval_query_result_expr(left, row, input_columns, params)?;
1144 let right = eval_query_result_expr(right, row, input_columns, params)?;
1145 eval_binary_op(op, &left, &right)
1146 }
1147 Expr::UnaryOp { op, operand } => {
1148 let value = eval_query_result_expr(operand, row, input_columns, params)?;
1149 match op {
1150 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1151 UnaryOp::Neg => match value {
1152 Value::Int64(v) => Ok(Value::Int64(-v)),
1153 Value::Float64(v) => Ok(Value::Float64(-v)),
1154 _ => Err(Error::PlanError(
1155 "cannot negate non-numeric value".to_string(),
1156 )),
1157 },
1158 }
1159 }
1160 Expr::FunctionCall { name, args } => {
1161 let values = args
1162 .iter()
1163 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1164 .collect::<Result<Vec<_>>>()?;
1165 eval_function(name, &values)
1166 }
1167 Expr::IsNull { expr, negated } => {
1168 let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
1169 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1170 }
1171 Expr::InList {
1172 expr,
1173 list,
1174 negated,
1175 } => {
1176 let needle = eval_query_result_expr(expr, row, input_columns, params)?;
1177 let matched = list.iter().try_fold(false, |found, item| {
1178 if found {
1179 Ok(true)
1180 } else {
1181 let candidate = eval_query_result_expr(item, row, input_columns, params)?;
1182 Ok(
1183 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1184 || (needle != Value::Null
1185 && candidate != Value::Null
1186 && needle == candidate),
1187 )
1188 }
1189 })?;
1190 Ok(Value::Bool(if *negated { !matched } else { matched }))
1191 }
1192 Expr::Like {
1193 expr,
1194 pattern,
1195 negated,
1196 } => {
1197 let matches = match (
1198 eval_query_result_expr(expr, row, input_columns, params)?,
1199 eval_query_result_expr(pattern, row, input_columns, params)?,
1200 ) {
1201 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1202 _ => false,
1203 };
1204 Ok(Value::Bool(if *negated { !matches } else { matches }))
1205 }
1206 _ => resolve_expr(expr, params),
1207 }
1208}
1209
1210fn eval_query_result_expr(
1211 expr: &Expr,
1212 row: &[Value],
1213 input_columns: &[String],
1214 params: &HashMap<String, Value>,
1215) -> Result<Value> {
1216 match expr {
1217 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
1218 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
1219 Expr::Parameter(name) => params
1220 .get(name)
1221 .cloned()
1222 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
1223 Expr::FunctionCall { name, args } => {
1224 let values = args
1225 .iter()
1226 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
1227 .collect::<Result<Vec<_>>>()?;
1228 eval_function(name, &values)
1229 }
1230 _ => resolve_expr(expr, params),
1231 }
1232}
1233
1234fn exec_insert(
1235 db: &Database,
1236 p: &InsertPlan,
1237 params: &HashMap<String, Value>,
1238 tx: Option<TxId>,
1239) -> Result<QueryResult> {
1240 db.check_disk_budget("INSERT")?;
1241 let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
1242
1243 let insert_meta = db
1244 .table_meta(&p.table)
1245 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
1246 let columns: Vec<String> = if p.columns.is_empty() {
1249 insert_meta.columns.iter().map(|c| c.name.clone()).collect()
1250 } else {
1251 p.columns.clone()
1252 };
1253
1254 let current_tx_max = Some(db.committed_watermark());
1256 let route_inserts_to_graph =
1257 p.table.eq_ignore_ascii_case("edges") || !insert_meta.dag_edge_types.is_empty();
1258 let vector_column = insert_meta.columns.iter().find_map(|column| {
1259 if matches!(column.column_type, contextdb_core::ColumnType::Vector(_)) {
1260 Some(column.name.clone())
1261 } else {
1262 None
1263 }
1264 });
1265 let has_column_defaults = insert_meta
1266 .columns
1267 .iter()
1268 .any(|column| column.default.is_some());
1269
1270 let mut rows_affected = 0;
1271 for row in &p.values {
1272 let mut values = HashMap::new();
1273 for (idx, expr) in row.iter().enumerate() {
1274 let col = columns
1275 .get(idx)
1276 .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
1277 let v = resolve_expr(expr, params)?;
1278 values.insert(
1279 col.clone(),
1280 coerce_value_for_column_with_meta(
1281 &p.table,
1282 &insert_meta,
1283 col,
1284 v,
1285 current_tx_max,
1286 Some(txid),
1287 )?,
1288 );
1289 }
1290
1291 if has_column_defaults {
1292 apply_missing_column_defaults(db, &p.table, &mut values, Some(txid))?;
1293 }
1294
1295 if vector_column.is_some() {
1296 validate_vector_columns(db, &p.table, &values)?;
1297 }
1298 let row_bytes = estimate_row_bytes_for_meta(&values, &insert_meta, false);
1299 db.accountant().try_allocate_for(
1300 row_bytes,
1301 "insert",
1302 "row_insert",
1303 "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
1304 )?;
1305 let checkpoint = db.write_set_checkpoint(txid)?;
1306 let graph_edge = if route_inserts_to_graph {
1307 match (
1308 values.get("source_id"),
1309 values.get("target_id"),
1310 values.get("edge_type"),
1311 ) {
1312 (
1313 Some(Value::Uuid(source)),
1314 Some(Value::Uuid(target)),
1315 Some(Value::Text(edge_type)),
1316 ) => Some((*source, *target, edge_type.clone())),
1317 _ => None,
1318 }
1319 } else {
1320 None
1321 };
1322 let vector_value = vector_column
1323 .as_ref()
1324 .and_then(|column| match values.get(column) {
1325 Some(Value::Vector(vector)) => Some(vector.clone()),
1326 _ => None,
1327 });
1328
1329 let row_id = if let Some(on_conflict) = &p.on_conflict {
1330 let conflict_col = &on_conflict.columns[0];
1331 let conflict_value = values
1332 .get(conflict_col)
1333 .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
1334 let existing =
1335 db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
1336 let existing_row_id = existing.as_ref().map(|row| row.row_id);
1337 let existing_has_vector = existing
1338 .as_ref()
1339 .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
1340 let upsert_values = if let Some(existing_row) = existing.as_ref() {
1341 match apply_on_conflict_updates(
1342 db,
1343 &p.table,
1344 values.clone(),
1345 existing_row,
1346 on_conflict,
1347 params,
1348 Some(txid),
1349 ) {
1350 Ok(v) => v,
1351 Err(err) => {
1352 db.accountant().release(row_bytes);
1353 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1354 return Err(err);
1355 }
1356 }
1357 } else {
1358 values.clone()
1359 };
1360
1361 match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
1362 Ok(UpsertResult::Inserted) => {
1363 db.point_lookup_in_tx(
1364 txid,
1365 &p.table,
1366 conflict_col,
1367 conflict_value,
1368 db.snapshot(),
1369 )?
1370 .ok_or_else(|| {
1371 Error::Other("inserted upsert row not visible in tx".to_string())
1372 })?
1373 .row_id
1374 }
1375 Ok(UpsertResult::Updated) => {
1376 if existing_has_vector && let Some(existing_row_id) = existing_row_id {
1377 db.delete_vector(txid, existing_row_id)?;
1378 }
1379 db.point_lookup_in_tx(
1380 txid,
1381 &p.table,
1382 conflict_col,
1383 conflict_value,
1384 db.snapshot(),
1385 )?
1386 .ok_or_else(|| {
1387 Error::Other("updated upsert row not visible in tx".to_string())
1388 })?
1389 .row_id
1390 }
1391 Ok(UpsertResult::NoOp) => {
1392 db.accountant().release(row_bytes);
1393 RowId(0)
1394 }
1395 Err(err) => {
1396 db.accountant().release(row_bytes);
1397 return Err(err);
1398 }
1399 }
1400 } else {
1401 match db.insert_row_with_unique_noop(txid, &p.table, values) {
1402 Ok(InsertRowResult::Inserted(row_id)) => row_id,
1403 Ok(InsertRowResult::NoOp) => {
1404 db.accountant().release(row_bytes);
1405 continue;
1406 }
1407 Err(err) => {
1408 db.accountant().release(row_bytes);
1409 return Err(err);
1410 }
1411 }
1412 };
1413
1414 if let Some((source, target, edge_type)) = graph_edge {
1415 match db.insert_edge(txid, source, target, edge_type, HashMap::new()) {
1416 Ok(true) => {}
1417 Ok(false) => {
1418 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1419 db.accountant().release(row_bytes);
1420 continue;
1421 }
1422 Err(err) => {
1423 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1424 db.accountant().release(row_bytes);
1425 return Err(err);
1426 }
1427 }
1428 }
1429
1430 if let Some(v) = vector_value
1431 && row_id != RowId(0)
1432 && let Err(err) = db.insert_vector(txid, row_id, v)
1433 {
1434 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1435 db.accountant().release(row_bytes);
1436 return Err(err);
1437 }
1438
1439 rows_affected += 1;
1440 }
1441
1442 Ok(QueryResult::empty_with_affected(rows_affected))
1443}
1444
1445fn exec_delete(
1446 db: &Database,
1447 p: &DeletePlan,
1448 params: &HashMap<String, Value>,
1449 tx: Option<TxId>,
1450) -> Result<QueryResult> {
1451 let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1452 let snapshot = db.snapshot();
1453 let rows = db.scan(&p.table, snapshot)?;
1454 let resolved_where = p
1455 .where_clause
1456 .as_ref()
1457 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1458 .transpose()?;
1459 let matched: Vec<_> = rows
1460 .into_iter()
1461 .filter(|r| {
1462 resolved_where
1463 .as_ref()
1464 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1465 })
1466 .collect();
1467
1468 for row in &matched {
1469 if db.has_live_vector(row.row_id, snapshot) {
1470 db.delete_vector(txid, row.row_id)?;
1471 }
1472 db.delete_row(txid, &p.table, row.row_id)?;
1473 }
1474
1475 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1476}
1477
1478fn exec_update(
1479 db: &Database,
1480 p: &UpdatePlan,
1481 params: &HashMap<String, Value>,
1482 tx: Option<TxId>,
1483) -> Result<QueryResult> {
1484 db.check_disk_budget("UPDATE")?;
1485 let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1486 let snapshot = db.snapshot();
1487 let rows = db.scan_in_tx(txid, &p.table, snapshot)?;
1490 let resolved_where = p
1491 .where_clause
1492 .as_ref()
1493 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1494 .transpose()?;
1495 let matched: Vec<_> = rows
1496 .into_iter()
1497 .filter(|r| {
1498 resolved_where
1499 .as_ref()
1500 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1501 })
1502 .collect();
1503
1504 let current_tx_max = Some(db.committed_watermark());
1505
1506 for row in &matched {
1507 let mut values = row.values.clone();
1508 for (k, vexpr) in &p.assignments {
1509 let value = eval_assignment_expr(vexpr, &row.values, params)?;
1510 values.insert(
1511 k.clone(),
1512 coerce_value_for_column(db, &p.table, k, value, current_tx_max, Some(txid))?,
1513 );
1514 }
1515 validate_update_state_transition(db, &p.table, row, &values)?;
1516 let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1517 let new_state = db
1518 .table_meta(&p.table)
1519 .as_ref()
1520 .and_then(|meta| meta.state_machine.as_ref())
1521 .and_then(|sm| values.get(&sm.column))
1522 .and_then(Value::as_text)
1523 .map(std::borrow::ToOwned::to_owned);
1524
1525 let old_has_vector = db.has_live_vector(row.row_id, snapshot);
1526 validate_vector_columns(db, &p.table, &values)?;
1527 let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
1528 let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1529 db.accountant().try_allocate_for(
1530 new_row_bytes,
1531 "update",
1532 "row_replace",
1533 "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1534 )?;
1535 let checkpoint = db.write_set_checkpoint(txid)?;
1536
1537 if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1538 db.accountant().release(new_row_bytes);
1539 return Err(err);
1540 }
1541 if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
1542 db.accountant().release(new_row_bytes);
1543 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1544 return Err(err);
1545 }
1546
1547 let new_row_id = match db.insert_row_replacing(txid, &p.table, values, row.row_id) {
1548 Ok(row_id) => row_id,
1549 Err(err) => {
1550 db.accountant().release(new_row_bytes);
1551 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1552 return Err(err);
1553 }
1554 };
1555 if let Some(vector) = new_vector
1556 && let Err(err) = db.insert_vector(txid, new_row_id, vector)
1557 {
1558 db.accountant().release(new_row_bytes);
1559 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1560 return Err(err);
1561 }
1562 if let Err(err) =
1563 db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1564 {
1565 db.accountant().release(new_row_bytes);
1566 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1567 return Err(err);
1568 }
1569 }
1570
1571 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1572}
1573
1574fn exec_create_index(
1575 db: &Database,
1576 plan: &contextdb_planner::CreateIndexPlan,
1577) -> Result<QueryResult> {
1578 for prefix in ["__pk_", "__unique_"] {
1581 if plan.name.starts_with(prefix) {
1582 return Err(Error::ReservedIndexName {
1583 table: plan.table.clone(),
1584 name: plan.name.clone(),
1585 prefix: prefix.to_string(),
1586 });
1587 }
1588 }
1589
1590 let meta = db
1594 .table_meta(&plan.table)
1595 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1596
1597 for (col_name, _) in &plan.columns {
1599 if !meta.columns.iter().any(|c| c.name == *col_name) {
1600 return Err(Error::ColumnNotFound {
1601 table: plan.table.clone(),
1602 column: col_name.clone(),
1603 });
1604 }
1605 }
1606
1607 for (col_name, _) in &plan.columns {
1609 let col = meta
1610 .columns
1611 .iter()
1612 .find(|c| c.name == *col_name)
1613 .expect("column existence verified above");
1614 if matches!(col.column_type, ColumnType::Json | ColumnType::Vector(_)) {
1615 return Err(Error::ColumnNotIndexable {
1616 table: plan.table.clone(),
1617 column: col_name.clone(),
1618 column_type: col.column_type.clone(),
1619 });
1620 }
1621 }
1622
1623 if meta.indexes.iter().any(|i| i.name == plan.name) {
1625 return Err(Error::DuplicateIndex {
1626 table: plan.table.clone(),
1627 index: plan.name.clone(),
1628 });
1629 }
1630
1631 {
1634 let store = db.relational_store();
1635 let mut metas = store.table_meta.write();
1636 let m = metas
1637 .get_mut(&plan.table)
1638 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1639 m.indexes.push(contextdb_core::IndexDecl {
1640 name: plan.name.clone(),
1641 columns: plan.columns.clone(),
1642 kind: contextdb_core::IndexKind::UserDeclared,
1643 });
1644 }
1645 db.relational_store()
1646 .create_index_storage(&plan.table, &plan.name, plan.columns.clone());
1647 db.relational_store().rebuild_index(&plan.table, &plan.name);
1648
1649 if let Some(table_meta) = db.table_meta(&plan.table) {
1650 db.persist_table_meta(&plan.table, &table_meta)?;
1651 }
1652
1653 db.allocate_ddl_lsn(|lsn| {
1654 db.log_create_index_ddl(&plan.table, &plan.name, &plan.columns, lsn)
1655 })?;
1656
1657 db.clear_statement_cache();
1658 Ok(QueryResult::empty_with_affected(0))
1659}
1660
1661fn exec_drop_index(db: &Database, plan: &contextdb_planner::DropIndexPlan) -> Result<QueryResult> {
1662 let meta = db
1663 .table_meta(&plan.table)
1664 .ok_or_else(|| Error::TableNotFound(plan.table.clone()))?;
1665 let exists = meta.indexes.iter().any(|i| i.name == plan.name);
1666 if !exists {
1667 if plan.if_exists {
1668 return Ok(QueryResult::empty_with_affected(0));
1669 }
1670 return Err(Error::IndexNotFound {
1671 table: plan.table.clone(),
1672 index: plan.name.clone(),
1673 });
1674 }
1675 {
1676 let store = db.relational_store();
1677 let mut metas = store.table_meta.write();
1678 if let Some(m) = metas.get_mut(&plan.table) {
1679 m.indexes.retain(|i| i.name != plan.name);
1680 }
1681 }
1682 db.relational_store()
1683 .drop_index_storage(&plan.table, &plan.name);
1684 if let Some(table_meta) = db.table_meta(&plan.table) {
1685 db.persist_table_meta(&plan.table, &table_meta)?;
1686 }
1687 db.allocate_ddl_lsn(|lsn| db.log_drop_index_ddl(&plan.table, &plan.name, lsn))?;
1688 db.clear_statement_cache();
1689 Ok(QueryResult::empty_with_affected(0))
1690}
1691
1692fn estimate_table_row_bytes(
1693 db: &Database,
1694 table: &str,
1695 values: &HashMap<String, Value>,
1696) -> Result<usize> {
1697 let meta = db
1698 .table_meta(table)
1699 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1700 Ok(estimate_row_bytes_for_meta(values, &meta, false))
1701}
1702
1703#[derive(Debug, Clone)]
1709pub(crate) enum IndexPredicateShape {
1710 Equality(Value),
1711 NotEqual(Value),
1712 Range {
1713 lower: std::ops::Bound<Value>,
1714 upper: std::ops::Bound<Value>,
1715 },
1716 InList(Vec<Value>),
1717 IsNull,
1718 IsNotNull,
1719}
1720
1721impl IndexPredicateShape {
1722 fn selectivity_tier(&self) -> u8 {
1724 match self {
1725 IndexPredicateShape::Equality(_) | IndexPredicateShape::InList(_) => 0,
1726 IndexPredicateShape::Range { .. } | IndexPredicateShape::NotEqual(_) => 1,
1727 IndexPredicateShape::IsNull | IndexPredicateShape::IsNotNull => 2,
1728 }
1729 }
1730}
1731
/// The index chosen to serve a filter, plus the predicate pushed onto it.
#[derive(Debug, Clone)]
struct IndexPick {
    /// Index name; together with the table name it keys the index storage.
    name: String,
    /// The index's declared columns and their sort directions.
    columns: Vec<(String, contextdb_core::SortDirection)>,
    /// Combined predicate on `pushed_column`, used to drive the probe.
    shape: IndexPredicateShape,
    /// The column the predicate was pushed onto (the index's first column).
    pushed_column: String,
}

/// Result of `analyze_filter_for_index`.
struct IndexAnalysis {
    /// Best candidate (lowest selectivity tier, earliest declared on ties), if any.
    pick: Option<IndexPick>,
    /// Indexes rejected because no usable conjunct constrains their first column,
    /// each with the reason.
    considered: Vec<crate::database::IndexCandidate>,
}
1748
1749fn coerce_pick_shape_to_column_type(
1756 db: &Database,
1757 table: &str,
1758 pick: &IndexPick,
1759) -> Result<IndexPick> {
1760 use std::ops::Bound;
1761 let col = &pick.pushed_column;
1762 let coerce = |v: Value| coerce_value_for_column(db, table, col, v, None, None);
1763 let new_shape = match &pick.shape {
1764 IndexPredicateShape::Equality(v) => IndexPredicateShape::Equality(coerce(v.clone())?),
1765 IndexPredicateShape::InList(vs) => IndexPredicateShape::InList(
1766 vs.iter()
1767 .cloned()
1768 .map(&coerce)
1769 .collect::<Result<Vec<_>>>()?,
1770 ),
1771 IndexPredicateShape::Range { lower, upper } => {
1772 let lower = match lower {
1773 Bound::Included(v) => Bound::Included(coerce(v.clone())?),
1774 Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
1775 Bound::Unbounded => Bound::Unbounded,
1776 };
1777 let upper = match upper {
1778 Bound::Included(v) => Bound::Included(coerce(v.clone())?),
1779 Bound::Excluded(v) => Bound::Excluded(coerce(v.clone())?),
1780 Bound::Unbounded => Bound::Unbounded,
1781 };
1782 IndexPredicateShape::Range { lower, upper }
1783 }
1784 IndexPredicateShape::NotEqual(v) => IndexPredicateShape::NotEqual(coerce(v.clone())?),
1785 IndexPredicateShape::IsNull => IndexPredicateShape::IsNull,
1786 IndexPredicateShape::IsNotNull => IndexPredicateShape::IsNotNull,
1787 };
1788 Ok(IndexPick {
1789 name: pick.name.clone(),
1790 columns: pick.columns.clone(),
1791 shape: new_shape,
1792 pushed_column: pick.pushed_column.clone(),
1793 })
1794}
1795
1796fn analyze_filter_for_index(
1799 filter: &Expr,
1800 indexes: &[contextdb_core::IndexDecl],
1801 params: &HashMap<String, Value>,
1802) -> IndexAnalysis {
1803 use std::borrow::Cow;
1804 let mut considered: Vec<crate::database::IndexCandidate> = Vec::new();
1805
1806 let conjuncts = split_conjuncts(filter);
1808 let mut conjunct_shapes: Vec<(String, IndexPredicateShape)> = Vec::new();
1809 for conjunct in &conjuncts {
1810 if let Some((col, shape)) = classify_index_predicate(conjunct, params) {
1811 conjunct_shapes.push((col, shape));
1812 }
1813 }
1814
1815 let mut candidates: Vec<(IndexPick, u8, usize)> = Vec::new();
1817 for (i_idx, decl) in indexes.iter().enumerate() {
1818 let first_col = match decl.columns.first() {
1819 Some((c, _)) => c.clone(),
1820 None => continue,
1821 };
1822 let matching: Vec<&(String, IndexPredicateShape)> = conjunct_shapes
1824 .iter()
1825 .filter(|(c, _)| c == &first_col)
1826 .collect();
1827 if matching.is_empty() {
1828 let reason = classify_rejection_reason(filter, &first_col);
1832 considered.push(crate::database::IndexCandidate {
1833 name: decl.name.clone(),
1834 rejected_reason: Cow::Borrowed(reason),
1835 });
1836 continue;
1837 }
1838 let shape = combine_shapes(matching.iter().map(|(_, s)| s.clone()).collect());
1840 let tier = shape.selectivity_tier();
1841 candidates.push((
1842 IndexPick {
1843 name: decl.name.clone(),
1844 columns: decl.columns.clone(),
1845 shape,
1846 pushed_column: first_col.clone(),
1847 },
1848 tier,
1849 i_idx,
1850 ));
1851 }
1852
1853 let pick = candidates
1855 .into_iter()
1856 .min_by(|a, b| a.1.cmp(&b.1).then(a.2.cmp(&b.2)))
1857 .map(|(p, _, _)| p);
1858
1859 IndexAnalysis { pick, considered }
1860}
1861
1862fn combine_shapes(mut shapes: Vec<IndexPredicateShape>) -> IndexPredicateShape {
1865 shapes.sort_by_key(|s| s.selectivity_tier());
1867 let head = shapes.remove(0);
1868 if let IndexPredicateShape::Range {
1870 mut lower,
1871 mut upper,
1872 } = head.clone()
1873 {
1874 for s in shapes {
1875 if let IndexPredicateShape::Range { lower: l, upper: u } = s {
1876 lower = tighter_lower(&lower, &l);
1878 upper = tighter_upper(&upper, &u);
1879 }
1880 }
1881 return IndexPredicateShape::Range { lower, upper };
1882 }
1883 head
1884}
1885
1886fn tighter_lower(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
1887 use std::ops::Bound;
1888 match (a, b) {
1889 (Bound::Unbounded, _) => b.clone(),
1890 (_, Bound::Unbounded) => a.clone(),
1891 (Bound::Included(va), Bound::Included(vb)) => {
1892 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1893 a.clone()
1894 } else {
1895 b.clone()
1896 }
1897 }
1898 (Bound::Excluded(va), Bound::Excluded(vb)) => {
1899 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1900 a.clone()
1901 } else {
1902 b.clone()
1903 }
1904 }
1905 (Bound::Included(va), Bound::Excluded(vb)) => {
1906 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1907 a.clone()
1908 } else {
1909 b.clone()
1910 }
1911 }
1912 (Bound::Excluded(va), Bound::Included(vb)) => {
1913 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1914 b.clone()
1915 } else {
1916 a.clone()
1917 }
1918 }
1919 }
1920}
1921
1922fn tighter_upper(a: &std::ops::Bound<Value>, b: &std::ops::Bound<Value>) -> std::ops::Bound<Value> {
1923 use std::ops::Bound;
1924 match (a, b) {
1925 (Bound::Unbounded, _) => b.clone(),
1926 (_, Bound::Unbounded) => a.clone(),
1927 (Bound::Included(va), Bound::Included(vb)) => {
1928 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1929 a.clone()
1930 } else {
1931 b.clone()
1932 }
1933 }
1934 (Bound::Excluded(va), Bound::Excluded(vb)) => {
1935 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1936 a.clone()
1937 } else {
1938 b.clone()
1939 }
1940 }
1941 (Bound::Included(va), Bound::Excluded(vb)) => {
1942 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Less) {
1943 a.clone()
1944 } else {
1945 b.clone()
1946 }
1947 }
1948 (Bound::Excluded(va), Bound::Included(vb)) => {
1949 if compare_values(va, vb).is_some_and(|o| o == std::cmp::Ordering::Greater) {
1950 b.clone()
1951 } else {
1952 a.clone()
1953 }
1954 }
1955 }
1956}
1957
1958fn split_conjuncts(expr: &Expr) -> Vec<Expr> {
1960 match expr {
1961 Expr::BinaryOp {
1962 left,
1963 op: BinOp::And,
1964 right,
1965 } => {
1966 let mut out = split_conjuncts(left);
1967 out.extend(split_conjuncts(right));
1968 out
1969 }
1970 other => vec![other.clone()],
1971 }
1972}
1973
1974fn classify_index_predicate(
1977 expr: &Expr,
1978 params: &HashMap<String, Value>,
1979) -> Option<(String, IndexPredicateShape)> {
1980 match expr {
1981 Expr::BinaryOp { left, op, right } => {
1982 let col = extract_simple_col_ref(left)?;
1983 if !is_literal_or_param(right) {
1985 return None;
1986 }
1987 let rhs = resolve_simple_rhs(right, params)?;
1988 let shape = match op {
1989 BinOp::Eq => IndexPredicateShape::Equality(rhs),
1990 BinOp::Neq => IndexPredicateShape::NotEqual(rhs),
1991 BinOp::Lt => IndexPredicateShape::Range {
1992 lower: std::ops::Bound::Unbounded,
1993 upper: std::ops::Bound::Excluded(rhs),
1994 },
1995 BinOp::Lte => IndexPredicateShape::Range {
1996 lower: std::ops::Bound::Unbounded,
1997 upper: std::ops::Bound::Included(rhs),
1998 },
1999 BinOp::Gt => IndexPredicateShape::Range {
2000 lower: std::ops::Bound::Excluded(rhs),
2001 upper: std::ops::Bound::Unbounded,
2002 },
2003 BinOp::Gte => IndexPredicateShape::Range {
2004 lower: std::ops::Bound::Included(rhs),
2005 upper: std::ops::Bound::Unbounded,
2006 },
2007 _ => return None,
2008 };
2009 Some((col, shape))
2010 }
2011 Expr::InList {
2012 expr: e,
2013 list,
2014 negated: false,
2015 } => {
2016 let col = extract_simple_col_ref(e)?;
2017 let mut values = Vec::with_capacity(list.len());
2018 for v in list {
2019 if !is_literal_or_param(v) {
2020 return None;
2021 }
2022 values.push(resolve_simple_rhs(v, params)?);
2023 }
2024 Some((col, IndexPredicateShape::InList(values)))
2025 }
2026 Expr::IsNull { expr: e, negated } => {
2027 let col = extract_simple_col_ref(e)?;
2028 Some((
2029 col,
2030 if *negated {
2031 IndexPredicateShape::IsNotNull
2032 } else {
2033 IndexPredicateShape::IsNull
2034 },
2035 ))
2036 }
2037 _ => None,
2038 }
2039}
2040
2041fn classify_rejection_reason(filter: &Expr, column: &str) -> &'static str {
2044 fn walk(expr: &Expr, column: &str) -> Option<&'static str> {
2047 match expr {
2048 Expr::BinaryOp {
2049 left,
2050 op: BinOp::And | BinOp::Or,
2051 right,
2052 } => walk(left, column).or_else(|| walk(right, column)),
2053 Expr::BinaryOp { left, op, right } => {
2054 if expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column) {
2057 return Some("arithmetic in predicate");
2058 }
2059 if expr_uses_function_on(left, column) || expr_uses_function_on(right, column) {
2061 return Some("function call in predicate");
2062 }
2063 if mentions_column_ref(left, column) || mentions_column_ref(right, column) {
2065 let left_is_col = extract_simple_col_ref(left).as_deref() == Some(column);
2066 let right_is_col_ref = matches!(right.as_ref(), Expr::Column(_));
2067 if left_is_col && right_is_col_ref {
2068 return Some("non-literal rhs");
2069 }
2070 }
2071 let _ = op;
2072 None
2073 }
2074 Expr::Like { expr: e, .. } => {
2075 if mentions_column_ref(e, column) {
2076 Some("LIKE is residual-only")
2077 } else {
2078 None
2079 }
2080 }
2081 Expr::InSubquery { expr: e, .. } => {
2082 if mentions_column_ref(e, column) {
2083 Some("non-literal rhs")
2084 } else {
2085 None
2086 }
2087 }
2088 _ => None,
2089 }
2090 }
2091 walk(filter, column).unwrap_or("first column not in WHERE")
2092}
2093
2094fn extract_simple_col_ref(expr: &Expr) -> Option<String> {
2095 match expr {
2096 Expr::Column(r) => Some(r.column.clone()),
2097 _ => None,
2098 }
2099}
2100
2101fn is_literal_or_param(expr: &Expr) -> bool {
2102 match expr {
2103 Expr::Literal(_) | Expr::Parameter(_) => true,
2104 Expr::FunctionCall { name, args } => {
2105 matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2108 && args.iter().all(is_literal_or_param)
2109 }
2110 _ => false,
2111 }
2112}
2113
2114fn resolve_simple_rhs(expr: &Expr, params: &HashMap<String, Value>) -> Option<Value> {
2115 match expr {
2116 Expr::Literal(lit) => Some(match lit {
2117 Literal::Null => Value::Null,
2118 Literal::Bool(b) => Value::Bool(*b),
2119 Literal::Integer(i) => Value::Int64(*i),
2120 Literal::Real(f) => Value::Float64(*f),
2121 Literal::Text(s) => Value::Text(s.clone()),
2122 Literal::Vector(_) => return None,
2123 }),
2124 Expr::Parameter(name) => params.get(name).cloned(),
2125 Expr::FunctionCall { name, args }
2126 if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") =>
2127 {
2128 if args.len() != 2 {
2129 return None;
2130 }
2131 let a = resolve_simple_rhs(&args[0], params)?;
2132 let b = resolve_simple_rhs(&args[1], params)?;
2133 match (a, b, name.as_str()) {
2134 (Value::Int64(x), Value::Int64(y), "__add") => {
2135 Some(Value::Int64(x.wrapping_add(y)))
2136 }
2137 (Value::Int64(x), Value::Int64(y), "__sub") => {
2138 Some(Value::Int64(x.wrapping_sub(y)))
2139 }
2140 (Value::Int64(x), Value::Int64(y), "__mul") => {
2141 Some(Value::Int64(x.wrapping_mul(y)))
2142 }
2143 (Value::Int64(x), Value::Int64(y), "__div") if y != 0 => Some(Value::Int64(x / y)),
2144 (Value::Float64(x), Value::Float64(y), "__add") => Some(Value::Float64(x + y)),
2145 (Value::Float64(x), Value::Float64(y), "__sub") => Some(Value::Float64(x - y)),
2146 (Value::Float64(x), Value::Float64(y), "__mul") => Some(Value::Float64(x * y)),
2147 (Value::Float64(x), Value::Float64(y), "__div") => Some(Value::Float64(x / y)),
2148 _ => None,
2149 }
2150 }
2151 _ => None,
2152 }
2153}
2154
2155fn expr_uses_function_on(expr: &Expr, column: &str) -> bool {
2156 match expr {
2157 Expr::FunctionCall { name, args } => {
2158 if matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div") {
2161 return false;
2162 }
2163 args.iter().any(|a| mentions_column_ref(a, column))
2164 }
2165 Expr::BinaryOp { left, right, .. } => {
2166 expr_uses_function_on(left, column) || expr_uses_function_on(right, column)
2167 }
2168 _ => false,
2169 }
2170}
2171
2172fn expr_uses_arithmetic_on(expr: &Expr, column: &str) -> bool {
2173 match expr {
2176 Expr::FunctionCall { name, args } => {
2177 matches!(name.as_str(), "__add" | "__sub" | "__mul" | "__div")
2178 && args.iter().any(|a| mentions_column_ref(a, column))
2179 }
2180 Expr::BinaryOp { left, right, .. } => {
2181 expr_uses_arithmetic_on(left, column) || expr_uses_arithmetic_on(right, column)
2182 }
2183 _ => false,
2184 }
2185}
2186
2187fn mentions_column_ref(expr: &Expr, column: &str) -> bool {
2188 match expr {
2189 Expr::Column(r) => r.column == column,
2190 Expr::FunctionCall { args, .. } => args.iter().any(|a| mentions_column_ref(a, column)),
2191 Expr::BinaryOp { left, right, .. } => {
2192 mentions_column_ref(left, column) || mentions_column_ref(right, column)
2193 }
2194 Expr::UnaryOp { operand, .. } => mentions_column_ref(operand, column),
2195 Expr::IsNull { expr: e, .. } => mentions_column_ref(e, column),
2196 Expr::Like { expr: e, .. } => mentions_column_ref(e, column),
2197 Expr::InList { expr: e, .. } => mentions_column_ref(e, column),
2198 Expr::InSubquery { expr: e, .. } => mentions_column_ref(e, column),
2199 _ => false,
2200 }
2201}
2202
2203#[allow(clippy::too_many_arguments)]
2206fn execute_index_scan(
2207 db: &Database,
2208 table: &str,
2209 pick: &IndexPick,
2210 snapshot: contextdb_core::SnapshotId,
2211 tx: Option<TxId>,
2212) -> Result<(Vec<VersionedRow>, u64)> {
2213 use contextdb_core::{DirectedValue, SortDirection, TotalOrdAsc, TotalOrdDesc};
2214 use std::ops::Bound;
2215
2216 if let IndexPredicateShape::Equality(rhs) = &pick.shape
2218 && let Value::Float64(f) = rhs
2219 && f.is_nan()
2220 {
2221 return Ok((Vec::new(), 0));
2222 }
2223 if let IndexPredicateShape::Equality(Value::Null) = &pick.shape {
2226 return Ok((Vec::new(), 0));
2227 }
2228
2229 let pick = match coerce_pick_shape_to_column_type(db, table, pick) {
2237 Ok(coerced) => coerced,
2238 Err(_) => return Ok((Vec::new(), 0)),
2239 };
2240 let pick = &pick;
2241
2242 let indexes = db.relational_store().indexes.read();
2243 let storage = match indexes.get(&(table.to_string(), pick.name.clone())) {
2244 Some(s) => s,
2245 None => return Ok((Vec::new(), 0)),
2246 };
2247
2248 let first_dir = pick
2252 .columns
2253 .first()
2254 .map(|(_, d)| *d)
2255 .unwrap_or(SortDirection::Asc);
2256
2257 let wrap = |v: Value| -> DirectedValue {
2258 match first_dir {
2259 SortDirection::Asc => DirectedValue::Asc(TotalOrdAsc(v)),
2260 SortDirection::Desc => DirectedValue::Desc(TotalOrdDesc(v)),
2261 }
2262 };
2263
2264 let mut postings: Vec<contextdb_relational::IndexEntry> = Vec::new();
2266 let mut rows_examined: u64 = 0;
2267
2268 let collect_range = |postings: &mut Vec<contextdb_relational::IndexEntry>,
2269 examined: &mut u64,
2270 lower: Bound<Vec<DirectedValue>>,
2271 upper: Bound<Vec<DirectedValue>>| {
2272 for (_k, entries) in storage.tree.range((lower, upper)) {
2273 for e in entries {
2274 *examined += 1;
2275 if e.visible_at(snapshot) {
2276 postings.push(e.clone());
2277 }
2278 }
2279 }
2280 };
2281
2282 let is_composite = pick.columns.len() > 1;
2287
2288 match &pick.shape {
2289 IndexPredicateShape::Equality(v) => {
2290 if is_composite {
2291 let want = wrap(v.clone());
2293 for (key, entries) in storage.tree.iter() {
2294 if key.first() != Some(&want) {
2295 continue;
2296 }
2297 for e in entries {
2298 rows_examined += 1;
2299 if e.visible_at(snapshot) {
2300 postings.push(e.clone());
2301 }
2302 }
2303 }
2304 } else {
2305 let lower = vec![wrap(v.clone())];
2306 let upper = lower.clone();
2307 collect_range(
2308 &mut postings,
2309 &mut rows_examined,
2310 Bound::Included(lower),
2311 Bound::Included(upper),
2312 );
2313 }
2314 }
2315 IndexPredicateShape::InList(vs) => {
2316 for v in vs {
2317 if is_composite {
2318 let want = wrap(v.clone());
2319 for (key, entries) in storage.tree.iter() {
2320 if key.first() != Some(&want) {
2321 continue;
2322 }
2323 for e in entries {
2324 rows_examined += 1;
2325 if e.visible_at(snapshot) {
2326 postings.push(e.clone());
2327 }
2328 }
2329 }
2330 } else {
2331 let k = vec![wrap(v.clone())];
2332 collect_range(
2333 &mut postings,
2334 &mut rows_examined,
2335 Bound::Included(k.clone()),
2336 Bound::Included(k),
2337 );
2338 }
2339 }
2340 }
2341 IndexPredicateShape::Range { lower, upper } => {
2342 if is_composite {
2343 for (key, entries) in storage.tree.iter() {
2346 let Some(first) = key.first() else { continue };
2347 let in_lower = match lower {
2348 Bound::Unbounded => true,
2349 Bound::Included(v) => first >= &wrap(v.clone()),
2350 Bound::Excluded(v) => first > &wrap(v.clone()),
2351 };
2352 let in_upper = match upper {
2353 Bound::Unbounded => true,
2354 Bound::Included(v) => first <= &wrap(v.clone()),
2355 Bound::Excluded(v) => first < &wrap(v.clone()),
2356 };
2357 if !(in_lower && in_upper) {
2358 continue;
2359 }
2360 for e in entries {
2361 rows_examined += 1;
2362 if e.visible_at(snapshot) {
2363 postings.push(e.clone());
2364 }
2365 }
2366 }
2367 } else {
2368 let l = match lower {
2369 Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2370 Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2371 Bound::Unbounded => Bound::Unbounded,
2372 };
2373 let u = match upper {
2374 Bound::Included(v) => Bound::Included(vec![wrap(v.clone())]),
2375 Bound::Excluded(v) => Bound::Excluded(vec![wrap(v.clone())]),
2376 Bound::Unbounded => Bound::Unbounded,
2377 };
2378 collect_range(&mut postings, &mut rows_examined, l, u);
2379 }
2380 }
2381 IndexPredicateShape::NotEqual(v) => {
2382 let except_key = vec![wrap(v.clone())];
2385 for (k, entries) in storage.tree.iter() {
2386 if *k == except_key {
2387 continue;
2388 }
2389 for e in entries {
2390 rows_examined += 1;
2391 if e.visible_at(snapshot) {
2392 postings.push(e.clone());
2393 }
2394 }
2395 }
2396 }
2397 IndexPredicateShape::IsNull => {
2398 let k = vec![wrap(Value::Null)];
2399 collect_range(
2400 &mut postings,
2401 &mut rows_examined,
2402 Bound::Included(k.clone()),
2403 Bound::Included(k),
2404 );
2405 }
2406 IndexPredicateShape::IsNotNull => {
2407 let null_key = vec![wrap(Value::Null)];
2409 for (k, entries) in storage.tree.iter() {
2410 if *k == null_key {
2411 continue;
2412 }
2413 for e in entries {
2414 rows_examined += 1;
2415 if e.visible_at(snapshot) {
2416 postings.push(e.clone());
2417 }
2418 }
2419 }
2420 }
2421 }
2422
2423 drop(indexes);
2426 let row_ids: Vec<RowId> = postings.iter().map(|p| p.row_id).collect();
2427 if row_ids.is_empty() {
2428 return Ok((Vec::new(), rows_examined));
2429 }
2430 let tables = db.relational_store().tables.read();
2431 let rows_slice = tables.get(table);
2432 let mut out: Vec<VersionedRow> = Vec::with_capacity(row_ids.len());
2433 if let Some(rows) = rows_slice {
2434 let by_id: HashMap<RowId, &VersionedRow> = rows.iter().map(|r| (r.row_id, r)).collect();
2435 for rid in &row_ids {
2436 if let Some(r) = by_id.get(rid) {
2437 if (*r).visible_at(snapshot) {
2440 out.push((**r).clone());
2441 }
2442 }
2443 }
2444 }
2445 drop(tables);
2448 if let Some(tx_id) = tx {
2449 let overlay = db.index_scan_tx_overlay(tx_id, table, &pick.pushed_column, &pick.shape)?;
2450 let deleted_row_ids = overlay.deleted_row_ids;
2451 out.retain(|row| !deleted_row_ids.contains(&row.row_id));
2452 out.extend(overlay.matching_inserts);
2453 }
2454 Ok((out, rows_examined))
2455}
2456
2457pub(crate) fn range_includes(
2458 v: &Value,
2459 lower: &std::ops::Bound<Value>,
2460 upper: &std::ops::Bound<Value>,
2461) -> bool {
2462 use std::ops::Bound;
2463 let ok_lower = match lower {
2464 Bound::Unbounded => true,
2465 Bound::Included(b) => compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Less),
2466 Bound::Excluded(b) => {
2467 compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Greater)
2468 }
2469 };
2470 let ok_upper = match upper {
2471 Bound::Unbounded => true,
2472 Bound::Included(b) => {
2473 compare_values(v, b).is_some_and(|o| o != std::cmp::Ordering::Greater)
2474 }
2475 Bound::Excluded(b) => compare_values(v, b).is_some_and(|o| o == std::cmp::Ordering::Less),
2476 };
2477 ok_lower && ok_upper
2478}
2479
2480fn try_elide_sort(
2487 db: &Database,
2488 input: &PhysicalPlan,
2489 keys: &[contextdb_planner::SortKey],
2490 params: &HashMap<String, Value>,
2491 tx: Option<TxId>,
2492) -> Result<Option<QueryResult>> {
2493 fn find_scan(plan: &PhysicalPlan) -> Option<(&String, &Option<String>, &Option<Expr>)> {
2494 match plan {
2495 PhysicalPlan::Scan {
2496 table,
2497 alias,
2498 filter,
2499 } => Some((table, alias, filter)),
2500 PhysicalPlan::Project { input, .. }
2501 | PhysicalPlan::Filter { input, .. }
2502 | PhysicalPlan::Distinct { input }
2503 | PhysicalPlan::Limit { input, .. } => find_scan(input),
2504 _ => None,
2505 }
2506 }
2507 let Some((table, _alias, filter)) = find_scan(input) else {
2508 return Ok(None);
2509 };
2510 if filter.is_some() {
2515 return Ok(None);
2516 }
2517 let key_cols: Option<Vec<(&str, &contextdb_parser::ast::SortDirection)>> = keys
2519 .iter()
2520 .map(|k| match &k.expr {
2521 Expr::Column(r) => Some((r.column.as_str(), &k.direction)),
2522 _ => None,
2523 })
2524 .collect();
2525 let Some(key_cols) = key_cols else {
2526 return Ok(None);
2527 };
2528 let meta = match db.table_meta(table) {
2529 Some(m) => m,
2530 None => return Ok(None),
2531 };
2532 let matching_index = meta.indexes.iter().find(|decl| {
2533 if decl.columns.len() < key_cols.len() {
2534 return false;
2535 }
2536 decl.columns
2537 .iter()
2538 .zip(key_cols.iter())
2539 .all(|((col, dir), (kcol, kdir))| col == kcol && core_dir_matches_ast(*dir, **kdir))
2540 });
2541 let Some(matching) = matching_index else {
2542 return Ok(None);
2543 };
2544 run_index_scan_with_order(db, table, matching, filter.as_ref(), params, tx)
2545}
2546
2547fn run_index_scan_with_order(
2551 db: &Database,
2552 table: &str,
2553 decl: &contextdb_core::IndexDecl,
2554 filter: Option<&Expr>,
2555 params: &HashMap<String, Value>,
2556 tx: Option<TxId>,
2557) -> Result<Option<QueryResult>> {
2558 use std::borrow::Cow;
2559 let snapshot = db.snapshot_for_read();
2560 let schema_columns = db.table_meta(table).map(|meta| {
2561 meta.columns
2562 .into_iter()
2563 .map(|column| column.name)
2564 .collect::<Vec<_>>()
2565 });
2566 let resolved_filter = filter
2567 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
2568 .transpose()?;
2569
2570 let pick = IndexPick {
2573 name: decl.name.clone(),
2574 columns: decl.columns.clone(),
2575 shape: IndexPredicateShape::Range {
2576 lower: std::ops::Bound::Unbounded,
2577 upper: std::ops::Bound::Unbounded,
2578 },
2579 pushed_column: decl.columns[0].0.clone(),
2580 };
2581 let (rows, examined) = execute_index_scan(db, table, &pick, snapshot, tx)?;
2582 db.__bump_rows_examined(examined);
2583 let mut result = materialize_rows(
2584 rows,
2585 resolved_filter.as_ref(),
2586 params,
2587 schema_columns.as_deref(),
2588 )?;
2589 let mut pushed: smallvec::SmallVec<[Cow<'static, str>; 4]> = smallvec::SmallVec::new();
2590 pushed.push(Cow::Owned(decl.columns[0].0.clone()));
2591 result.trace = crate::database::QueryTrace {
2592 physical_plan: "IndexScan",
2593 index_used: Some(decl.name.clone()),
2594 predicates_pushed: pushed,
2595 indexes_considered: Default::default(),
2596 sort_elided: true,
2597 };
2598 Ok(Some(result))
2599}
2600
2601fn sort_keys_match_index_prefix(
2602 db: &Database,
2603 input: &PhysicalPlan,
2604 index_name: &str,
2605 keys: &[contextdb_planner::SortKey],
2606) -> bool {
2607 fn find_scan_and_filter(plan: &PhysicalPlan) -> Option<(&String, &Option<Expr>)> {
2608 match plan {
2609 PhysicalPlan::Scan { table, filter, .. } => Some((table, filter)),
2610 PhysicalPlan::Project { input, .. }
2611 | PhysicalPlan::Filter { input, .. }
2612 | PhysicalPlan::Distinct { input }
2613 | PhysicalPlan::Limit { input, .. } => find_scan_and_filter(input),
2614 _ => None,
2615 }
2616 }
2617 let (table, filter) = match find_scan_and_filter(input) {
2618 Some(t) => t,
2619 None => return false,
2620 };
2621 let meta = match db.table_meta(table) {
2622 Some(m) => m,
2623 None => return false,
2624 };
2625 let decl = meta.indexes.iter().find(|i| i.name == index_name);
2626 let Some(decl) = decl else {
2627 return false;
2628 };
2629 if let Some(filter_expr) = filter.as_ref()
2634 && let Some(leading_col) = decl.columns.first().map(|(c, _)| c.as_str())
2635 {
2636 let conjuncts = split_conjuncts(filter_expr);
2637 let empty_params = HashMap::new();
2638 for conjunct in &conjuncts {
2639 if let Some((col, shape)) = classify_index_predicate(conjunct, &empty_params)
2640 && col == leading_col
2641 && matches!(
2642 shape,
2643 IndexPredicateShape::InList(_) | IndexPredicateShape::NotEqual(_)
2644 )
2645 {
2646 return false;
2647 }
2648 }
2649 }
2650 let pinned_prefix_len = count_equality_prefix(filter.as_ref(), &decl.columns);
2655 let remaining_index_cols = &decl.columns[pinned_prefix_len..];
2656 if remaining_index_cols.len() < keys.len() {
2657 return false;
2658 }
2659 remaining_index_cols
2660 .iter()
2661 .zip(keys.iter())
2662 .all(|((col, dir), k)| match &k.expr {
2663 Expr::Column(r) => r.column == *col && core_dir_matches_ast(*dir, k.direction),
2664 _ => false,
2665 })
2666}
2667
2668fn count_equality_prefix(
2669 filter: Option<&Expr>,
2670 columns: &[(String, contextdb_core::SortDirection)],
2671) -> usize {
2672 let Some(filter) = filter else {
2673 return 0;
2674 };
2675 let conjuncts = split_conjuncts(filter);
2676 let mut pinned = 0usize;
2677 for (col, _) in columns {
2678 let has_eq = conjuncts.iter().any(|c| match c {
2679 Expr::BinaryOp {
2680 left,
2681 op: BinOp::Eq,
2682 right,
2683 } => {
2684 let left_is_col = matches!(left.as_ref(), Expr::Column(r) if r.column == *col);
2685 let right_is_simple =
2686 matches!(right.as_ref(), Expr::Literal(_) | Expr::Parameter(_));
2687 left_is_col && right_is_simple
2688 }
2689 _ => false,
2690 });
2691 if has_eq {
2692 pinned += 1;
2693 } else {
2694 break;
2695 }
2696 }
2697 pinned
2698}
2699
2700fn core_dir_matches_ast(
2701 core: contextdb_core::SortDirection,
2702 ast: contextdb_parser::ast::SortDirection,
2703) -> bool {
2704 matches!(
2705 (core, ast),
2706 (
2707 contextdb_core::SortDirection::Asc,
2708 contextdb_parser::ast::SortDirection::Asc
2709 ) | (
2710 contextdb_core::SortDirection::Desc,
2711 contextdb_parser::ast::SortDirection::Desc
2712 )
2713 )
2714}
2715
2716fn validate_update_state_transition(
2719 db: &Database,
2720 table: &str,
2721 existing: &VersionedRow,
2722 next_values: &HashMap<String, Value>,
2723) -> Result<()> {
2724 let Some(meta) = db.table_meta(table) else {
2725 return Ok(());
2726 };
2727 let Some(state_machine) = meta.state_machine else {
2728 return Ok(());
2729 };
2730
2731 let old_state = existing
2732 .values
2733 .get(&state_machine.column)
2734 .and_then(Value::as_text);
2735 let new_state = next_values
2736 .get(&state_machine.column)
2737 .and_then(Value::as_text);
2738
2739 let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
2740 return Ok(());
2741 };
2742
2743 if old_state == new_state
2744 || db.relational_store().validate_state_transition(
2745 table,
2746 &state_machine.column,
2747 old_state,
2748 new_state,
2749 )
2750 {
2751 return Ok(());
2752 }
2753
2754 Err(Error::InvalidStateTransition(format!(
2755 "{old_state} -> {new_state}"
2756 )))
2757}
2758
2759fn estimate_row_bytes_for_meta(
2760 values: &HashMap<String, Value>,
2761 meta: &TableMeta,
2762 include_vectors: bool,
2763) -> usize {
2764 let mut bytes = 96usize;
2765 for column in &meta.columns {
2766 let Some(value) = values.get(&column.name) else {
2767 continue;
2768 };
2769 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
2770 continue;
2771 }
2772 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
2773 }
2774 bytes
2775}
2776
/// Estimates the working memory of a `k`-nearest-neighbour search over
/// `dimension`-wide `f32` vectors.
///
/// Budgets three vectors per requested result. Every multiplication
/// saturates at `usize::MAX`.
fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
    const VECTORS_PER_RESULT: usize = 3;
    let vectors = k.saturating_mul(VECTORS_PER_RESULT);
    let floats = vectors.saturating_mul(dimension);
    floats.saturating_mul(std::mem::size_of::<f32>())
}
2782
2783fn estimate_bfs_working_bytes<T>(
2784 frontier: &[T],
2785 steps: &[contextdb_planner::GraphStepPlan],
2786) -> usize {
2787 let max_hops = steps.iter().fold(0usize, |acc, step| {
2788 acc.saturating_add(step.max_depth as usize)
2789 });
2790 frontier
2791 .len()
2792 .saturating_mul(2048)
2793 .saturating_mul(max_hops.max(1))
2794}
2795
2796fn dedupe_graph_frontier(
2797 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2798 steps: &[contextdb_planner::GraphStepPlan],
2799) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
2800 let mut best =
2801 HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
2802
2803 for (bindings, current_id, depth) in frontier {
2804 let mut key = Vec::with_capacity(steps.len());
2805 for step in steps {
2806 if let Some(id) = bindings.get(&step.target_alias) {
2807 key.push(*id);
2808 }
2809 }
2810
2811 best.entry(key)
2812 .and_modify(|existing| {
2813 if depth < existing.2 {
2814 *existing = (bindings.clone(), current_id, depth);
2815 }
2816 })
2817 .or_insert((bindings, current_id, depth));
2818 }
2819
2820 best.into_values().collect()
2821}
2822
2823fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
2824 let meta = db.table_meta(table);
2825 let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
2826 let snapshot = db.snapshot();
2827 let rows = db.scan(table, snapshot).unwrap_or_default();
2828 let row_bytes = rows.iter().fold(0usize, |acc, row| {
2829 acc.saturating_add(meta.as_ref().map_or_else(
2830 || row.estimated_bytes(),
2831 |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
2832 ))
2833 });
2834 let vector_bytes = rows
2835 .iter()
2836 .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
2837 .fold(0usize, |acc, entry| {
2838 acc.saturating_add(entry.estimated_bytes())
2839 });
2840 let edge_bytes = rows.iter().fold(0usize, |acc, row| {
2841 match (
2842 row.values.get("source_id").and_then(Value::as_uuid),
2843 row.values.get("target_id").and_then(Value::as_uuid),
2844 row.values.get("edge_type").and_then(Value::as_text),
2845 ) {
2846 (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
2847 96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
2848 ),
2849 _ => acc,
2850 }
2851 });
2852 metadata_bytes
2853 .saturating_add(row_bytes)
2854 .saturating_add(vector_bytes)
2855 .saturating_add(edge_bytes)
2856}
2857
2858fn materialize_rows(
2859 rows: Vec<VersionedRow>,
2860 filter: Option<&Expr>,
2861 params: &HashMap<String, Value>,
2862 schema_columns: Option<&[String]>,
2863) -> Result<QueryResult> {
2864 let filtered: Vec<VersionedRow> = rows
2865 .into_iter()
2866 .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
2867 .collect();
2868
2869 let keys = if let Some(schema_columns) = schema_columns {
2870 schema_columns.to_vec()
2871 } else {
2872 let mut keys = BTreeSet::new();
2873 for r in &filtered {
2874 for k in r.values.keys() {
2875 keys.insert(k.clone());
2876 }
2877 }
2878 keys.into_iter().collect::<Vec<_>>()
2879 };
2880
2881 let mut columns = vec!["row_id".to_string()];
2882 columns.extend(keys.iter().cloned());
2883
2884 let rows = filtered
2885 .into_iter()
2886 .map(|r| {
2887 let mut out = vec![Value::Int64(r.row_id.0 as i64)];
2888 for k in &keys {
2889 out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
2890 }
2891 out
2892 })
2893 .collect();
2894
2895 Ok(QueryResult {
2896 columns,
2897 rows,
2898 rows_affected: 0,
2899 trace: crate::database::QueryTrace::scan(),
2900 cascade: None,
2901 })
2902}
2903
2904fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
2905 Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
2906}
2907
2908fn eval_expr_value(
2909 row: &VersionedRow,
2910 expr: &Expr,
2911 params: &HashMap<String, Value>,
2912) -> Result<Value> {
2913 match expr {
2914 Expr::Column(c) => {
2915 if c.column == "row_id" {
2916 Ok(Value::Int64(row.row_id.0 as i64))
2917 } else {
2918 Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
2919 }
2920 }
2921 Expr::BinaryOp { left, op, right } => {
2922 let left = eval_expr_value(row, left, params)?;
2923 let right = eval_expr_value(row, right, params)?;
2924 eval_binary_op(op, &left, &right)
2925 }
2926 Expr::UnaryOp { op, operand } => {
2927 let value = eval_expr_value(row, operand, params)?;
2928 match op {
2929 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
2930 UnaryOp::Neg => match value {
2931 Value::Int64(v) => Ok(Value::Int64(-v)),
2932 Value::Float64(v) => Ok(Value::Float64(-v)),
2933 _ => Err(Error::PlanError(
2934 "cannot negate non-numeric value".to_string(),
2935 )),
2936 },
2937 }
2938 }
2939 Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
2940 Expr::IsNull { expr, negated } => {
2941 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
2942 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
2943 }
2944 Expr::InList {
2945 expr,
2946 list,
2947 negated,
2948 } => {
2949 let needle = eval_expr_value(row, expr, params)?;
2950 let matched = list.iter().try_fold(false, |found, item| {
2951 if found {
2952 Ok(true)
2953 } else {
2954 let candidate = eval_expr_value(row, item, params)?;
2955 Ok(
2956 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2957 || (needle != Value::Null
2958 && candidate != Value::Null
2959 && needle == candidate),
2960 )
2961 }
2962 })?;
2963 Ok(Value::Bool(if *negated { !matched } else { matched }))
2964 }
2965 Expr::Like {
2966 expr,
2967 pattern,
2968 negated,
2969 } => {
2970 let matches = match (
2971 eval_expr_value(row, expr, params)?,
2972 eval_expr_value(row, pattern, params)?,
2973 ) {
2974 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2975 _ => false,
2976 };
2977 Ok(Value::Bool(if *negated { !matches } else { matches }))
2978 }
2979 _ => resolve_expr(expr, params),
2980 }
2981}
2982
2983pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
2984 match expr {
2985 Expr::Literal(l) => Ok(match l {
2986 Literal::Null => Value::Null,
2987 Literal::Bool(v) => Value::Bool(*v),
2988 Literal::Integer(v) => Value::Int64(*v),
2989 Literal::Real(v) => Value::Float64(*v),
2990 Literal::Text(v) => Value::Text(v.clone()),
2991 Literal::Vector(v) => Value::Vector(v.clone()),
2992 }),
2993 Expr::Parameter(p) => params
2994 .get(p)
2995 .cloned()
2996 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
2997 Expr::Column(c) => Ok(Value::Text(c.column.clone())),
2998 Expr::UnaryOp { op, operand } => match op {
2999 UnaryOp::Neg => match resolve_expr(operand, params)? {
3000 Value::Int64(v) => Ok(Value::Int64(-v)),
3001 Value::Float64(v) => Ok(Value::Float64(-v)),
3002 _ => Err(Error::PlanError(
3003 "cannot negate non-numeric value".to_string(),
3004 )),
3005 },
3006 UnaryOp::Not => Err(Error::PlanError(
3007 "boolean NOT requires row context".to_string(),
3008 )),
3009 },
3010 Expr::FunctionCall { name, args } => {
3011 let values = args
3012 .iter()
3013 .map(|arg| resolve_expr(arg, params))
3014 .collect::<Result<Vec<_>>>()?;
3015 eval_function(name, &values)
3016 }
3017 Expr::CosineDistance { right, .. } => resolve_expr(right, params),
3018 _ => Err(Error::PlanError("unsupported expression".to_string())),
3019 }
3020}
3021
3022fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
3023 match (a, b) {
3024 (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
3025 (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
3026 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
3027 (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3028 (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
3029 (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
3030 (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
3031 (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
3032 (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
3033 (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
3034 (Value::Uuid(u), Value::Text(t)) => {
3035 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3036 Some(u.cmp(&parsed))
3037 } else {
3038 None
3039 }
3040 }
3041 (Value::Text(t), Value::Uuid(u)) => {
3042 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
3043 Some(parsed.cmp(u))
3044 } else {
3045 None
3046 }
3047 }
3048 (Value::TxId(a), Value::TxId(b)) => Some(a.0.cmp(&b.0)),
3049 (Value::TxId(a), Value::Int64(b)) => {
3050 if *b < 0 {
3051 Some(Ordering::Greater)
3052 } else {
3053 Some(a.0.cmp(&(*b as u64)))
3054 }
3055 }
3056 (Value::Int64(a), Value::TxId(b)) => {
3057 if *a < 0 {
3058 Some(Ordering::Less)
3059 } else {
3060 Some((*a as u64).cmp(&b.0))
3061 }
3062 }
3063 (Value::TxId(_), Value::Timestamp(_)) | (Value::Timestamp(_), Value::TxId(_)) => None,
3064 (Value::Null, _) | (_, Value::Null) => None,
3065 _ => None,
3066 }
3067}
3068
3069fn eval_bool_expr(
3070 row: &VersionedRow,
3071 expr: &Expr,
3072 params: &HashMap<String, Value>,
3073) -> Result<Option<bool>> {
3074 match expr {
3075 Expr::BinaryOp { left, op, right } => match op {
3076 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3077 let left = eval_expr_value(row, left, params)?;
3078 let right = eval_expr_value(row, right, params)?;
3079 if left == Value::Null || right == Value::Null {
3080 return Ok(None);
3081 }
3082
3083 let result = match op {
3084 BinOp::Eq => {
3085 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3086 }
3087 BinOp::Neq => {
3088 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3089 }
3090 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3091 BinOp::Lte => matches!(
3092 compare_values(&left, &right),
3093 Some(Ordering::Less | Ordering::Equal)
3094 ),
3095 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3096 BinOp::Gte => matches!(
3097 compare_values(&left, &right),
3098 Some(Ordering::Greater | Ordering::Equal)
3099 ),
3100 BinOp::And | BinOp::Or => unreachable!(),
3101 };
3102 Ok(Some(result))
3103 }
3104 BinOp::And => {
3105 let left = eval_bool_expr(row, left, params)?;
3106 if left == Some(false) {
3107 return Ok(Some(false));
3108 }
3109 let right = eval_bool_expr(row, right, params)?;
3110 Ok(match (left, right) {
3111 (Some(true), Some(true)) => Some(true),
3112 (Some(true), other) => other,
3113 (None, Some(false)) => Some(false),
3114 (None, Some(true)) | (None, None) => None,
3115 (Some(false), _) => Some(false),
3116 })
3117 }
3118 BinOp::Or => {
3119 let left = eval_bool_expr(row, left, params)?;
3120 if left == Some(true) {
3121 return Ok(Some(true));
3122 }
3123 let right = eval_bool_expr(row, right, params)?;
3124 Ok(match (left, right) {
3125 (Some(false), Some(false)) => Some(false),
3126 (Some(false), other) => other,
3127 (None, Some(true)) => Some(true),
3128 (None, Some(false)) | (None, None) => None,
3129 (Some(true), _) => Some(true),
3130 })
3131 }
3132 },
3133 Expr::UnaryOp {
3134 op: UnaryOp::Not,
3135 operand,
3136 } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
3137 Expr::InList {
3138 expr,
3139 list,
3140 negated,
3141 } => {
3142 let needle = eval_expr_value(row, expr, params)?;
3143 if needle == Value::Null {
3144 return Ok(None);
3145 }
3146
3147 let matched = list.iter().try_fold(false, |found, item| {
3148 if found {
3149 Ok(true)
3150 } else {
3151 let candidate = eval_expr_value(row, item, params)?;
3152 Ok(
3153 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3154 || (candidate != Value::Null && needle == candidate),
3155 )
3156 }
3157 })?;
3158 Ok(Some(if *negated { !matched } else { matched }))
3159 }
3160 Expr::InSubquery { .. } => Err(Error::PlanError(
3161 "IN (subquery) must be resolved before execution".to_string(),
3162 )),
3163 Expr::Like {
3164 expr,
3165 pattern,
3166 negated,
3167 } => {
3168 let left = eval_expr_value(row, expr, params)?;
3169 let right = eval_expr_value(row, pattern, params)?;
3170 let matched = match (left, right) {
3171 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3172 _ => false,
3173 };
3174 Ok(Some(if *negated { !matched } else { matched }))
3175 }
3176 Expr::IsNull { expr, negated } => {
3177 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
3178 Ok(Some(if *negated { !is_null } else { is_null }))
3179 }
3180 Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
3181 Value::Bool(value) => Ok(Some(value)),
3182 Value::Null => Ok(None),
3183 _ => Err(Error::PlanError(format!(
3184 "unsupported WHERE expression: {:?}",
3185 expr
3186 ))),
3187 },
3188 _ => Err(Error::PlanError(format!(
3189 "unsupported WHERE expression: {:?}",
3190 expr
3191 ))),
3192 }
3193}
3194
3195fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
3196 let bool_value = match op {
3197 BinOp::Eq => {
3198 if left == &Value::Null || right == &Value::Null {
3199 false
3200 } else {
3201 compare_values(left, right) == Some(Ordering::Equal) || left == right
3202 }
3203 }
3204 BinOp::Neq => {
3205 if left == &Value::Null || right == &Value::Null {
3206 false
3207 } else {
3208 !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
3209 }
3210 }
3211 BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
3212 BinOp::Lte => matches!(
3213 compare_values(left, right),
3214 Some(Ordering::Less | Ordering::Equal)
3215 ),
3216 BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
3217 BinOp::Gte => matches!(
3218 compare_values(left, right),
3219 Some(Ordering::Greater | Ordering::Equal)
3220 ),
3221 BinOp::And => value_to_bool(left) && value_to_bool(right),
3222 BinOp::Or => value_to_bool(left) || value_to_bool(right),
3223 };
3224 Ok(Value::Bool(bool_value))
3225}
3226
3227fn value_to_bool(value: &Value) -> bool {
3228 matches!(value, Value::Bool(true))
3229}
3230
3231fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
3232 match (left, right) {
3233 (Value::Null, Value::Null) => Ordering::Equal,
3234 (Value::Null, _) => match direction {
3235 SortDirection::Asc => Ordering::Greater,
3236 SortDirection::Desc => Ordering::Less,
3237 SortDirection::CosineDistance => Ordering::Equal,
3238 },
3239 (_, Value::Null) => match direction {
3240 SortDirection::Asc => Ordering::Less,
3241 SortDirection::Desc => Ordering::Greater,
3242 SortDirection::CosineDistance => Ordering::Equal,
3243 },
3244 _ => {
3245 let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
3246 match direction {
3247 SortDirection::Asc => ordering,
3248 SortDirection::Desc => ordering.reverse(),
3249 SortDirection::CosineDistance => ordering,
3250 }
3251 }
3252 }
3253}
3254
3255fn eval_assignment_expr(
3256 expr: &Expr,
3257 row_values: &HashMap<String, Value>,
3258 params: &HashMap<String, Value>,
3259) -> Result<Value> {
3260 match expr {
3261 Expr::Literal(lit) => literal_to_value(lit),
3262 Expr::Parameter(name) => params
3263 .get(name)
3264 .cloned()
3265 .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
3266 Expr::Column(col_ref) => row_values
3267 .get(&col_ref.column)
3268 .cloned()
3269 .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
3270 Expr::BinaryOp { left, op, right } => {
3271 let left = eval_assignment_expr(left, row_values, params)?;
3272 let right = eval_assignment_expr(right, row_values, params)?;
3273 eval_binary_op(op, &left, &right)
3274 }
3275 Expr::UnaryOp { op, operand } => match op {
3276 UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
3277 Value::Int64(value) => Ok(Value::Int64(-value)),
3278 Value::Float64(value) => Ok(Value::Float64(-value)),
3279 _ => Err(Error::Other(format!(
3280 "unsupported expression in UPDATE SET: {:?}",
3281 expr
3282 ))),
3283 },
3284 UnaryOp::Not => Err(Error::Other(format!(
3285 "unsupported expression in UPDATE SET: {:?}",
3286 expr
3287 ))),
3288 },
3289 Expr::FunctionCall { name, args } => {
3290 let evaluated = args
3291 .iter()
3292 .map(|arg| eval_assignment_expr(arg, row_values, params))
3293 .collect::<Result<Vec<_>>>()?;
3294 eval_function(name, &evaluated)
3295 }
3296 _ => Err(Error::Other(format!(
3297 "unsupported expression in UPDATE SET: {:?}",
3298 expr
3299 ))),
3300 }
3301}
3302
3303fn apply_on_conflict_updates(
3304 db: &Database,
3305 table: &str,
3306 mut insert_values: HashMap<String, Value>,
3307 existing_row: &VersionedRow,
3308 on_conflict: &OnConflictPlan,
3309 params: &HashMap<String, Value>,
3310 active_tx: Option<TxId>,
3311) -> Result<HashMap<String, Value>> {
3312 if on_conflict.update_columns.is_empty() {
3313 return Ok(insert_values);
3314 }
3315
3316 if let Some(meta) = db.table_meta(table) {
3321 for (column, _) in &on_conflict.update_columns {
3322 if let Some(col_def) = meta.columns.iter().find(|c| c.name == *column)
3323 && col_def.immutable
3324 {
3325 return Err(Error::ImmutableColumn {
3326 table: table.to_string(),
3327 column: column.clone(),
3328 });
3329 }
3330 }
3331 }
3332
3333 let current_tx_max = Some(db.committed_watermark());
3334
3335 let mut merged = existing_row.values.clone();
3336 for (column, expr) in &on_conflict.update_columns {
3337 let value = eval_assignment_expr(expr, &existing_row.values, params)?;
3338 merged.insert(
3339 column.clone(),
3340 coerce_value_for_column(db, table, column, value, current_tx_max, active_tx)?,
3341 );
3342 }
3343
3344 for (column, value) in insert_values.drain() {
3345 merged.entry(column).or_insert(value);
3346 }
3347
3348 Ok(merged)
3349}
3350
3351fn literal_to_value(lit: &Literal) -> Result<Value> {
3352 Ok(match lit {
3353 Literal::Null => Value::Null,
3354 Literal::Bool(v) => Value::Bool(*v),
3355 Literal::Integer(v) => Value::Int64(*v),
3356 Literal::Real(v) => Value::Float64(*v),
3357 Literal::Text(v) => Value::Text(v.clone()),
3358 Literal::Vector(v) => Value::Vector(v.clone()),
3359 })
3360}
3361
3362fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
3363 let [left, right] = args else {
3364 return Err(Error::PlanError(format!(
3365 "function {} expects 2 arguments",
3366 name
3367 )));
3368 };
3369
3370 match (left, right) {
3371 (Value::Int64(left), Value::Int64(right)) => match name {
3372 "__add" => Ok(Value::Int64(left + right)),
3373 "__sub" => Ok(Value::Int64(left - right)),
3374 "__mul" => Ok(Value::Int64(left * right)),
3375 "__div" => Ok(Value::Int64(left / right)),
3376 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3377 },
3378 (Value::Float64(left), Value::Float64(right)) => match name {
3379 "__add" => Ok(Value::Float64(left + right)),
3380 "__sub" => Ok(Value::Float64(left - right)),
3381 "__mul" => Ok(Value::Float64(left * right)),
3382 "__div" => Ok(Value::Float64(left / right)),
3383 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3384 },
3385 (Value::Int64(left), Value::Float64(right)) => match name {
3386 "__add" => Ok(Value::Float64(*left as f64 + right)),
3387 "__sub" => Ok(Value::Float64(*left as f64 - right)),
3388 "__mul" => Ok(Value::Float64(*left as f64 * right)),
3389 "__div" => Ok(Value::Float64(*left as f64 / right)),
3390 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3391 },
3392 (Value::Float64(left), Value::Int64(right)) => match name {
3393 "__add" => Ok(Value::Float64(left + *right as f64)),
3394 "__sub" => Ok(Value::Float64(left - *right as f64)),
3395 "__mul" => Ok(Value::Float64(left * *right as f64)),
3396 "__div" => Ok(Value::Float64(left / *right as f64)),
3397 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3398 },
3399 _ => Err(Error::PlanError(format!(
3400 "function {} expects numeric arguments",
3401 name
3402 ))),
3403 }
3404}
3405
3406fn eval_function_in_row_context(
3407 row: &VersionedRow,
3408 name: &str,
3409 args: &[Expr],
3410 params: &HashMap<String, Value>,
3411) -> Result<Value> {
3412 let values = args
3413 .iter()
3414 .map(|arg| eval_expr_value(row, arg, params))
3415 .collect::<Result<Vec<_>>>()?;
3416 eval_function(name, &values)
3417}
3418
3419fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
3420 match name.to_ascii_lowercase().as_str() {
3421 "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
3422 "coalesce" => Ok(args
3423 .iter()
3424 .find(|value| **value != Value::Null)
3425 .cloned()
3426 .unwrap_or(Value::Null)),
3427 "now" => Ok(Value::Timestamp(
3428 SystemTime::now()
3429 .duration_since(UNIX_EPOCH)
3430 .map_err(|err| Error::PlanError(err.to_string()))?
3431 .as_secs() as i64,
3432 )),
3433 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
3434 }
3435}
3436
/// Matches `value` against a SQL `LIKE` pattern, comparing characters
/// (Unicode scalar values) case-sensitively.
///
/// `%` matches any run of characters, including an empty one. `_` matches
/// exactly one character. There is no escape syntax, so both are always
/// wildcards, even when the value contains the same character at that
/// position.
///
/// Matching is greedy with backtracking to the most recent `%`. The worst
/// case is O(len(value) * len(pattern)).
fn like_matches(value: &str, pattern: &str) -> bool {
    let text: Vec<char> = value.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    let (mut ti, mut pi) = (0usize, 0usize);
    // (index of the last `%` in `pat`, index in `text` it currently resumes from)
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pat.get(pi) {
            // The wildcard check must come first. Otherwise a `%` in the
            // pattern would be consumed as a literal whenever the text also
            // has `%` here (e.g. `'50% off' LIKE '50%'`).
            Some(&'%') => {
                backtrack = Some((pi, ti));
                pi += 1;
            }
            Some(&c) if c == '_' || c == text[ti] => {
                ti += 1;
                pi += 1;
            }
            _ => match backtrack {
                Some((star, resume)) => {
                    // Let the last `%` absorb one more character and retry.
                    pi = star + 1;
                    ti = resume + 1;
                    backtrack = Some((star, resume + 1));
                }
                None => return false,
            },
        }
    }

    // Any remaining pattern must consist solely of `%`, which can match nothing.
    pat[pi..].iter().all(|&c| c == '%')
}
3468
3469fn resolve_in_subqueries(
3470 db: &Database,
3471 expr: &Expr,
3472 params: &HashMap<String, Value>,
3473 tx: Option<TxId>,
3474) -> Result<Expr> {
3475 resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
3476}
3477
3478pub(crate) fn resolve_in_subqueries_with_ctes(
3479 db: &Database,
3480 expr: &Expr,
3481 params: &HashMap<String, Value>,
3482 tx: Option<TxId>,
3483 ctes: &[Cte],
3484) -> Result<Expr> {
3485 match expr {
3486 Expr::InSubquery {
3487 expr,
3488 subquery,
3489 negated,
3490 } => {
3491 let mut subquery_tables: std::collections::HashSet<String> = subquery
3493 .from
3494 .iter()
3495 .filter_map(|item| match item {
3496 contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
3497 _ => None,
3498 })
3499 .collect();
3500 for cte in ctes {
3502 match cte {
3503 Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
3504 subquery_tables.insert(name.clone());
3505 }
3506 }
3507 }
3508 if let Some(where_clause) = &subquery.where_clause
3509 && has_outer_table_ref(where_clause, &subquery_tables)
3510 {
3511 return Err(Error::Other(
3512 "correlated subqueries are not supported".to_string(),
3513 ));
3514 }
3515
3516 let query_plan = plan(&Statement::Select(SelectStatement {
3517 ctes: ctes.to_vec(),
3518 body: (**subquery).clone(),
3519 }))?;
3520 let result = execute_plan(db, &query_plan, params, tx)?;
3521 let select_expr = subquery
3522 .columns
3523 .first()
3524 .map(|column| column.expr.clone())
3525 .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
3526 let list = result
3527 .rows
3528 .iter()
3529 .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
3530 .collect::<Result<Vec<_>>>()?
3531 .into_iter()
3532 .map(value_to_literal)
3533 .collect::<Result<Vec<_>>>()?;
3534 Ok(Expr::InList {
3535 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3536 list,
3537 negated: *negated,
3538 })
3539 }
3540 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
3541 left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
3542 op: *op,
3543 right: Box::new(resolve_in_subqueries_with_ctes(
3544 db, right, params, tx, ctes,
3545 )?),
3546 }),
3547 Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
3548 op: *op,
3549 operand: Box::new(resolve_in_subqueries_with_ctes(
3550 db, operand, params, tx, ctes,
3551 )?),
3552 }),
3553 Expr::InList {
3554 expr,
3555 list,
3556 negated,
3557 } => Ok(Expr::InList {
3558 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3559 list: list
3560 .iter()
3561 .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
3562 .collect::<Result<Vec<_>>>()?,
3563 negated: *negated,
3564 }),
3565 Expr::Like {
3566 expr,
3567 pattern,
3568 negated,
3569 } => Ok(Expr::Like {
3570 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3571 pattern: Box::new(resolve_in_subqueries_with_ctes(
3572 db, pattern, params, tx, ctes,
3573 )?),
3574 negated: *negated,
3575 }),
3576 Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
3577 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
3578 negated: *negated,
3579 }),
3580 Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
3581 name: name.clone(),
3582 args: args
3583 .iter()
3584 .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
3585 .collect::<Result<Vec<_>>>()?,
3586 }),
3587 _ => Ok(expr.clone()),
3588 }
3589}
3590
3591fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
3592 match expr {
3593 Expr::Column(ColumnRef {
3594 table: Some(table), ..
3595 }) => !subquery_tables.contains(table),
3596 Expr::BinaryOp { left, right, .. } => {
3597 has_outer_table_ref(left, subquery_tables)
3598 || has_outer_table_ref(right, subquery_tables)
3599 }
3600 Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
3601 Expr::InList { expr, list, .. } => {
3602 has_outer_table_ref(expr, subquery_tables)
3603 || list
3604 .iter()
3605 .any(|item| has_outer_table_ref(item, subquery_tables))
3606 }
3607 Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
3608 Expr::Like { expr, pattern, .. } => {
3609 has_outer_table_ref(expr, subquery_tables)
3610 || has_outer_table_ref(pattern, subquery_tables)
3611 }
3612 Expr::FunctionCall { args, .. } => args
3613 .iter()
3614 .any(|arg| has_outer_table_ref(arg, subquery_tables)),
3615 _ => false,
3616 }
3617}
3618
3619fn value_to_literal(value: Value) -> Result<Expr> {
3620 Ok(Expr::Literal(match value {
3621 Value::Null => Literal::Null,
3622 Value::Bool(v) => Literal::Bool(v),
3623 Value::Int64(v) => Literal::Integer(v),
3624 Value::Float64(v) => Literal::Real(v),
3625 Value::Text(v) => Literal::Text(v),
3626 Value::Uuid(v) => Literal::Text(v.to_string()),
3627 Value::Timestamp(v) => Literal::Integer(v),
3628 other => {
3629 return Err(Error::PlanError(format!(
3630 "unsupported subquery result value: {:?}",
3631 other
3632 )));
3633 }
3634 }))
3635}
3636
3637fn query_result_row_matches(
3638 row: &[Value],
3639 columns: &[String],
3640 expr: &Expr,
3641 params: &HashMap<String, Value>,
3642) -> Result<bool> {
3643 Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
3644}
3645
3646fn eval_query_result_bool_expr(
3647 row: &[Value],
3648 columns: &[String],
3649 expr: &Expr,
3650 params: &HashMap<String, Value>,
3651) -> Result<Option<bool>> {
3652 match expr {
3653 Expr::BinaryOp { left, op, right } => match op {
3654 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
3655 let left = eval_query_result_expr(left, row, columns, params)?;
3656 let right = eval_query_result_expr(right, row, columns, params)?;
3657 if left == Value::Null || right == Value::Null {
3658 return Ok(None);
3659 }
3660
3661 let result = match op {
3662 BinOp::Eq => {
3663 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
3664 }
3665 BinOp::Neq => {
3666 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
3667 }
3668 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
3669 BinOp::Lte => matches!(
3670 compare_values(&left, &right),
3671 Some(Ordering::Less | Ordering::Equal)
3672 ),
3673 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
3674 BinOp::Gte => matches!(
3675 compare_values(&left, &right),
3676 Some(Ordering::Greater | Ordering::Equal)
3677 ),
3678 BinOp::And | BinOp::Or => unreachable!(),
3679 };
3680 Ok(Some(result))
3681 }
3682 BinOp::And => {
3683 let left = eval_query_result_bool_expr(row, columns, left, params)?;
3684 if left == Some(false) {
3685 return Ok(Some(false));
3686 }
3687 let right = eval_query_result_bool_expr(row, columns, right, params)?;
3688 Ok(match (left, right) {
3689 (Some(true), Some(true)) => Some(true),
3690 (Some(true), other) => other,
3691 (None, Some(false)) => Some(false),
3692 (None, Some(true)) | (None, None) => None,
3693 (Some(false), _) => Some(false),
3694 })
3695 }
3696 BinOp::Or => {
3697 let left = eval_query_result_bool_expr(row, columns, left, params)?;
3698 if left == Some(true) {
3699 return Ok(Some(true));
3700 }
3701 let right = eval_query_result_bool_expr(row, columns, right, params)?;
3702 Ok(match (left, right) {
3703 (Some(false), Some(false)) => Some(false),
3704 (Some(false), other) => other,
3705 (None, Some(true)) => Some(true),
3706 (None, Some(false)) | (None, None) => None,
3707 (Some(true), _) => Some(true),
3708 })
3709 }
3710 },
3711 Expr::UnaryOp {
3712 op: UnaryOp::Not,
3713 operand,
3714 } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
3715 Expr::InList {
3716 expr,
3717 list,
3718 negated,
3719 } => {
3720 let needle = eval_query_result_expr(expr, row, columns, params)?;
3721 if needle == Value::Null {
3722 return Ok(None);
3723 }
3724
3725 let matched = list.iter().try_fold(false, |found, item| {
3726 if found {
3727 Ok(true)
3728 } else {
3729 let candidate = eval_query_result_expr(item, row, columns, params)?;
3730 Ok(
3731 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
3732 || (candidate != Value::Null && needle == candidate),
3733 )
3734 }
3735 })?;
3736 Ok(Some(if *negated { !matched } else { matched }))
3737 }
3738 Expr::InSubquery { .. } => Err(Error::PlanError(
3739 "IN (subquery) must be resolved before execution".to_string(),
3740 )),
3741 Expr::Like {
3742 expr,
3743 pattern,
3744 negated,
3745 } => {
3746 let left = eval_query_result_expr(expr, row, columns, params)?;
3747 let right = eval_query_result_expr(pattern, row, columns, params)?;
3748 let matched = match (left, right) {
3749 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
3750 _ => false,
3751 };
3752 Ok(Some(if *negated { !matched } else { matched }))
3753 }
3754 Expr::IsNull { expr, negated } => {
3755 let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
3756 Ok(Some(if *negated { !is_null } else { is_null }))
3757 }
3758 Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
3759 Value::Bool(value) => Ok(Some(value)),
3760 Value::Null => Ok(None),
3761 _ => Err(Error::PlanError(format!(
3762 "unsupported WHERE expression: {:?}",
3763 expr
3764 ))),
3765 },
3766 _ => Err(Error::PlanError(format!(
3767 "unsupported WHERE expression: {:?}",
3768 expr
3769 ))),
3770 }
3771}
3772
3773fn lookup_query_result_column(
3774 row: &[Value],
3775 input_columns: &[String],
3776 column_ref: &ColumnRef,
3777) -> Result<Value> {
3778 if let Some(table) = &column_ref.table {
3779 let qualified = format!("{table}.{}", column_ref.column);
3780 let idx = input_columns
3783 .iter()
3784 .position(|name| name == &qualified)
3785 .or_else(|| {
3786 input_columns
3787 .iter()
3788 .position(|name| name == &column_ref.column)
3789 })
3790 .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
3791 return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
3792 }
3793
3794 let matches = input_columns
3795 .iter()
3796 .enumerate()
3797 .filter_map(|(idx, name)| {
3798 (name == &column_ref.column
3799 || name.rsplit('.').next() == Some(column_ref.column.as_str()))
3800 .then_some(idx)
3801 })
3802 .collect::<Vec<_>>();
3803
3804 match matches.as_slice() {
3805 [] => Err(Error::PlanError(format!(
3806 "project column not found: {}",
3807 column_ref.column
3808 ))),
3809 [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
3810 _ => Err(Error::PlanError(format!(
3811 "ambiguous column reference: {}",
3812 column_ref.column
3813 ))),
3814 }
3815}
3816
3817fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
3818 let mut combined = Vec::with_capacity(left.len() + right.len());
3819 combined.extend_from_slice(left);
3820 combined.extend_from_slice(right);
3821 combined
3822}
3823
/// Returns the bare column names that occur on both sides of a join, sorted
/// and without repeats.
///
/// A bare name is the text after the last `.`, so `a.id` and `b.id`
/// collide on `id`.
fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
    fn bare_name(column: &str) -> &str {
        column.rsplit('.').next().unwrap_or(column)
    }

    let seen_on_left: BTreeSet<&str> = left.iter().map(|column| bare_name(column)).collect();
    let mut shared = BTreeSet::new();
    for column in right {
        let bare = bare_name(column);
        if seen_on_left.contains(bare) {
            shared.insert(bare.to_string());
        }
    }
    shared
}
3837
/// Produces the output column names of a join.
///
/// Positions below `left_columns.len()` come from the left input. They are
/// re-prefixed with `left_alias` (keeping the bare name after the last `.`)
/// when an alias is given, and otherwise copied from `left_columns`.
///
/// The remaining positions come from the right input. A column still named by
/// its bare right-hand name is prefixed with `right_prefix`; any other name
/// is kept as-is.
fn qualify_join_columns(
    columns: &[String],
    left_columns: &[String],
    right_columns: &[String],
    left_alias: &Option<String>,
    right_prefix: &str,
) -> Vec<String> {
    let split_at = left_columns.len();
    let mut qualified = Vec::with_capacity(columns.len());

    for (idx, column) in columns.iter().enumerate() {
        let name = if idx >= split_at {
            let source = &right_columns[idx - split_at];
            let bare = source.rsplit('.').next().unwrap_or(source.as_str());
            if column == bare {
                format!("{right_prefix}.{bare}")
            } else {
                column.clone()
            }
        } else {
            match left_alias.as_deref() {
                Some(prefix) => format!(
                    "{prefix}.{}",
                    left_columns[idx].rsplit('.').next().unwrap_or(column)
                ),
                None => left_columns[idx].clone(),
            }
        };
        qualified.push(name);
    }

    qualified
}
3874
3875fn right_table_name(plan: &PhysicalPlan) -> String {
3876 match plan {
3877 PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
3878 _ => "right".to_string(),
3879 }
3880}
3881
3882fn distinct_row_key(row: &[Value]) -> Vec<u8> {
3883 bincode::serde::encode_to_vec(row, bincode::config::standard())
3884 .expect("query rows should serialize for DISTINCT")
3885}
3886
3887fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
3888 match resolve_expr(expr, params)? {
3889 Value::Uuid(u) => Ok(u),
3890 Value::Text(t) => uuid::Uuid::parse_str(&t)
3891 .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
3892 _ => Err(Error::PlanError(
3893 "graph start node must be UUID".to_string(),
3894 )),
3895 }
3896}
3897
3898fn resolve_graph_start_nodes_from_filter(
3901 db: &Database,
3902 filter: &Expr,
3903 params: &HashMap<String, Value>,
3904) -> Result<Vec<uuid::Uuid>> {
3905 if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
3906 return Ok(ids);
3907 }
3908
3909 let (col_name, expected_value) = match filter {
3911 Expr::BinaryOp {
3912 left,
3913 op: BinOp::Eq,
3914 right,
3915 } => {
3916 if let Some(col) = extract_column_name(left) {
3917 (col, resolve_expr(right, params)?)
3918 } else if let Some(col) = extract_column_name(right) {
3919 (col, resolve_expr(left, params)?)
3920 } else {
3921 return Ok(vec![]);
3922 }
3923 }
3924 _ => return Ok(vec![]),
3925 };
3926
3927 let snapshot = db.snapshot();
3928 let mut uuids = Vec::new();
3929 for table_name in db.table_names() {
3930 let meta = match db.table_meta(&table_name) {
3931 Some(m) => m,
3932 None => continue,
3933 };
3934 let has_col = meta.columns.iter().any(|c| c.name == col_name);
3936 let has_id = meta.columns.iter().any(|c| c.name == "id");
3937 if !has_col || !has_id {
3938 continue;
3939 }
3940 let rows = db.scan_filter(&table_name, snapshot, &|row| {
3941 row.values.get(&col_name) == Some(&expected_value)
3942 })?;
3943 for row in rows {
3944 if let Some(Value::Uuid(id)) = row.values.get("id") {
3945 uuids.push(*id);
3946 }
3947 }
3948 }
3949 Ok(uuids)
3950}
3951
3952fn resolve_graph_start_nodes_from_plan(
3953 db: &Database,
3954 plan: &PhysicalPlan,
3955 params: &HashMap<String, Value>,
3956 tx: Option<TxId>,
3957) -> Result<Vec<uuid::Uuid>> {
3958 let result = execute_plan(db, plan, params, tx)?;
3959 result
3960 .rows
3961 .into_iter()
3962 .filter_map(|row| row.into_iter().next())
3963 .map(|value| match value {
3964 Value::Uuid(id) => Ok(id),
3965 Value::Text(text) => uuid::Uuid::parse_str(&text)
3966 .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
3967 other => Err(Error::PlanError(format!(
3968 "invalid graph start identifier from plan: {other:?}"
3969 ))),
3970 })
3971 .collect()
3972}
3973
3974fn resolve_graph_start_ids_from_filter(
3975 filter: &Expr,
3976 params: &HashMap<String, Value>,
3977) -> Result<Option<Vec<uuid::Uuid>>> {
3978 match filter {
3979 Expr::BinaryOp {
3980 left,
3981 op: BinOp::Eq,
3982 right,
3983 } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
3984 let value = if is_graph_id_ref(left) {
3985 resolve_expr(right, params)?
3986 } else {
3987 resolve_expr(left, params)?
3988 };
3989 let id = match value {
3990 Value::Uuid(id) => id,
3991 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
3992 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
3993 })?,
3994 other => {
3995 return Err(Error::PlanError(format!(
3996 "invalid graph start identifier in filter: {other:?}"
3997 )));
3998 }
3999 };
4000 Ok(Some(vec![id]))
4001 }
4002 Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
4003 let ids = list
4004 .iter()
4005 .map(|item| resolve_expr(item, params))
4006 .map(|value| match value? {
4007 Value::Uuid(id) => Ok(id),
4008 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
4009 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
4010 }),
4011 other => Err(Error::PlanError(format!(
4012 "invalid graph start identifier in filter: {other:?}"
4013 ))),
4014 })
4015 .collect::<Result<Vec<_>>>()?;
4016 Ok(Some(ids))
4017 }
4018 Expr::BinaryOp { left, right, .. } => {
4019 if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
4020 return Ok(Some(ids));
4021 }
4022 resolve_graph_start_ids_from_filter(right, params)
4023 }
4024 Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
4025 _ => Ok(None),
4026 }
4027}
4028
4029fn is_graph_id_ref(expr: &Expr) -> bool {
4030 matches!(
4031 expr,
4032 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
4033 )
4034}
4035
4036fn extract_column_name(expr: &Expr) -> Option<String> {
4038 match expr {
4039 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
4040 _ => None,
4041 }
4042}
4043
4044fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
4045 match resolve_expr(expr, params)? {
4046 Value::Vector(v) => Ok(v),
4047 Value::Text(name) => match params.get(&name) {
4048 Some(Value::Vector(v)) => Ok(v.clone()),
4049 _ => Err(Error::PlanError("vector parameter missing".to_string())),
4050 },
4051 _ => Err(Error::PlanError(
4052 "invalid vector query expression".to_string(),
4053 )),
4054 }
4055}
4056
4057fn validate_vector_columns(
4058 db: &Database,
4059 table: &str,
4060 values: &HashMap<String, Value>,
4061) -> Result<()> {
4062 let Some(meta) = db.table_meta(table) else {
4063 return Ok(());
4064 };
4065
4066 for column in &meta.columns {
4067 if let contextdb_core::ColumnType::Vector(expected) = column.column_type
4068 && let Some(Value::Vector(vector)) = values.get(&column.name)
4069 {
4070 let got = vector.len();
4071 if got != expected {
4072 return Err(Error::VectorDimensionMismatch { expected, got });
4073 }
4074 }
4075 }
4076
4077 Ok(())
4078}
4079
4080fn vector_value_for_table<'a>(
4081 db: &Database,
4082 table: &str,
4083 values: &'a HashMap<String, Value>,
4084) -> Option<&'a Vec<f32>> {
4085 let meta = db.table_meta(table)?;
4086 meta.columns
4087 .iter()
4088 .find_map(|column| match column.column_type {
4089 contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
4090 Some(Value::Vector(vector)) => Some(vector),
4091 _ => None,
4092 },
4093 _ => None,
4094 })
4095}
4096
4097pub(crate) fn coerce_into_column(
4098 db: &Database,
4099 table: &str,
4100 col: &str,
4101 v: Value,
4102 current_tx_max: Option<TxId>,
4103 active_tx: Option<TxId>,
4104) -> Result<Value> {
4105 coerce_value_for_column(db, table, col, v, current_tx_max, active_tx)
4106}
4107
4108fn coerce_value_for_column(
4109 db: &Database,
4110 table: &str,
4111 col_name: &str,
4112 v: Value,
4113 current_tx_max: Option<TxId>,
4114 active_tx: Option<TxId>,
4115) -> Result<Value> {
4116 let Some(meta) = db.table_meta(table) else {
4117 if let Value::TxId(_) = &v {
4119 return Err(Error::ColumnTypeMismatch {
4120 table: table.to_string(),
4121 column: col_name.to_string(),
4122 expected: "UNKNOWN",
4123 actual: "TxId",
4124 });
4125 }
4126 return Ok(coerce_uuid_if_needed(col_name, v));
4127 };
4128 coerce_value_for_column_with_meta(table, &meta, col_name, v, current_tx_max, active_tx)
4129}
4130
4131fn coerce_value_for_column_with_meta(
4132 table: &str,
4133 meta: &TableMeta,
4134 col_name: &str,
4135 v: Value,
4136 current_tx_max: Option<TxId>,
4137 active_tx: Option<TxId>,
4138) -> Result<Value> {
4139 let Some(col) = meta.columns.iter().find(|c| c.name == col_name) else {
4140 if let Value::TxId(_) = &v {
4141 return Err(Error::ColumnTypeMismatch {
4142 table: table.to_string(),
4143 column: col_name.to_string(),
4144 expected: "UNKNOWN",
4145 actual: "TxId",
4146 });
4147 }
4148 return Ok(coerce_uuid_if_needed(col_name, v));
4149 };
4150
4151 match col.column_type {
4152 contextdb_core::ColumnType::Uuid => match v {
4153 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4154 table: table.to_string(),
4155 column: col_name.to_string(),
4156 expected: "UUID",
4157 actual: "TxId",
4158 }),
4159 other => coerce_uuid_value(other),
4160 },
4161 contextdb_core::ColumnType::Timestamp => match v {
4162 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4163 table: table.to_string(),
4164 column: col_name.to_string(),
4165 expected: "TIMESTAMP",
4166 actual: "TxId",
4167 }),
4168 other => coerce_timestamp_value(other),
4169 },
4170 contextdb_core::ColumnType::Vector(dim) => match v {
4171 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4172 table: table.to_string(),
4173 column: col_name.to_string(),
4174 expected: format_vector_type(dim),
4175 actual: "TxId",
4176 }),
4177 other => coerce_vector_value(other, dim),
4178 },
4179 contextdb_core::ColumnType::Integer => match v {
4180 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4181 table: table.to_string(),
4182 column: col_name.to_string(),
4183 expected: "INTEGER",
4184 actual: "TxId",
4185 }),
4186 other => Ok(coerce_uuid_if_needed(col_name, other)),
4187 },
4188 contextdb_core::ColumnType::Real => match v {
4189 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4190 table: table.to_string(),
4191 column: col_name.to_string(),
4192 expected: "REAL",
4193 actual: "TxId",
4194 }),
4195 other => Ok(coerce_uuid_if_needed(col_name, other)),
4196 },
4197 contextdb_core::ColumnType::Text => match v {
4198 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4199 table: table.to_string(),
4200 column: col_name.to_string(),
4201 expected: "TEXT",
4202 actual: "TxId",
4203 }),
4204 other => Ok(coerce_uuid_if_needed(col_name, other)),
4205 },
4206 contextdb_core::ColumnType::Boolean => match v {
4207 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4208 table: table.to_string(),
4209 column: col_name.to_string(),
4210 expected: "BOOLEAN",
4211 actual: "TxId",
4212 }),
4213 other => Ok(coerce_uuid_if_needed(col_name, other)),
4214 },
4215 contextdb_core::ColumnType::Json => match v {
4216 Value::TxId(_) => Err(Error::ColumnTypeMismatch {
4217 table: table.to_string(),
4218 column: col_name.to_string(),
4219 expected: "JSON",
4220 actual: "TxId",
4221 }),
4222 other => Ok(coerce_uuid_if_needed(col_name, other)),
4223 },
4224 contextdb_core::ColumnType::TxId => {
4225 coerce_txid_value(table, col_name, v, col.nullable, current_tx_max, active_tx)
4226 }
4227 }
4228}
4229
/// Returns the `'static` type label for a VECTOR column of dimension `dim`,
/// as used in `ColumnTypeMismatch` errors.
///
/// Only the dimensions listed below get an exact label such as
/// `"VECTOR(768)"`. Any other dimension is reported as plain `"VECTOR"`,
/// since the label cannot be built at runtime as a `&'static str`.
fn format_vector_type(dim: usize) -> &'static str {
    const LABELS: [(usize, &str); 15] = [
        (1, "VECTOR(1)"),
        (2, "VECTOR(2)"),
        (3, "VECTOR(3)"),
        (4, "VECTOR(4)"),
        (8, "VECTOR(8)"),
        (16, "VECTOR(16)"),
        (32, "VECTOR(32)"),
        (64, "VECTOR(64)"),
        (128, "VECTOR(128)"),
        (256, "VECTOR(256)"),
        (512, "VECTOR(512)"),
        (768, "VECTOR(768)"),
        (1024, "VECTOR(1024)"),
        (1536, "VECTOR(1536)"),
        (3072, "VECTOR(3072)"),
    ];
    LABELS
        .iter()
        .find_map(|&(known, label)| (known == dim).then_some(label))
        .unwrap_or("VECTOR")
}
4251
4252fn coerce_txid_value(
4253 table: &str,
4254 col: &str,
4255 v: Value,
4256 nullable: bool,
4257 current_tx_max: Option<TxId>,
4258 active_tx: Option<TxId>,
4259) -> Result<Value> {
4260 match v {
4261 Value::Null => {
4262 if nullable {
4263 Ok(Value::Null)
4264 } else {
4265 Err(Error::ColumnNotNullable {
4266 table: table.to_string(),
4267 column: col.to_string(),
4268 })
4269 }
4270 }
4271 Value::TxId(tx) => {
4272 if let Some(max) = current_tx_max {
4281 let ceiling = max.0.max(active_tx.map(|t| t.0).unwrap_or(0));
4282 if tx.0 > ceiling {
4283 return Err(Error::TxIdOutOfRange {
4284 table: table.to_string(),
4285 column: col.to_string(),
4286 value: tx.0,
4287 max: max.0,
4288 });
4289 }
4290 }
4291 Ok(Value::TxId(tx))
4292 }
4293 other => Err(Error::ColumnTypeMismatch {
4294 table: table.to_string(),
4295 column: col.to_string(),
4296 expected: "TXID",
4297 actual: value_variant_name(&other),
4298 }),
4299 }
4300}
4301
4302fn value_variant_name(v: &Value) -> &'static str {
4303 match v {
4304 Value::Null => "Null",
4305 Value::Bool(_) => "Bool",
4306 Value::Int64(_) => "Int64",
4307 Value::Float64(_) => "Float64",
4308 Value::Text(_) => "Text",
4309 Value::Uuid(_) => "Uuid",
4310 Value::Timestamp(_) => "Timestamp",
4311 Value::Json(_) => "Json",
4312 Value::Vector(_) => "Vector",
4313 Value::TxId(_) => "TxId",
4314 }
4315}
4316
4317fn coerce_uuid_value(v: Value) -> Result<Value> {
4318 match v {
4319 Value::Null => Ok(Value::Null),
4320 Value::Uuid(id) => Ok(Value::Uuid(id)),
4321 Value::Text(text) => uuid::Uuid::parse_str(&text)
4322 .map(Value::Uuid)
4323 .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
4324 other => Err(Error::Other(format!(
4325 "UUID column requires UUID or text literal, got {other:?}"
4326 ))),
4327 }
4328}
4329
4330fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
4331 if (col == "id" || col.ends_with("_id"))
4332 && let Value::Text(s) = &v
4333 && let Ok(u) = uuid::Uuid::parse_str(s)
4334 {
4335 return Value::Uuid(u);
4336 }
4337 v
4338}
4339
4340fn coerce_timestamp_value(v: Value) -> Result<Value> {
4341 match v {
4342 Value::Null => Ok(Value::Null),
4343 Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
4344 Ok(Value::Timestamp(i64::MAX))
4345 }
4346 Value::Text(text) => {
4347 let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
4348 Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
4349 })?;
4350 Ok(Value::Timestamp(
4351 parsed.unix_timestamp_nanos() as i64 / 1_000_000,
4352 ))
4353 }
4354 other => Ok(other),
4355 }
4356}
4357
4358fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
4359 let vector = match v {
4360 Value::Null => return Ok(Value::Null),
4361 Value::Vector(vector) => vector,
4362 Value::Text(text) => parse_text_vector_literal(&text)?,
4363 other => return Ok(other),
4364 };
4365
4366 if vector.len() != expected_dim {
4367 return Err(Error::VectorDimensionMismatch {
4368 expected: expected_dim,
4369 got: vector.len(),
4370 });
4371 }
4372
4373 Ok(Value::Vector(vector))
4374}
4375
4376fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
4377 let trimmed = text.trim();
4378 let inner = trimmed
4379 .strip_prefix('[')
4380 .and_then(|s| s.strip_suffix(']'))
4381 .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
4382
4383 if inner.trim().is_empty() {
4384 return Ok(Vec::new());
4385 }
4386
4387 inner
4388 .split(',')
4389 .map(|part| {
4390 part.trim().parse::<f32>().map_err(|err| {
4391 Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
4392 })
4393 })
4394 .collect()
4395}
4396
4397fn apply_missing_column_defaults(
4398 db: &Database,
4399 table: &str,
4400 values: &mut HashMap<String, Value>,
4401 active_tx: Option<TxId>,
4402) -> Result<()> {
4403 let Some(meta) = db.table_meta(table) else {
4404 return Ok(());
4405 };
4406
4407 let current_tx_max = Some(db.committed_watermark());
4408
4409 for column in &meta.columns {
4410 if values.contains_key(&column.name) {
4411 continue;
4412 }
4413 let Some(default) = &column.default else {
4414 continue;
4415 };
4416 let value = evaluate_stored_default_expr(default)?;
4417 values.insert(
4418 column.name.clone(),
4419 coerce_value_for_column(db, table, &column.name, value, current_tx_max, active_tx)?,
4420 );
4421 }
4422
4423 Ok(())
4424}
4425
4426fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
4427 if default.eq_ignore_ascii_case("NOW()") {
4428 return eval_function("now", &[]);
4429 }
4430 if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
4431 return eval_function("now", &[]);
4432 }
4433 if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
4434 return Ok(Value::Null);
4435 }
4436 if default.eq_ignore_ascii_case("TRUE") {
4437 return Ok(Value::Bool(true));
4438 }
4439 if default.eq_ignore_ascii_case("FALSE") {
4440 return Ok(Value::Bool(false));
4441 }
4442 if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
4443 return Ok(Value::Text(
4444 default[1..default.len() - 1].replace("''", "'"),
4445 ));
4446 }
4447 if let Some(text) = default
4448 .strip_prefix("Literal(Text(\"")
4449 .and_then(|value| value.strip_suffix("\"))"))
4450 {
4451 return Ok(Value::Text(text.to_string()));
4452 }
4453 if let Some(value) = default
4454 .strip_prefix("Literal(Integer(")
4455 .and_then(|value| value.strip_suffix("))"))
4456 {
4457 let parsed = value.parse::<i64>().map_err(|err| {
4458 Error::Other(format!("invalid stored integer default '{value}': {err}"))
4459 })?;
4460 return Ok(Value::Int64(parsed));
4461 }
4462 if let Some(value) = default
4463 .strip_prefix("Literal(Real(")
4464 .and_then(|value| value.strip_suffix("))"))
4465 {
4466 let parsed = value
4467 .parse::<f64>()
4468 .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
4469 return Ok(Value::Float64(parsed));
4470 }
4471 if let Some(value) = default
4472 .strip_prefix("Literal(Bool(")
4473 .and_then(|value| value.strip_suffix("))"))
4474 {
4475 let parsed = value
4476 .parse::<bool>()
4477 .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
4478 return Ok(Value::Bool(parsed));
4479 }
4480
4481 Err(Error::Other(format!(
4482 "unsupported stored DEFAULT expression: {default}"
4483 )))
4484}
4485
4486pub(crate) fn stored_default_expr(expr: &Expr) -> String {
4487 match expr {
4488 Expr::Literal(Literal::Null) => "NULL".to_string(),
4489 Expr::Literal(Literal::Bool(value)) => {
4490 if *value {
4491 "TRUE".to_string()
4492 } else {
4493 "FALSE".to_string()
4494 }
4495 }
4496 Expr::Literal(Literal::Integer(value)) => value.to_string(),
4497 Expr::Literal(Literal::Real(value)) => value.to_string(),
4498 Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
4499 Expr::FunctionCall { name, args }
4500 if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
4501 {
4502 "NOW()".to_string()
4503 }
4504 _ => format!("{expr:?}"),
4505 }
4506}
4507
4508fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
4509 if col.expires && !matches!(col.data_type, DataType::Timestamp) {
4510 return Err(Error::Other(
4511 "EXPIRES is only valid on TIMESTAMP columns".to_string(),
4512 ));
4513 }
4514 Ok(())
4515}
4516
4517fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
4518 let mut expires_column = None;
4519 for col in columns {
4520 validate_expires_column(col)?;
4521 if col.expires {
4522 if expires_column.is_some() {
4523 return Err(Error::Other(
4524 "only one EXPIRES column is supported per table".to_string(),
4525 ));
4526 }
4527 expires_column = Some(col.name.clone());
4528 }
4529 }
4530 Ok(expires_column)
4531}
4532
4533pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
4534 match dtype {
4535 DataType::Uuid => contextdb_core::ColumnType::Uuid,
4536 DataType::Text => contextdb_core::ColumnType::Text,
4537 DataType::Integer => contextdb_core::ColumnType::Integer,
4538 DataType::Real => contextdb_core::ColumnType::Real,
4539 DataType::Boolean => contextdb_core::ColumnType::Boolean,
4540 DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
4541 DataType::Json => contextdb_core::ColumnType::Json,
4542 DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
4543 DataType::TxId => contextdb_core::ColumnType::TxId,
4544 }
4545}
4546
4547fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
4548 match s {
4549 "latest_wins" => Ok(ConflictPolicy::LatestWins),
4550 "server_wins" => Ok(ConflictPolicy::ServerWins),
4551 "edge_wins" => Ok(ConflictPolicy::EdgeWins),
4552 "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
4553 _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
4554 }
4555}
4556
4557fn conflict_policy_to_string(p: ConflictPolicy) -> String {
4558 match p {
4559 ConflictPolicy::LatestWins => "latest_wins".to_string(),
4560 ConflictPolicy::ServerWins => "server_wins".to_string(),
4561 ConflictPolicy::EdgeWins => "edge_wins".to_string(),
4562 ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
4563 }
4564}
4565
4566fn project_graph_frontier_rows(
4567 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
4568 start_alias: &str,
4569 steps: &[GraphStepPlan],
4570) -> Result<Vec<Vec<Value>>> {
4571 frontier
4572 .into_iter()
4573 .map(|(bindings, id, depth)| {
4574 let mut row = Vec::with_capacity(steps.len() + 3);
4575 let start_id = bindings.get(start_alias).ok_or_else(|| {
4576 Error::PlanError(format!(
4577 "graph frontier missing required start alias binding '{start_alias}'"
4578 ))
4579 })?;
4580 row.push(Value::Uuid(*start_id));
4581 for step in steps {
4582 let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
4583 Error::PlanError(format!(
4584 "graph frontier missing required target alias binding '{}'",
4585 step.target_alias
4586 ))
4587 })?;
4588 row.push(Value::Uuid(*target_id));
4589 }
4590 row.push(Value::Uuid(id));
4591 row.push(Value::Int64(depth as i64));
4592 Ok(row)
4593 })
4594 .collect()
4595}
4596
#[cfg(test)]
mod tests {
    use super::*;
    use contextdb_planner::GraphStepPlan;
    use uuid::Uuid;

    /// A single one-hop outgoing traversal over `EDGE` that binds its target
    /// to alias `b`.
    fn one_hop_to_b() -> Vec<GraphStepPlan> {
        vec![GraphStepPlan {
            edge_types: vec!["EDGE".to_string()],
            direction: Direction::Outgoing,
            min_depth: 1,
            max_depth: 1,
            target_alias: "b".to_string(),
        }]
    }

    #[test]
    fn graph_01_frontier_projection_requires_complete_bindings() {
        let steps = one_hop_to_b();

        // No bindings at all: the start alias `a` is missing.
        let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];
        let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
        assert!(
            matches!(start_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
        );

        // `a` is bound, but the step's target alias `b` is not.
        let bound_start = HashMap::from([("a".to_string(), Uuid::new_v4())]);
        let missing_target = vec![(bound_start, Uuid::new_v4(), 0)];
        let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
        assert!(
            matches!(target_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
        );
    }
}