1use crate::database::{Database, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5 AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6 SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9 DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19 db: &Database,
20 plan: &PhysicalPlan,
21 params: &HashMap<String, Value>,
22 tx: Option<TxId>,
23) -> Result<QueryResult> {
24 match plan {
25 PhysicalPlan::CreateTable(p) => {
26 db.check_disk_budget("CREATE TABLE")?;
27 let expires_column = expires_column_name(&p.columns)?;
28 let meta = TableMeta {
29 columns: p
30 .columns
31 .iter()
32 .map(|c| contextdb_core::ColumnDef {
33 name: c.name.clone(),
34 column_type: map_column_type(&c.data_type),
35 nullable: c.nullable,
36 primary_key: c.primary_key,
37 unique: c.unique,
38 default: c.default.as_ref().map(stored_default_expr),
39 references: c.references.as_ref().map(|reference| {
40 contextdb_core::ForeignKeyReference {
41 table: reference.table.clone(),
42 column: reference.column.clone(),
43 }
44 }),
45 expires: c.expires,
46 })
47 .collect(),
48 immutable: p.immutable,
49 state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
50 column: sm.column.clone(),
51 transitions: sm
52 .transitions
53 .iter()
54 .map(|(from, tos)| (from.clone(), tos.clone()))
55 .collect(),
56 }),
57 dag_edge_types: p.dag_edge_types.clone(),
58 unique_constraints: p.unique_constraints.clone(),
59 natural_key_column: None,
60 propagation_rules: p.propagation_rules.clone(),
61 default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
62 sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
63 expires_column,
64 };
65 let metadata_bytes = meta.estimated_bytes();
66 db.accountant().try_allocate_for(
67 metadata_bytes,
68 "ddl",
69 "create_table",
70 "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
71 )?;
72 db.relational_store().create_table(&p.name, meta);
73 if let Some(table_meta) = db.table_meta(&p.name) {
74 db.persist_table_meta(&p.name, &table_meta)?;
75 db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn));
76 }
77 Ok(QueryResult::empty_with_affected(0))
78 }
79 PhysicalPlan::DropTable(name) => {
80 let bytes_to_release = estimate_drop_table_bytes(db, name);
81 db.drop_table_aux_state(name);
82 db.relational_store().drop_table(name);
83 db.remove_persisted_table(name)?;
84 db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn));
85 db.accountant().release(bytes_to_release);
86 Ok(QueryResult::empty_with_affected(0))
87 }
88 PhysicalPlan::AlterTable(p) => {
89 db.check_disk_budget("ALTER TABLE")?;
90 let store = db.relational_store();
91 match &p.action {
92 AlterAction::AddColumn(col) => {
93 if col.primary_key {
94 return Err(Error::Other(
95 "adding a primary key column via ALTER TABLE is not supported"
96 .to_string(),
97 ));
98 }
99 validate_expires_column(col)?;
100 let core_col = contextdb_core::ColumnDef {
101 name: col.name.clone(),
102 column_type: map_column_type(&col.data_type),
103 nullable: col.nullable,
104 primary_key: col.primary_key,
105 unique: col.unique,
106 default: col.default.as_ref().map(stored_default_expr),
107 references: col.references.as_ref().map(|reference| {
108 contextdb_core::ForeignKeyReference {
109 table: reference.table.clone(),
110 column: reference.column.clone(),
111 }
112 }),
113 expires: col.expires,
114 };
115 store
116 .alter_table_add_column(&p.table, core_col)
117 .map_err(Error::Other)?;
118 if col.expires {
119 let mut meta = store.table_meta.write();
120 let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
121 Error::Other(format!("table '{}' not found", p.table))
122 })?;
123 table_meta.expires_column = Some(col.name.clone());
124 }
125 }
126 AlterAction::DropColumn(name) => {
127 store
128 .alter_table_drop_column(&p.table, name)
129 .map_err(Error::Other)?;
130 let mut meta = store.table_meta.write();
131 if let Some(table_meta) = meta.get_mut(&p.table)
132 && table_meta.expires_column.as_deref() == Some(name.as_str())
133 {
134 table_meta.expires_column = None;
135 }
136 }
137 AlterAction::RenameColumn { from, to } => {
138 store
139 .alter_table_rename_column(&p.table, from, to)
140 .map_err(Error::Other)?;
141 let mut meta = store.table_meta.write();
142 if let Some(table_meta) = meta.get_mut(&p.table)
143 && table_meta.expires_column.as_deref() == Some(from.as_str())
144 {
145 table_meta.expires_column = Some(to.clone());
146 }
147 }
148 AlterAction::SetRetain {
149 duration_seconds,
150 sync_safe,
151 } => {
152 let mut meta = store.table_meta.write();
153 let table_meta = meta
154 .get_mut(&p.table)
155 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
156 if table_meta.immutable {
157 return Err(Error::Other(
158 "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
159 ));
160 }
161 table_meta.default_ttl_seconds = Some(*duration_seconds);
162 table_meta.sync_safe = *sync_safe;
163 }
164 AlterAction::DropRetain => {
165 let mut meta = store.table_meta.write();
166 let table_meta = meta
167 .get_mut(&p.table)
168 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
169 table_meta.default_ttl_seconds = None;
170 table_meta.sync_safe = false;
171 }
172 AlterAction::SetSyncConflictPolicy(policy) => {
173 let cp = parse_conflict_policy(policy)?;
174 db.set_table_conflict_policy(&p.table, cp);
175 }
176 AlterAction::DropSyncConflictPolicy => {
177 db.drop_table_conflict_policy(&p.table);
178 }
179 }
180 if let Some(table_meta) = db.table_meta(&p.table) {
181 db.persist_table_meta(&p.table, &table_meta)?;
182 if !matches!(
183 p.action,
184 AlterAction::AddColumn(_)
185 | AlterAction::SetRetain { .. }
186 | AlterAction::DropRetain
187 | AlterAction::SetSyncConflictPolicy(_)
188 | AlterAction::DropSyncConflictPolicy
189 ) {
190 db.persist_table_rows(&p.table)?;
191 }
192 db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn));
193 }
194 Ok(QueryResult::empty_with_affected(0))
195 }
196 PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
197 PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
198 PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
199 PhysicalPlan::Scan { table, filter, .. } => {
200 if table == "dual" {
201 return Ok(QueryResult {
202 columns: vec![],
203 rows: vec![vec![]],
204 rows_affected: 0,
205 });
206 }
207 let snapshot = db.snapshot();
208 let rows = db.scan(table, snapshot)?;
209 let schema_columns = db.table_meta(table).map(|meta| {
210 meta.columns
211 .into_iter()
212 .map(|column| column.name)
213 .collect::<Vec<_>>()
214 });
215 let resolved_filter = filter
216 .as_ref()
217 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
218 .transpose()?;
219 materialize_rows(
220 rows,
221 resolved_filter.as_ref(),
222 params,
223 schema_columns.as_deref(),
224 )
225 }
226 PhysicalPlan::GraphBfs {
227 start_alias,
228 start_expr,
229 start_candidates,
230 steps,
231 filter,
232 } => {
233 let start_uuids = match resolve_uuid(start_expr, params) {
234 Ok(start) => vec![start],
235 Err(Error::PlanError(_))
236 if matches!(
237 start_expr,
238 Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
239 ) =>
240 {
241 if let Some(candidate_plan) = start_candidates {
243 resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
244 } else if let Some(filter_expr) = filter {
245 let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
246 resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
247 } else {
248 vec![]
249 }
250 }
251 Err(err) => return Err(err),
252 };
253 if start_uuids.is_empty() {
254 return Ok(QueryResult {
255 columns: vec!["id".to_string(), "depth".to_string()],
256 rows: vec![],
257 rows_affected: 0,
258 });
259 }
260 let snapshot = db.snapshot();
261 let mut frontier = start_uuids
262 .into_iter()
263 .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
264 .collect::<Vec<_>>();
265 let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
266 db.accountant().try_allocate_for(
267 bfs_bytes,
268 "bfs_frontier",
269 "graph_bfs",
270 "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
271 )?;
272
273 let result = (|| {
274 for step in steps {
275 let edge_types_ref = if step.edge_types.is_empty() {
276 None
277 } else {
278 Some(step.edge_types.as_slice())
279 };
280 let mut next = Vec::new();
281
282 for (bindings, start, base_depth) in &frontier {
283 let res = db.graph().bfs(
284 *start,
285 edge_types_ref,
286 step.direction,
287 step.min_depth,
288 step.max_depth,
289 snapshot,
290 )?;
291 for node in res.nodes {
292 let total_depth = base_depth.saturating_add(node.depth);
293 let mut next_bindings = bindings.clone();
294 next_bindings.insert(step.target_alias.clone(), node.id);
295 next.push((next_bindings, node.id, total_depth));
296 }
297 }
298
299 frontier = dedupe_graph_frontier(next, steps);
300 if frontier.is_empty() {
301 break;
302 }
303 }
304
305 let mut columns =
306 steps
307 .iter()
308 .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
309 cols.push(format!("{}.id", step.target_alias));
310 cols
311 });
312 columns.push("id".to_string());
313 columns.push("depth".to_string());
314
315 Ok(QueryResult {
316 columns,
317 rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
318 rows_affected: 0,
319 })
320 })();
321 db.accountant().release(bfs_bytes);
322
323 result
324 }
325 PhysicalPlan::VectorSearch {
326 table,
327 query_expr,
328 k,
329 candidates,
330 ..
331 }
332 | PhysicalPlan::HnswSearch {
333 table,
334 query_expr,
335 k,
336 candidates,
337 ..
338 } => {
339 let query_vec = resolve_vector_from_expr(query_expr, params)?;
340 let snapshot = db.snapshot();
341 let all_rows = db.scan(table, snapshot)?;
342 let candidate_bitmap = if let Some(cands_plan) = candidates {
343 let qr = execute_plan(db, cands_plan, params, tx)?;
344 let mut bm = RoaringTreemap::new();
345 let row_id_idx = qr.columns.iter().position(|column| {
346 column == "row_id" || column.rsplit('.').next() == Some("row_id")
347 });
348 let id_idx = qr
349 .columns
350 .iter()
351 .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
352
353 if let Some(idx) = row_id_idx {
354 for row in qr.rows {
355 if let Some(Value::Int64(id)) = row.get(idx) {
356 bm.insert(*id as u64);
357 }
358 }
359 } else if let Some(idx) = id_idx {
360 let uuid_to_row_id: HashMap<uuid::Uuid, u64> = all_rows
361 .iter()
362 .filter_map(|row| match row.values.get("id") {
363 Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
364 _ => None,
365 })
366 .collect();
367 for row in qr.rows {
368 if let Some(Value::Uuid(uuid)) = row.get(idx)
369 && let Some(row_id) = uuid_to_row_id.get(uuid)
370 {
371 bm.insert(*row_id);
372 }
373 }
374 }
375 Some(bm)
376 } else {
377 None
378 };
379
380 let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
381 db.accountant().try_allocate_for(
382 vector_bytes,
383 "vector_search",
384 "search",
385 "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
386 )?;
387 let res = db.query_vector(
388 &query_vec,
389 *k as usize,
390 candidate_bitmap.as_ref(),
391 db.snapshot(),
392 );
393 db.accountant().release(vector_bytes);
394 let res = res?;
395
396 let schema_columns = db.table_meta(table).map(|meta| {
398 meta.columns
399 .into_iter()
400 .map(|column| column.name)
401 .collect::<Vec<_>>()
402 });
403 let keys = if let Some(ref sc) = schema_columns {
404 sc.clone()
405 } else {
406 let mut ks = BTreeSet::new();
407 for r in &all_rows {
408 for k in r.values.keys() {
409 ks.insert(k.clone());
410 }
411 }
412 ks.into_iter().collect::<Vec<_>>()
413 };
414
415 let row_map: HashMap<u64, &VersionedRow> =
416 all_rows.iter().map(|r| (r.row_id, r)).collect();
417
418 let mut columns = vec!["row_id".to_string()];
419 columns.extend(keys.iter().cloned());
420 columns.push("score".to_string());
421
422 let rows = res
423 .into_iter()
424 .filter_map(|(rid, score)| {
425 row_map.get(&rid).map(|row| {
426 let mut out = vec![Value::Int64(rid as i64)];
427 for k in &keys {
428 out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
429 }
430 out.push(Value::Float64(score as f64));
431 out
432 })
433 })
434 .collect();
435
436 Ok(QueryResult {
437 columns,
438 rows,
439 rows_affected: 0,
440 })
441 }
442 PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
443 PhysicalPlan::Project { input, columns } => {
444 let input_result = execute_plan(db, input, params, tx)?;
445 let has_aggregate = columns.iter().any(|column| {
446 matches!(
447 &column.expr,
448 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
449 )
450 });
451 if has_aggregate {
452 if columns.iter().any(|column| {
453 !matches!(
454 &column.expr,
455 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
456 )
457 }) {
458 return Err(Error::PlanError(
459 "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
460 ));
461 }
462
463 let output_columns = columns
464 .iter()
465 .map(|column| {
466 column.alias.clone().unwrap_or_else(|| match &column.expr {
467 Expr::FunctionCall { name, .. } => name.clone(),
468 _ => "expr".to_string(),
469 })
470 })
471 .collect::<Vec<_>>();
472
473 let aggregate_row = columns
474 .iter()
475 .map(|column| match &column.expr {
476 Expr::FunctionCall { name: _, args } => {
477 let count = if matches!(
478 args.as_slice(),
479 [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
480 if column == "*"
481 ) {
482 input_result.rows.len() as i64
483 } else {
484 input_result
485 .rows
486 .iter()
487 .filter_map(|row| {
488 args.first().map(|arg| {
489 eval_query_result_expr(
490 arg,
491 row,
492 &input_result.columns,
493 params,
494 )
495 })
496 })
497 .collect::<Result<Vec<_>>>()?
498 .into_iter()
499 .filter(|value| *value != Value::Null)
500 .count() as i64
501 };
502 Ok(Value::Int64(count))
503 }
504 _ => Err(Error::PlanError(
505 "mixed aggregate and non-aggregate columns without GROUP BY"
506 .to_string(),
507 )),
508 })
509 .collect::<Result<Vec<_>>>()?;
510
511 return Ok(QueryResult {
512 columns: output_columns,
513 rows: vec![aggregate_row],
514 rows_affected: 0,
515 });
516 }
517
518 let output_columns = columns
519 .iter()
520 .map(|c| {
521 c.alias.clone().unwrap_or_else(|| match &c.expr {
522 Expr::Column(col) => col.column.clone(),
523 _ => "expr".to_string(),
524 })
525 })
526 .collect::<Vec<_>>();
527
528 let mut output_rows = Vec::with_capacity(input_result.rows.len());
529 for row in &input_result.rows {
530 let mut projected = Vec::with_capacity(columns.len());
531 for col in columns {
532 projected.push(eval_project_expr(
533 &col.expr,
534 row,
535 &input_result.columns,
536 params,
537 )?);
538 }
539 output_rows.push(projected);
540 }
541
542 Ok(QueryResult {
543 columns: output_columns,
544 rows: output_rows,
545 rows_affected: 0,
546 })
547 }
548 PhysicalPlan::Sort { input, keys } => {
549 let mut input_result = execute_plan(db, input, params, tx)?;
550 input_result.rows.sort_by(|left, right| {
551 for key in keys {
552 let Expr::Column(column_ref) = &key.expr else {
553 return Ordering::Equal;
554 };
555 let left_value =
556 match lookup_query_result_column(left, &input_result.columns, column_ref) {
557 Ok(value) => value,
558 Err(_) => return Ordering::Equal,
559 };
560 let right_value = match lookup_query_result_column(
561 right,
562 &input_result.columns,
563 column_ref,
564 ) {
565 Ok(value) => value,
566 Err(_) => return Ordering::Equal,
567 };
568 let ordering = compare_sort_values(&left_value, &right_value, key.direction);
569 if ordering != Ordering::Equal {
570 return ordering;
571 }
572 }
573 Ordering::Equal
574 });
575 Ok(input_result)
576 }
577 PhysicalPlan::Limit { input, count } => {
578 let mut input_result = execute_plan(db, input, params, tx)?;
579 input_result.rows.truncate(*count as usize);
580 Ok(input_result)
581 }
582 PhysicalPlan::Filter { input, predicate } => {
583 let mut input_result = execute_plan(db, input, params, tx)?;
584 input_result.rows.retain(|row| {
585 query_result_row_matches(row, &input_result.columns, predicate, params)
586 .unwrap_or(false)
587 });
588 Ok(input_result)
589 }
590 PhysicalPlan::Distinct { input } => {
591 let input_result = execute_plan(db, input, params, tx)?;
592 let mut seen = HashSet::<Vec<u8>>::new();
593 let rows = input_result
594 .rows
595 .into_iter()
596 .filter(|row| seen.insert(distinct_row_key(row)))
597 .collect();
598 Ok(QueryResult {
599 columns: input_result.columns,
600 rows,
601 rows_affected: input_result.rows_affected,
602 })
603 }
604 PhysicalPlan::Join {
605 left,
606 right,
607 condition,
608 join_type,
609 left_alias,
610 right_alias,
611 } => {
612 let left_result = execute_plan(db, left, params, tx)?;
613 let right_result = execute_plan(db, right, params, tx)?;
614 let right_duplicate_names =
615 duplicate_column_names(&left_result.columns, &right_result.columns);
616 let right_prefix = right_alias
617 .clone()
618 .unwrap_or_else(|| right_table_name(right));
619 let right_columns = right_result
620 .columns
621 .iter()
622 .map(|column| {
623 if right_duplicate_names.contains(column) {
624 format!("{right_prefix}.{column}")
625 } else {
626 column.clone()
627 }
628 })
629 .collect::<Vec<_>>();
630
631 let mut columns = left_result.columns.clone();
632 columns.extend(right_columns);
633
634 let mut rows = Vec::new();
635 for left_row in &left_result.rows {
636 let mut matched = false;
637 for right_row in &right_result.rows {
638 let combined = concatenate_rows(left_row, right_row);
639 if query_result_row_matches(&combined, &columns, condition, params)? {
640 matched = true;
641 rows.push(combined);
642 }
643 }
644
645 if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
646 let mut combined = left_row.clone();
647 combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
648 rows.push(combined);
649 }
650 }
651
652 let output_columns = qualify_join_columns(
653 &columns,
654 &left_result.columns,
655 &right_result.columns,
656 left_alias,
657 &right_prefix,
658 );
659
660 Ok(QueryResult {
661 columns: output_columns,
662 rows,
663 rows_affected: 0,
664 })
665 }
666 PhysicalPlan::CreateIndex(_) => Ok(QueryResult::empty_with_affected(0)),
667 PhysicalPlan::SetMemoryLimit(val) => {
668 let limit = match val {
669 SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
670 SetMemoryLimitValue::None => None,
671 };
672 db.accountant().set_budget(limit)?;
673 db.persist_memory_limit(limit)?;
674 Ok(QueryResult::empty())
675 }
676 PhysicalPlan::ShowMemoryLimit => {
677 let usage = db.accountant().usage();
678 Ok(QueryResult {
679 columns: vec![
680 "limit".to_string(),
681 "used".to_string(),
682 "available".to_string(),
683 "startup_ceiling".to_string(),
684 ],
685 rows: vec![vec![
686 usage
687 .limit
688 .map(|value| Value::Int64(value as i64))
689 .unwrap_or_else(|| Value::Text("none".to_string())),
690 Value::Int64(usage.used as i64),
691 usage
692 .available
693 .map(|value| Value::Int64(value as i64))
694 .unwrap_or_else(|| Value::Text("none".to_string())),
695 usage
696 .startup_ceiling
697 .map(|value| Value::Int64(value as i64))
698 .unwrap_or_else(|| Value::Text("none".to_string())),
699 ]],
700 rows_affected: 0,
701 })
702 }
703 PhysicalPlan::SetDiskLimit(val) => {
704 let limit = match val {
705 SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
706 SetDiskLimitValue::None => None,
707 };
708 db.set_disk_limit(limit)?;
709 db.persist_disk_limit(limit)?;
710 Ok(QueryResult::empty())
711 }
712 PhysicalPlan::ShowDiskLimit => {
713 let limit = db.disk_limit();
714 let used = db.disk_file_size();
715 let startup_ceiling = db.disk_limit_startup_ceiling();
716 Ok(QueryResult {
717 columns: vec![
718 "limit".to_string(),
719 "used".to_string(),
720 "available".to_string(),
721 "startup_ceiling".to_string(),
722 ],
723 rows: vec![vec![
724 limit
725 .map(|value| Value::Int64(value as i64))
726 .unwrap_or_else(|| Value::Text("none".to_string())),
727 used.map(|value| Value::Int64(value as i64))
728 .unwrap_or(Value::Null),
729 match (limit, used) {
730 (Some(limit), Some(used)) => {
731 Value::Int64(limit.saturating_sub(used) as i64)
732 }
733 _ => Value::Null,
734 },
735 startup_ceiling
736 .map(|value| Value::Int64(value as i64))
737 .unwrap_or_else(|| Value::Text("none".to_string())),
738 ]],
739 rows_affected: 0,
740 })
741 }
742 PhysicalPlan::SetSyncConflictPolicy(policy) => {
743 let cp = parse_conflict_policy(policy)?;
744 db.set_default_conflict_policy(cp);
745 Ok(QueryResult::empty())
746 }
747 PhysicalPlan::ShowSyncConflictPolicy => {
748 let policies = db.conflict_policies();
749 let default_str = conflict_policy_to_string(policies.default);
750 let mut rows = vec![vec![Value::Text(default_str)]];
751 for (table, policy) in &policies.per_table {
752 rows.push(vec![Value::Text(format!(
753 "{}={}",
754 table,
755 conflict_policy_to_string(*policy)
756 ))]);
757 }
758 Ok(QueryResult {
759 columns: vec!["policy".to_string()],
760 rows,
761 rows_affected: 0,
762 })
763 }
764 PhysicalPlan::Pipeline(plans) => {
765 let mut last = QueryResult::empty();
766 for p in plans {
767 last = execute_plan(db, p, params, tx)?;
768 }
769 Ok(last)
770 }
771 _ => Err(Error::PlanError(
772 "unsupported plan node in executor".to_string(),
773 )),
774 }
775}
776
777fn eval_project_expr(
778 expr: &Expr,
779 row: &[Value],
780 input_columns: &[String],
781 params: &HashMap<String, Value>,
782) -> Result<Value> {
783 match expr {
784 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
785 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
786 Expr::Parameter(name) => params
787 .get(name)
788 .cloned()
789 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
790 Expr::BinaryOp { left, op, right } => {
791 let left = eval_query_result_expr(left, row, input_columns, params)?;
792 let right = eval_query_result_expr(right, row, input_columns, params)?;
793 eval_binary_op(op, &left, &right)
794 }
795 Expr::UnaryOp { op, operand } => {
796 let value = eval_query_result_expr(operand, row, input_columns, params)?;
797 match op {
798 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
799 UnaryOp::Neg => match value {
800 Value::Int64(v) => Ok(Value::Int64(-v)),
801 Value::Float64(v) => Ok(Value::Float64(-v)),
802 _ => Err(Error::PlanError(
803 "cannot negate non-numeric value".to_string(),
804 )),
805 },
806 }
807 }
808 Expr::FunctionCall { name, args } => {
809 let values = args
810 .iter()
811 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
812 .collect::<Result<Vec<_>>>()?;
813 eval_function(name, &values)
814 }
815 Expr::IsNull { expr, negated } => {
816 let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
817 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
818 }
819 Expr::InList {
820 expr,
821 list,
822 negated,
823 } => {
824 let needle = eval_query_result_expr(expr, row, input_columns, params)?;
825 let matched = list.iter().try_fold(false, |found, item| {
826 if found {
827 Ok(true)
828 } else {
829 let candidate = eval_query_result_expr(item, row, input_columns, params)?;
830 Ok(
831 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
832 || (needle != Value::Null
833 && candidate != Value::Null
834 && needle == candidate),
835 )
836 }
837 })?;
838 Ok(Value::Bool(if *negated { !matched } else { matched }))
839 }
840 Expr::Like {
841 expr,
842 pattern,
843 negated,
844 } => {
845 let matches = match (
846 eval_query_result_expr(expr, row, input_columns, params)?,
847 eval_query_result_expr(pattern, row, input_columns, params)?,
848 ) {
849 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
850 _ => false,
851 };
852 Ok(Value::Bool(if *negated { !matches } else { matches }))
853 }
854 _ => resolve_expr(expr, params),
855 }
856}
857
858fn eval_query_result_expr(
859 expr: &Expr,
860 row: &[Value],
861 input_columns: &[String],
862 params: &HashMap<String, Value>,
863) -> Result<Value> {
864 match expr {
865 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
866 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
867 Expr::Parameter(name) => params
868 .get(name)
869 .cloned()
870 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
871 Expr::FunctionCall { name, args } => {
872 let values = args
873 .iter()
874 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
875 .collect::<Result<Vec<_>>>()?;
876 eval_function(name, &values)
877 }
878 _ => resolve_expr(expr, params),
879 }
880}
881
882fn exec_insert(
883 db: &Database,
884 p: &InsertPlan,
885 params: &HashMap<String, Value>,
886 tx: Option<TxId>,
887) -> Result<QueryResult> {
888 db.check_disk_budget("INSERT")?;
889 let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
890
891 let columns: Vec<String> = if p.columns.is_empty() {
894 let meta = db
895 .table_meta(&p.table)
896 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
897 meta.columns.iter().map(|c| c.name.clone()).collect()
898 } else {
899 p.columns.clone()
900 };
901
902 let mut rows_affected = 0;
903 for row in &p.values {
904 let mut values = HashMap::new();
905 for (idx, expr) in row.iter().enumerate() {
906 let col = columns
907 .get(idx)
908 .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
909 let v = resolve_expr(expr, params)?;
910 values.insert(col.clone(), coerce_value_for_column(db, &p.table, col, v)?);
911 }
912
913 apply_missing_column_defaults(db, &p.table, &mut values)?;
914
915 validate_vector_columns(db, &p.table, &values)?;
916 let row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
917 db.accountant().try_allocate_for(
918 row_bytes,
919 "insert",
920 "row_insert",
921 "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
922 )?;
923 let checkpoint = db.write_set_checkpoint(txid)?;
924
925 let row_id = if let Some(on_conflict) = &p.on_conflict {
926 let conflict_col = &on_conflict.columns[0];
927 let conflict_value = values
928 .get(conflict_col)
929 .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
930 let existing =
931 db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
932 let existing_row_id = existing.as_ref().map(|row| row.row_id);
933 let existing_has_vector = existing
934 .as_ref()
935 .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
936 let upsert_values = if let Some(existing_row) = existing.as_ref() {
937 apply_on_conflict_updates(
938 db,
939 &p.table,
940 values.clone(),
941 existing_row,
942 on_conflict,
943 params,
944 )?
945 } else {
946 values.clone()
947 };
948
949 match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
950 Ok(UpsertResult::Inserted) => {
951 db.point_lookup_in_tx(
952 txid,
953 &p.table,
954 conflict_col,
955 conflict_value,
956 db.snapshot(),
957 )?
958 .ok_or_else(|| {
959 Error::Other("inserted upsert row not visible in tx".to_string())
960 })?
961 .row_id
962 }
963 Ok(UpsertResult::Updated) => {
964 if existing_has_vector && let Some(existing_row_id) = existing_row_id {
965 db.delete_vector(txid, existing_row_id)?;
966 }
967 db.point_lookup_in_tx(
968 txid,
969 &p.table,
970 conflict_col,
971 conflict_value,
972 db.snapshot(),
973 )?
974 .ok_or_else(|| {
975 Error::Other("updated upsert row not visible in tx".to_string())
976 })?
977 .row_id
978 }
979 Ok(UpsertResult::NoOp) => {
980 db.accountant().release(row_bytes);
981 0
982 }
983 Err(err) => {
984 db.accountant().release(row_bytes);
985 return Err(err);
986 }
987 }
988 } else {
989 match db.insert_row(txid, &p.table, values.clone()) {
990 Ok(row_id) => row_id,
991 Err(err) => {
992 db.accountant().release(row_bytes);
993 return Err(err);
994 }
995 }
996 };
997
998 if should_route_insert_to_graph(db, &p.table)
999 && let (
1000 Some(Value::Uuid(source)),
1001 Some(Value::Uuid(target)),
1002 Some(Value::Text(edge_type)),
1003 ) = (
1004 values.get("source_id"),
1005 values.get("target_id"),
1006 values.get("edge_type"),
1007 )
1008 {
1009 match db.insert_edge(txid, *source, *target, edge_type.clone(), HashMap::new()) {
1010 Ok(true) => {}
1011 Ok(false) => {
1012 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1013 db.accountant().release(row_bytes);
1014 continue;
1015 }
1016 Err(err) => {
1017 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1018 db.accountant().release(row_bytes);
1019 return Err(err);
1020 }
1021 }
1022 }
1023
1024 if let Some(v) = vector_value_for_table(db, &p.table, &values)
1025 && row_id != 0
1026 && let Err(err) = db.insert_vector(txid, row_id, v.clone())
1027 {
1028 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1029 db.accountant().release(row_bytes);
1030 return Err(err);
1031 }
1032
1033 rows_affected += 1;
1034 }
1035
1036 Ok(QueryResult::empty_with_affected(rows_affected))
1037}
1038
1039fn exec_delete(
1040 db: &Database,
1041 p: &DeletePlan,
1042 params: &HashMap<String, Value>,
1043 tx: Option<TxId>,
1044) -> Result<QueryResult> {
1045 let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1046 let snapshot = db.snapshot();
1047 let rows = db.scan(&p.table, snapshot)?;
1048 let resolved_where = p
1049 .where_clause
1050 .as_ref()
1051 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1052 .transpose()?;
1053 let matched: Vec<_> = rows
1054 .into_iter()
1055 .filter(|r| {
1056 resolved_where
1057 .as_ref()
1058 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1059 })
1060 .collect();
1061
1062 for row in &matched {
1063 if db.has_live_vector(row.row_id, snapshot) {
1064 db.delete_vector(txid, row.row_id)?;
1065 }
1066 db.delete_row(txid, &p.table, row.row_id)?;
1067 }
1068
1069 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1070}
1071
/// Applies an `UPDATE` plan: every row of `p.table` that matches the plan's
/// `WHERE` clause is replaced by a newly inserted row carrying the merged values.
///
/// For each match the old row (and its vector, when one is live in the
/// snapshot) is deleted under `tx` and the updated values are inserted. The
/// result reports the number of matched rows.
///
/// # Errors
/// Returns an error when the disk budget check fails, when `tx` is `None`,
/// when `IN (subquery)` resolution fails, or when any per-row step (assignment
/// evaluation, coercion, validation, memory reservation, storage writes, state
/// propagation) fails. On a per-row failure after the old row was deleted, the
/// write set is restored to the checkpoint taken for that row.
fn exec_update(
    db: &Database,
    p: &UpdatePlan,
    params: &HashMap<String, Value>,
    tx: Option<TxId>,
) -> Result<QueryResult> {
    db.check_disk_budget("UPDATE")?;
    let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
    let snapshot = db.snapshot();
    let rows = db.scan(&p.table, snapshot)?;
    // Subqueries are turned into literal IN lists once, before any row is tested.
    let resolved_where = p
        .where_clause
        .as_ref()
        .map(|expr| resolve_in_subqueries(db, expr, params, tx))
        .transpose()?;
    // No WHERE clause matches every row; a predicate that errors for a row is
    // treated as "no match" for that row.
    let matched: Vec<_> = rows
        .into_iter()
        .filter(|r| {
            resolved_where
                .as_ref()
                .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
        })
        .collect();

    for row in &matched {
        let mut values = row.values.clone();
        for (k, vexpr) in &p.assignments {
            // Assignments read the pre-update values (`row.values`), so they do
            // not observe each other's results.
            let value = eval_assignment_expr(vexpr, &row.values, params)?;
            values.insert(k.clone(), coerce_value_for_column(db, &p.table, k, value)?);
        }
        validate_update_state_transition(db, &p.table, row, &values)?;
        // Captured before `values` is moved into `insert_row` below.
        let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
        let new_state = db
            .table_meta(&p.table)
            .as_ref()
            .and_then(|meta| meta.state_machine.as_ref())
            .and_then(|sm| values.get(&sm.column))
            .and_then(Value::as_text)
            .map(std::borrow::ToOwned::to_owned);

        let old_has_vector = db.has_live_vector(row.row_id, snapshot);
        validate_vector_columns(db, &p.table, &values)?;
        let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
        let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
        // Reserve memory for the replacement row before touching the write set;
        // every failure path below releases this reservation again.
        db.accountant().try_allocate_for(
            new_row_bytes,
            "update",
            "row_replace",
            "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
        )?;
        let checkpoint = db.write_set_checkpoint(txid)?;

        // A failed row delete only releases the reservation; the checkpoint is
        // not restored on this path.
        if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
            db.accountant().release(new_row_bytes);
            return Err(err);
        }
        if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
            db.accountant().release(new_row_bytes);
            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
            return Err(err);
        }

        let new_row_id = match db.insert_row(txid, &p.table, values) {
            Ok(row_id) => row_id,
            Err(err) => {
                db.accountant().release(new_row_bytes);
                let _ = db.restore_write_set_checkpoint(txid, checkpoint);
                return Err(err);
            }
        };
        if let Some(vector) = new_vector
            && let Err(err) = db.insert_vector(txid, new_row_id, vector)
        {
            db.accountant().release(new_row_bytes);
            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
            return Err(err);
        }
        if let Err(err) =
            db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
        {
            db.accountant().release(new_row_bytes);
            let _ = db.restore_write_set_checkpoint(txid, checkpoint);
            return Err(err);
        }
    }

    Ok(QueryResult::empty_with_affected(matched.len() as u64))
}
1160
1161fn estimate_table_row_bytes(
1162 db: &Database,
1163 table: &str,
1164 values: &HashMap<String, Value>,
1165) -> Result<usize> {
1166 let meta = db
1167 .table_meta(table)
1168 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1169 Ok(estimate_row_bytes_for_meta(values, &meta, false))
1170}
1171
1172fn validate_update_state_transition(
1173 db: &Database,
1174 table: &str,
1175 existing: &VersionedRow,
1176 next_values: &HashMap<String, Value>,
1177) -> Result<()> {
1178 let Some(meta) = db.table_meta(table) else {
1179 return Ok(());
1180 };
1181 let Some(state_machine) = meta.state_machine else {
1182 return Ok(());
1183 };
1184
1185 let old_state = existing
1186 .values
1187 .get(&state_machine.column)
1188 .and_then(Value::as_text);
1189 let new_state = next_values
1190 .get(&state_machine.column)
1191 .and_then(Value::as_text);
1192
1193 let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
1194 return Ok(());
1195 };
1196
1197 if old_state == new_state
1198 || db.relational_store().validate_state_transition(
1199 table,
1200 &state_machine.column,
1201 old_state,
1202 new_state,
1203 )
1204 {
1205 return Ok(());
1206 }
1207
1208 Err(Error::InvalidStateTransition(format!(
1209 "{old_state} -> {new_state}"
1210 )))
1211}
1212
1213fn estimate_row_bytes_for_meta(
1214 values: &HashMap<String, Value>,
1215 meta: &TableMeta,
1216 include_vectors: bool,
1217) -> usize {
1218 let mut bytes = 96usize;
1219 for column in &meta.columns {
1220 let Some(value) = values.get(&column.name) else {
1221 continue;
1222 };
1223 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
1224 continue;
1225 }
1226 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
1227 }
1228 bytes
1229}
1230
/// Estimates the working memory of a vector search as
/// `k * 3 * dimension * size_of::<f32>()`, saturating at `usize::MAX`.
fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
    let factors = [3, dimension, std::mem::size_of::<f32>()];
    factors
        .iter()
        .fold(k, |bytes, factor| bytes.saturating_mul(*factor))
}
1236
1237fn estimate_bfs_working_bytes<T>(
1238 frontier: &[T],
1239 steps: &[contextdb_planner::GraphStepPlan],
1240) -> usize {
1241 let max_hops = steps.iter().fold(0usize, |acc, step| {
1242 acc.saturating_add(step.max_depth as usize)
1243 });
1244 frontier
1245 .len()
1246 .saturating_mul(2048)
1247 .saturating_mul(max_hops.max(1))
1248}
1249
1250fn dedupe_graph_frontier(
1251 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
1252 steps: &[contextdb_planner::GraphStepPlan],
1253) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
1254 let mut best =
1255 HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
1256
1257 for (bindings, current_id, depth) in frontier {
1258 let mut key = Vec::with_capacity(steps.len());
1259 for step in steps {
1260 if let Some(id) = bindings.get(&step.target_alias) {
1261 key.push(*id);
1262 }
1263 }
1264
1265 best.entry(key)
1266 .and_modify(|existing| {
1267 if depth < existing.2 {
1268 *existing = (bindings.clone(), current_id, depth);
1269 }
1270 })
1271 .or_insert((bindings, current_id, depth));
1272 }
1273
1274 best.into_values().collect()
1275}
1276
1277fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
1278 let meta = db.table_meta(table);
1279 let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
1280 let snapshot = db.snapshot();
1281 let rows = db.scan(table, snapshot).unwrap_or_default();
1282 let row_bytes = rows.iter().fold(0usize, |acc, row| {
1283 acc.saturating_add(meta.as_ref().map_or_else(
1284 || row.estimated_bytes(),
1285 |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1286 ))
1287 });
1288 let vector_bytes = rows
1289 .iter()
1290 .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
1291 .fold(0usize, |acc, entry| {
1292 acc.saturating_add(entry.estimated_bytes())
1293 });
1294 let edge_bytes = rows.iter().fold(0usize, |acc, row| {
1295 match (
1296 row.values.get("source_id").and_then(Value::as_uuid),
1297 row.values.get("target_id").and_then(Value::as_uuid),
1298 row.values.get("edge_type").and_then(Value::as_text),
1299 ) {
1300 (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
1301 96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
1302 ),
1303 _ => acc,
1304 }
1305 });
1306 metadata_bytes
1307 .saturating_add(row_bytes)
1308 .saturating_add(vector_bytes)
1309 .saturating_add(edge_bytes)
1310}
1311
1312fn materialize_rows(
1313 rows: Vec<VersionedRow>,
1314 filter: Option<&Expr>,
1315 params: &HashMap<String, Value>,
1316 schema_columns: Option<&[String]>,
1317) -> Result<QueryResult> {
1318 let filtered: Vec<VersionedRow> = rows
1319 .into_iter()
1320 .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
1321 .collect();
1322
1323 let keys = if let Some(schema_columns) = schema_columns {
1324 schema_columns.to_vec()
1325 } else {
1326 let mut keys = BTreeSet::new();
1327 for r in &filtered {
1328 for k in r.values.keys() {
1329 keys.insert(k.clone());
1330 }
1331 }
1332 keys.into_iter().collect::<Vec<_>>()
1333 };
1334
1335 let mut columns = vec!["row_id".to_string()];
1336 columns.extend(keys.iter().cloned());
1337
1338 let rows = filtered
1339 .into_iter()
1340 .map(|r| {
1341 let mut out = vec![Value::Int64(r.row_id as i64)];
1342 for k in &keys {
1343 out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
1344 }
1345 out
1346 })
1347 .collect();
1348
1349 Ok(QueryResult {
1350 columns,
1351 rows,
1352 rows_affected: 0,
1353 })
1354}
1355
1356fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
1357 Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
1358}
1359
1360fn eval_expr_value(
1361 row: &VersionedRow,
1362 expr: &Expr,
1363 params: &HashMap<String, Value>,
1364) -> Result<Value> {
1365 match expr {
1366 Expr::Column(c) => {
1367 if c.column == "row_id" {
1368 Ok(Value::Int64(row.row_id as i64))
1369 } else {
1370 Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
1371 }
1372 }
1373 Expr::BinaryOp { left, op, right } => {
1374 let left = eval_expr_value(row, left, params)?;
1375 let right = eval_expr_value(row, right, params)?;
1376 eval_binary_op(op, &left, &right)
1377 }
1378 Expr::UnaryOp { op, operand } => {
1379 let value = eval_expr_value(row, operand, params)?;
1380 match op {
1381 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1382 UnaryOp::Neg => match value {
1383 Value::Int64(v) => Ok(Value::Int64(-v)),
1384 Value::Float64(v) => Ok(Value::Float64(-v)),
1385 _ => Err(Error::PlanError(
1386 "cannot negate non-numeric value".to_string(),
1387 )),
1388 },
1389 }
1390 }
1391 Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
1392 Expr::IsNull { expr, negated } => {
1393 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1394 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1395 }
1396 Expr::InList {
1397 expr,
1398 list,
1399 negated,
1400 } => {
1401 let needle = eval_expr_value(row, expr, params)?;
1402 let matched = list.iter().try_fold(false, |found, item| {
1403 if found {
1404 Ok(true)
1405 } else {
1406 let candidate = eval_expr_value(row, item, params)?;
1407 Ok(
1408 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1409 || (needle != Value::Null
1410 && candidate != Value::Null
1411 && needle == candidate),
1412 )
1413 }
1414 })?;
1415 Ok(Value::Bool(if *negated { !matched } else { matched }))
1416 }
1417 Expr::Like {
1418 expr,
1419 pattern,
1420 negated,
1421 } => {
1422 let matches = match (
1423 eval_expr_value(row, expr, params)?,
1424 eval_expr_value(row, pattern, params)?,
1425 ) {
1426 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1427 _ => false,
1428 };
1429 Ok(Value::Bool(if *negated { !matches } else { matches }))
1430 }
1431 _ => resolve_expr(expr, params),
1432 }
1433}
1434
1435pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
1436 match expr {
1437 Expr::Literal(l) => Ok(match l {
1438 Literal::Null => Value::Null,
1439 Literal::Bool(v) => Value::Bool(*v),
1440 Literal::Integer(v) => Value::Int64(*v),
1441 Literal::Real(v) => Value::Float64(*v),
1442 Literal::Text(v) => Value::Text(v.clone()),
1443 Literal::Vector(v) => Value::Vector(v.clone()),
1444 }),
1445 Expr::Parameter(p) => params
1446 .get(p)
1447 .cloned()
1448 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
1449 Expr::Column(c) => Ok(Value::Text(c.column.clone())),
1450 Expr::UnaryOp { op, operand } => match op {
1451 UnaryOp::Neg => match resolve_expr(operand, params)? {
1452 Value::Int64(v) => Ok(Value::Int64(-v)),
1453 Value::Float64(v) => Ok(Value::Float64(-v)),
1454 _ => Err(Error::PlanError(
1455 "cannot negate non-numeric value".to_string(),
1456 )),
1457 },
1458 UnaryOp::Not => Err(Error::PlanError(
1459 "boolean NOT requires row context".to_string(),
1460 )),
1461 },
1462 Expr::FunctionCall { name, args } => {
1463 let values = args
1464 .iter()
1465 .map(|arg| resolve_expr(arg, params))
1466 .collect::<Result<Vec<_>>>()?;
1467 eval_function(name, &values)
1468 }
1469 Expr::CosineDistance { right, .. } => resolve_expr(right, params),
1470 _ => Err(Error::PlanError("unsupported expression".to_string())),
1471 }
1472}
1473
1474fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1475 match (a, b) {
1476 (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
1477 (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
1478 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1479 (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1480 (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
1481 (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
1482 (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
1483 (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1484 (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
1485 (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
1486 (Value::Uuid(u), Value::Text(t)) => {
1487 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1488 Some(u.cmp(&parsed))
1489 } else {
1490 None
1491 }
1492 }
1493 (Value::Text(t), Value::Uuid(u)) => {
1494 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1495 Some(parsed.cmp(u))
1496 } else {
1497 None
1498 }
1499 }
1500 (Value::Null, _) | (_, Value::Null) => None,
1501 _ => None,
1502 }
1503}
1504
1505fn eval_bool_expr(
1506 row: &VersionedRow,
1507 expr: &Expr,
1508 params: &HashMap<String, Value>,
1509) -> Result<Option<bool>> {
1510 match expr {
1511 Expr::BinaryOp { left, op, right } => match op {
1512 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1513 let left = eval_expr_value(row, left, params)?;
1514 let right = eval_expr_value(row, right, params)?;
1515 if left == Value::Null || right == Value::Null {
1516 return Ok(None);
1517 }
1518
1519 let result = match op {
1520 BinOp::Eq => {
1521 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
1522 }
1523 BinOp::Neq => {
1524 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
1525 }
1526 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
1527 BinOp::Lte => matches!(
1528 compare_values(&left, &right),
1529 Some(Ordering::Less | Ordering::Equal)
1530 ),
1531 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
1532 BinOp::Gte => matches!(
1533 compare_values(&left, &right),
1534 Some(Ordering::Greater | Ordering::Equal)
1535 ),
1536 BinOp::And | BinOp::Or => unreachable!(),
1537 };
1538 Ok(Some(result))
1539 }
1540 BinOp::And => {
1541 let left = eval_bool_expr(row, left, params)?;
1542 if left == Some(false) {
1543 return Ok(Some(false));
1544 }
1545 let right = eval_bool_expr(row, right, params)?;
1546 Ok(match (left, right) {
1547 (Some(true), Some(true)) => Some(true),
1548 (Some(true), other) => other,
1549 (None, Some(false)) => Some(false),
1550 (None, Some(true)) | (None, None) => None,
1551 (Some(false), _) => Some(false),
1552 })
1553 }
1554 BinOp::Or => {
1555 let left = eval_bool_expr(row, left, params)?;
1556 if left == Some(true) {
1557 return Ok(Some(true));
1558 }
1559 let right = eval_bool_expr(row, right, params)?;
1560 Ok(match (left, right) {
1561 (Some(false), Some(false)) => Some(false),
1562 (Some(false), other) => other,
1563 (None, Some(true)) => Some(true),
1564 (None, Some(false)) | (None, None) => None,
1565 (Some(true), _) => Some(true),
1566 })
1567 }
1568 },
1569 Expr::UnaryOp {
1570 op: UnaryOp::Not,
1571 operand,
1572 } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
1573 Expr::InList {
1574 expr,
1575 list,
1576 negated,
1577 } => {
1578 let needle = eval_expr_value(row, expr, params)?;
1579 if needle == Value::Null {
1580 return Ok(None);
1581 }
1582
1583 let matched = list.iter().try_fold(false, |found, item| {
1584 if found {
1585 Ok(true)
1586 } else {
1587 let candidate = eval_expr_value(row, item, params)?;
1588 Ok(
1589 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1590 || (candidate != Value::Null && needle == candidate),
1591 )
1592 }
1593 })?;
1594 Ok(Some(if *negated { !matched } else { matched }))
1595 }
1596 Expr::InSubquery { .. } => Err(Error::PlanError(
1597 "IN (subquery) must be resolved before execution".to_string(),
1598 )),
1599 Expr::Like {
1600 expr,
1601 pattern,
1602 negated,
1603 } => {
1604 let left = eval_expr_value(row, expr, params)?;
1605 let right = eval_expr_value(row, pattern, params)?;
1606 let matched = match (left, right) {
1607 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1608 _ => false,
1609 };
1610 Ok(Some(if *negated { !matched } else { matched }))
1611 }
1612 Expr::IsNull { expr, negated } => {
1613 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1614 Ok(Some(if *negated { !is_null } else { is_null }))
1615 }
1616 Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
1617 Value::Bool(value) => Ok(Some(value)),
1618 Value::Null => Ok(None),
1619 _ => Err(Error::PlanError(format!(
1620 "unsupported WHERE expression: {:?}",
1621 expr
1622 ))),
1623 },
1624 _ => Err(Error::PlanError(format!(
1625 "unsupported WHERE expression: {:?}",
1626 expr
1627 ))),
1628 }
1629}
1630
1631fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
1632 let bool_value = match op {
1633 BinOp::Eq => {
1634 if left == &Value::Null || right == &Value::Null {
1635 false
1636 } else {
1637 compare_values(left, right) == Some(Ordering::Equal) || left == right
1638 }
1639 }
1640 BinOp::Neq => {
1641 if left == &Value::Null || right == &Value::Null {
1642 false
1643 } else {
1644 !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
1645 }
1646 }
1647 BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
1648 BinOp::Lte => matches!(
1649 compare_values(left, right),
1650 Some(Ordering::Less | Ordering::Equal)
1651 ),
1652 BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
1653 BinOp::Gte => matches!(
1654 compare_values(left, right),
1655 Some(Ordering::Greater | Ordering::Equal)
1656 ),
1657 BinOp::And => value_to_bool(left) && value_to_bool(right),
1658 BinOp::Or => value_to_bool(left) || value_to_bool(right),
1659 };
1660 Ok(Value::Bool(bool_value))
1661}
1662
1663fn value_to_bool(value: &Value) -> bool {
1664 matches!(value, Value::Bool(true))
1665}
1666
1667fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
1668 match (left, right) {
1669 (Value::Null, Value::Null) => Ordering::Equal,
1670 (Value::Null, _) => match direction {
1671 SortDirection::Asc => Ordering::Greater,
1672 SortDirection::Desc => Ordering::Less,
1673 SortDirection::CosineDistance => Ordering::Equal,
1674 },
1675 (_, Value::Null) => match direction {
1676 SortDirection::Asc => Ordering::Less,
1677 SortDirection::Desc => Ordering::Greater,
1678 SortDirection::CosineDistance => Ordering::Equal,
1679 },
1680 _ => {
1681 let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
1682 match direction {
1683 SortDirection::Asc => ordering,
1684 SortDirection::Desc => ordering.reverse(),
1685 SortDirection::CosineDistance => ordering,
1686 }
1687 }
1688 }
1689}
1690
1691fn eval_assignment_expr(
1692 expr: &Expr,
1693 row_values: &HashMap<String, Value>,
1694 params: &HashMap<String, Value>,
1695) -> Result<Value> {
1696 match expr {
1697 Expr::Literal(lit) => literal_to_value(lit),
1698 Expr::Parameter(name) => params
1699 .get(name)
1700 .cloned()
1701 .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
1702 Expr::Column(col_ref) => row_values
1703 .get(&col_ref.column)
1704 .cloned()
1705 .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
1706 Expr::BinaryOp { left, op, right } => {
1707 let left = eval_assignment_expr(left, row_values, params)?;
1708 let right = eval_assignment_expr(right, row_values, params)?;
1709 eval_binary_op(op, &left, &right)
1710 }
1711 Expr::UnaryOp { op, operand } => match op {
1712 UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
1713 Value::Int64(value) => Ok(Value::Int64(-value)),
1714 Value::Float64(value) => Ok(Value::Float64(-value)),
1715 _ => Err(Error::Other(format!(
1716 "unsupported expression in UPDATE SET: {:?}",
1717 expr
1718 ))),
1719 },
1720 UnaryOp::Not => Err(Error::Other(format!(
1721 "unsupported expression in UPDATE SET: {:?}",
1722 expr
1723 ))),
1724 },
1725 Expr::FunctionCall { name, args } => {
1726 let evaluated = args
1727 .iter()
1728 .map(|arg| eval_assignment_expr(arg, row_values, params))
1729 .collect::<Result<Vec<_>>>()?;
1730 eval_function(name, &evaluated)
1731 }
1732 _ => Err(Error::Other(format!(
1733 "unsupported expression in UPDATE SET: {:?}",
1734 expr
1735 ))),
1736 }
1737}
1738
1739fn apply_on_conflict_updates(
1740 db: &Database,
1741 table: &str,
1742 mut insert_values: HashMap<String, Value>,
1743 existing_row: &VersionedRow,
1744 on_conflict: &OnConflictPlan,
1745 params: &HashMap<String, Value>,
1746) -> Result<HashMap<String, Value>> {
1747 if on_conflict.update_columns.is_empty() {
1748 return Ok(insert_values);
1749 }
1750
1751 let mut merged = existing_row.values.clone();
1752 for (column, expr) in &on_conflict.update_columns {
1753 let value = eval_assignment_expr(expr, &existing_row.values, params)?;
1754 merged.insert(
1755 column.clone(),
1756 coerce_value_for_column(db, table, column, value)?,
1757 );
1758 }
1759
1760 for (column, value) in insert_values.drain() {
1761 merged.entry(column).or_insert(value);
1762 }
1763
1764 Ok(merged)
1765}
1766
1767fn literal_to_value(lit: &Literal) -> Result<Value> {
1768 Ok(match lit {
1769 Literal::Null => Value::Null,
1770 Literal::Bool(v) => Value::Bool(*v),
1771 Literal::Integer(v) => Value::Int64(*v),
1772 Literal::Real(v) => Value::Float64(*v),
1773 Literal::Text(v) => Value::Text(v.clone()),
1774 Literal::Vector(v) => Value::Vector(v.clone()),
1775 })
1776}
1777
1778fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
1779 let [left, right] = args else {
1780 return Err(Error::PlanError(format!(
1781 "function {} expects 2 arguments",
1782 name
1783 )));
1784 };
1785
1786 match (left, right) {
1787 (Value::Int64(left), Value::Int64(right)) => match name {
1788 "__add" => Ok(Value::Int64(left + right)),
1789 "__sub" => Ok(Value::Int64(left - right)),
1790 "__mul" => Ok(Value::Int64(left * right)),
1791 "__div" => Ok(Value::Int64(left / right)),
1792 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1793 },
1794 (Value::Float64(left), Value::Float64(right)) => match name {
1795 "__add" => Ok(Value::Float64(left + right)),
1796 "__sub" => Ok(Value::Float64(left - right)),
1797 "__mul" => Ok(Value::Float64(left * right)),
1798 "__div" => Ok(Value::Float64(left / right)),
1799 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1800 },
1801 (Value::Int64(left), Value::Float64(right)) => match name {
1802 "__add" => Ok(Value::Float64(*left as f64 + right)),
1803 "__sub" => Ok(Value::Float64(*left as f64 - right)),
1804 "__mul" => Ok(Value::Float64(*left as f64 * right)),
1805 "__div" => Ok(Value::Float64(*left as f64 / right)),
1806 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1807 },
1808 (Value::Float64(left), Value::Int64(right)) => match name {
1809 "__add" => Ok(Value::Float64(left + *right as f64)),
1810 "__sub" => Ok(Value::Float64(left - *right as f64)),
1811 "__mul" => Ok(Value::Float64(left * *right as f64)),
1812 "__div" => Ok(Value::Float64(left / *right as f64)),
1813 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1814 },
1815 _ => Err(Error::PlanError(format!(
1816 "function {} expects numeric arguments",
1817 name
1818 ))),
1819 }
1820}
1821
1822fn eval_function_in_row_context(
1823 row: &VersionedRow,
1824 name: &str,
1825 args: &[Expr],
1826 params: &HashMap<String, Value>,
1827) -> Result<Value> {
1828 let values = args
1829 .iter()
1830 .map(|arg| eval_expr_value(row, arg, params))
1831 .collect::<Result<Vec<_>>>()?;
1832 eval_function(name, &values)
1833}
1834
1835fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
1836 match name.to_ascii_lowercase().as_str() {
1837 "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
1838 "coalesce" => Ok(args
1839 .iter()
1840 .find(|value| **value != Value::Null)
1841 .cloned()
1842 .unwrap_or(Value::Null)),
1843 "now" => Ok(Value::Timestamp(
1844 SystemTime::now()
1845 .duration_since(UNIX_EPOCH)
1846 .map_err(|err| Error::PlanError(err.to_string()))?
1847 .as_secs() as i64,
1848 )),
1849 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1850 }
1851}
1852
/// Tests `value` against a SQL `LIKE` pattern.
///
/// `%` matches any run of characters (including none) and `_` matches exactly
/// one character; every other pattern character must match literally. Matching
/// is case-sensitive, operates on Unicode scalar values, and has no escape
/// syntax.
///
/// A `%` in the pattern is always a wildcard, even when the value contains a
/// literal `%` at the same position, so `"100% sure"` matches `"100%"`.
fn like_matches(value: &str, pattern: &str) -> bool {
    let value_chars: Vec<char> = value.chars().collect();
    let pattern_chars: Vec<char> = pattern.chars().collect();
    let (mut vi, mut pi) = (0usize, 0usize);
    // Most recent `%`: (its pattern index, the value index it currently extends to).
    let mut backtrack: Option<(usize, usize)> = None;

    while vi < value_chars.len() {
        match pattern_chars.get(pi) {
            // The wildcard is checked first so that it is never mistaken for a
            // literal match against a `%` character in the value.
            Some('%') => {
                backtrack = Some((pi, vi));
                pi += 1;
            }
            Some(&expected) if expected == '_' || expected == value_chars[vi] => {
                vi += 1;
                pi += 1;
            }
            _ => {
                // Mismatch: let the last `%` absorb one more character and retry.
                let Some((star, resume)) = backtrack.as_mut() else {
                    return false;
                };
                *resume += 1;
                vi = *resume;
                pi = *star + 1;
            }
        }
    }

    // The value is exhausted; only trailing `%` wildcards may remain.
    pattern_chars[pi..].iter().all(|&ch| ch == '%')
}
1884
1885fn resolve_in_subqueries(
1886 db: &Database,
1887 expr: &Expr,
1888 params: &HashMap<String, Value>,
1889 tx: Option<TxId>,
1890) -> Result<Expr> {
1891 resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
1892}
1893
/// Rewrites `expr` so that every `IN (subquery)` becomes an `IN` list of literals.
///
/// Each subquery is planned and executed immediately (with `ctes` attached to
/// the planned statement), its first selected column is evaluated for every
/// result row, and the values are converted to literals. All other expression
/// kinds are rebuilt with their children resolved recursively; leaf
/// expressions are cloned unchanged.
///
/// # Errors
/// Returns `Error::Other` for a subquery whose `WHERE` clause references a
/// table that is neither in its own `FROM` list nor a CTE name (correlated
/// subquery), `Error::PlanError` when the subquery selects no columns, and
/// propagates planning, execution, projection and literal-conversion errors.
pub(crate) fn resolve_in_subqueries_with_ctes(
    db: &Database,
    expr: &Expr,
    params: &HashMap<String, Value>,
    tx: Option<TxId>,
    ctes: &[Cte],
) -> Result<Expr> {
    match expr {
        Expr::InSubquery {
            expr,
            subquery,
            negated,
        } => {
            // Names the subquery may legitimately qualify columns with: its own
            // FROM tables plus every CTE in scope.
            let mut subquery_tables: std::collections::HashSet<String> = subquery
                .from
                .iter()
                .filter_map(|item| match item {
                    contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
                    _ => None,
                })
                .collect();
            for cte in ctes {
                match cte {
                    Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
                        subquery_tables.insert(name.clone());
                    }
                }
            }
            // A qualified reference to any other table means the subquery
            // depends on the outer row, which is not supported.
            if let Some(where_clause) = &subquery.where_clause
                && has_outer_table_ref(where_clause, &subquery_tables)
            {
                return Err(Error::Other(
                    "correlated subqueries are not supported".to_string(),
                ));
            }

            let query_plan = plan(&Statement::Select(SelectStatement {
                ctes: ctes.to_vec(),
                body: (**subquery).clone(),
            }))?;
            let result = execute_plan(db, &query_plan, params, tx)?;
            // Only the first selected column feeds the IN list.
            let select_expr = subquery
                .columns
                .first()
                .map(|column| column.expr.clone())
                .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
            let list = result
                .rows
                .iter()
                .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
                .collect::<Result<Vec<_>>>()?
                .into_iter()
                .map(value_to_literal)
                .collect::<Result<Vec<_>>>()?;
            Ok(Expr::InList {
                expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
                list,
                negated: *negated,
            })
        }
        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
            left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
            op: *op,
            right: Box::new(resolve_in_subqueries_with_ctes(
                db, right, params, tx, ctes,
            )?),
        }),
        Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
            op: *op,
            operand: Box::new(resolve_in_subqueries_with_ctes(
                db, operand, params, tx, ctes,
            )?),
        }),
        Expr::InList {
            expr,
            list,
            negated,
        } => Ok(Expr::InList {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            list: list
                .iter()
                .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
                .collect::<Result<Vec<_>>>()?,
            negated: *negated,
        }),
        Expr::Like {
            expr,
            pattern,
            negated,
        } => Ok(Expr::Like {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            pattern: Box::new(resolve_in_subqueries_with_ctes(
                db, pattern, params, tx, ctes,
            )?),
            negated: *negated,
        }),
        Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
            expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
            negated: *negated,
        }),
        Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
            name: name.clone(),
            args: args
                .iter()
                .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
                .collect::<Result<Vec<_>>>()?,
        }),
        // Leaves (and any kinds not listed above) contain nothing to resolve here.
        _ => Ok(expr.clone()),
    }
}
2006
2007fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
2008 match expr {
2009 Expr::Column(ColumnRef {
2010 table: Some(table), ..
2011 }) => !subquery_tables.contains(table),
2012 Expr::BinaryOp { left, right, .. } => {
2013 has_outer_table_ref(left, subquery_tables)
2014 || has_outer_table_ref(right, subquery_tables)
2015 }
2016 Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
2017 Expr::InList { expr, list, .. } => {
2018 has_outer_table_ref(expr, subquery_tables)
2019 || list
2020 .iter()
2021 .any(|item| has_outer_table_ref(item, subquery_tables))
2022 }
2023 Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
2024 Expr::Like { expr, pattern, .. } => {
2025 has_outer_table_ref(expr, subquery_tables)
2026 || has_outer_table_ref(pattern, subquery_tables)
2027 }
2028 Expr::FunctionCall { args, .. } => args
2029 .iter()
2030 .any(|arg| has_outer_table_ref(arg, subquery_tables)),
2031 _ => false,
2032 }
2033}
2034
2035fn value_to_literal(value: Value) -> Result<Expr> {
2036 Ok(Expr::Literal(match value {
2037 Value::Null => Literal::Null,
2038 Value::Bool(v) => Literal::Bool(v),
2039 Value::Int64(v) => Literal::Integer(v),
2040 Value::Float64(v) => Literal::Real(v),
2041 Value::Text(v) => Literal::Text(v),
2042 Value::Uuid(v) => Literal::Text(v.to_string()),
2043 Value::Timestamp(v) => Literal::Integer(v),
2044 other => {
2045 return Err(Error::PlanError(format!(
2046 "unsupported subquery result value: {:?}",
2047 other
2048 )));
2049 }
2050 }))
2051}
2052
2053fn query_result_row_matches(
2054 row: &[Value],
2055 columns: &[String],
2056 expr: &Expr,
2057 params: &HashMap<String, Value>,
2058) -> Result<bool> {
2059 Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
2060}
2061
2062fn eval_query_result_bool_expr(
2063 row: &[Value],
2064 columns: &[String],
2065 expr: &Expr,
2066 params: &HashMap<String, Value>,
2067) -> Result<Option<bool>> {
2068 match expr {
2069 Expr::BinaryOp { left, op, right } => match op {
2070 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
2071 let left = eval_query_result_expr(left, row, columns, params)?;
2072 let right = eval_query_result_expr(right, row, columns, params)?;
2073 if left == Value::Null || right == Value::Null {
2074 return Ok(None);
2075 }
2076
2077 let result = match op {
2078 BinOp::Eq => {
2079 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
2080 }
2081 BinOp::Neq => {
2082 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
2083 }
2084 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
2085 BinOp::Lte => matches!(
2086 compare_values(&left, &right),
2087 Some(Ordering::Less | Ordering::Equal)
2088 ),
2089 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
2090 BinOp::Gte => matches!(
2091 compare_values(&left, &right),
2092 Some(Ordering::Greater | Ordering::Equal)
2093 ),
2094 BinOp::And | BinOp::Or => unreachable!(),
2095 };
2096 Ok(Some(result))
2097 }
2098 BinOp::And => {
2099 let left = eval_query_result_bool_expr(row, columns, left, params)?;
2100 if left == Some(false) {
2101 return Ok(Some(false));
2102 }
2103 let right = eval_query_result_bool_expr(row, columns, right, params)?;
2104 Ok(match (left, right) {
2105 (Some(true), Some(true)) => Some(true),
2106 (Some(true), other) => other,
2107 (None, Some(false)) => Some(false),
2108 (None, Some(true)) | (None, None) => None,
2109 (Some(false), _) => Some(false),
2110 })
2111 }
2112 BinOp::Or => {
2113 let left = eval_query_result_bool_expr(row, columns, left, params)?;
2114 if left == Some(true) {
2115 return Ok(Some(true));
2116 }
2117 let right = eval_query_result_bool_expr(row, columns, right, params)?;
2118 Ok(match (left, right) {
2119 (Some(false), Some(false)) => Some(false),
2120 (Some(false), other) => other,
2121 (None, Some(true)) => Some(true),
2122 (None, Some(false)) | (None, None) => None,
2123 (Some(true), _) => Some(true),
2124 })
2125 }
2126 },
2127 Expr::UnaryOp {
2128 op: UnaryOp::Not,
2129 operand,
2130 } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
2131 Expr::InList {
2132 expr,
2133 list,
2134 negated,
2135 } => {
2136 let needle = eval_query_result_expr(expr, row, columns, params)?;
2137 if needle == Value::Null {
2138 return Ok(None);
2139 }
2140
2141 let matched = list.iter().try_fold(false, |found, item| {
2142 if found {
2143 Ok(true)
2144 } else {
2145 let candidate = eval_query_result_expr(item, row, columns, params)?;
2146 Ok(
2147 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2148 || (candidate != Value::Null && needle == candidate),
2149 )
2150 }
2151 })?;
2152 Ok(Some(if *negated { !matched } else { matched }))
2153 }
2154 Expr::InSubquery { .. } => Err(Error::PlanError(
2155 "IN (subquery) must be resolved before execution".to_string(),
2156 )),
2157 Expr::Like {
2158 expr,
2159 pattern,
2160 negated,
2161 } => {
2162 let left = eval_query_result_expr(expr, row, columns, params)?;
2163 let right = eval_query_result_expr(pattern, row, columns, params)?;
2164 let matched = match (left, right) {
2165 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2166 _ => false,
2167 };
2168 Ok(Some(if *negated { !matched } else { matched }))
2169 }
2170 Expr::IsNull { expr, negated } => {
2171 let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
2172 Ok(Some(if *negated { !is_null } else { is_null }))
2173 }
2174 Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
2175 Value::Bool(value) => Ok(Some(value)),
2176 Value::Null => Ok(None),
2177 _ => Err(Error::PlanError(format!(
2178 "unsupported WHERE expression: {:?}",
2179 expr
2180 ))),
2181 },
2182 _ => Err(Error::PlanError(format!(
2183 "unsupported WHERE expression: {:?}",
2184 expr
2185 ))),
2186 }
2187}
2188
2189fn lookup_query_result_column(
2190 row: &[Value],
2191 input_columns: &[String],
2192 column_ref: &ColumnRef,
2193) -> Result<Value> {
2194 if let Some(table) = &column_ref.table {
2195 let qualified = format!("{table}.{}", column_ref.column);
2196 let idx = input_columns
2199 .iter()
2200 .position(|name| name == &qualified)
2201 .or_else(|| {
2202 input_columns
2203 .iter()
2204 .position(|name| name == &column_ref.column)
2205 })
2206 .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
2207 return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
2208 }
2209
2210 let matches = input_columns
2211 .iter()
2212 .enumerate()
2213 .filter_map(|(idx, name)| {
2214 (name == &column_ref.column
2215 || name.rsplit('.').next() == Some(column_ref.column.as_str()))
2216 .then_some(idx)
2217 })
2218 .collect::<Vec<_>>();
2219
2220 match matches.as_slice() {
2221 [] => Err(Error::PlanError(format!(
2222 "project column not found: {}",
2223 column_ref.column
2224 ))),
2225 [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
2226 _ => Err(Error::PlanError(format!(
2227 "ambiguous column reference: {}",
2228 column_ref.column
2229 ))),
2230 }
2231}
2232
2233fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
2234 let mut combined = Vec::with_capacity(left.len() + right.len());
2235 combined.extend_from_slice(left);
2236 combined.extend_from_slice(right);
2237 combined
2238}
2239
/// Collects the bare column names that occur on both sides.
///
/// The bare name of a column is the text after its last `.` (the whole name
/// when it contains no dot). The result holds every bare name of `right` that
/// is also a bare name of `left`.
fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
    fn bare_name(column: &str) -> &str {
        column.rsplit('.').next().unwrap_or(column)
    }

    let mut left_bare = BTreeSet::new();
    for column in left {
        left_bare.insert(bare_name(column));
    }

    let mut shared = BTreeSet::new();
    for column in right {
        let bare = bare_name(column);
        if left_bare.contains(bare) {
            shared.insert(bare.to_string());
        }
    }
    shared
}
2253
/// Produces the output column names for a join result.
///
/// `columns` is walked positionally. The first `left_columns.len()` entries
/// are left-side columns: with a `left_alias` they become
/// `alias.<bare left name>`, otherwise the left name is kept unchanged. Each
/// remaining entry is a right-side column: when the entry equals the bare name
/// of the matching `right_columns` entry it becomes `right_prefix.<bare>`,
/// otherwise it is kept as is.
///
/// # Panics
///
/// Panics if `columns` has more entries than `left_columns` and
/// `right_columns` together, because the right-side index is then out of
/// bounds.
fn qualify_join_columns(
    columns: &[String],
    left_columns: &[String],
    right_columns: &[String],
    left_alias: &Option<String>,
    right_prefix: &str,
) -> Vec<String> {
    fn bare_name(name: &str) -> &str {
        name.rsplit('.').next().unwrap_or(name)
    }

    let left_len = left_columns.len();
    let mut qualified = Vec::with_capacity(columns.len());
    for (idx, column) in columns.iter().enumerate() {
        let name = if idx < left_len {
            let source = &left_columns[idx];
            match left_alias {
                Some(prefix) => format!("{prefix}.{}", bare_name(source)),
                None => source.clone(),
            }
        } else {
            let bare = bare_name(&right_columns[idx - left_len]);
            if column == bare {
                format!("{right_prefix}.{bare}")
            } else {
                column.clone()
            }
        };
        qualified.push(name);
    }
    qualified
}
2290
2291fn right_table_name(plan: &PhysicalPlan) -> String {
2292 match plan {
2293 PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
2294 _ => "right".to_string(),
2295 }
2296}
2297
2298fn distinct_row_key(row: &[Value]) -> Vec<u8> {
2299 bincode::serde::encode_to_vec(row, bincode::config::standard())
2300 .expect("query rows should serialize for DISTINCT")
2301}
2302
2303fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
2304 match resolve_expr(expr, params)? {
2305 Value::Uuid(u) => Ok(u),
2306 Value::Text(t) => uuid::Uuid::parse_str(&t)
2307 .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
2308 _ => Err(Error::PlanError(
2309 "graph start node must be UUID".to_string(),
2310 )),
2311 }
2312}
2313
2314fn resolve_graph_start_nodes_from_filter(
2317 db: &Database,
2318 filter: &Expr,
2319 params: &HashMap<String, Value>,
2320) -> Result<Vec<uuid::Uuid>> {
2321 if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
2322 return Ok(ids);
2323 }
2324
2325 let (col_name, expected_value) = match filter {
2327 Expr::BinaryOp {
2328 left,
2329 op: BinOp::Eq,
2330 right,
2331 } => {
2332 if let Some(col) = extract_column_name(left) {
2333 (col, resolve_expr(right, params)?)
2334 } else if let Some(col) = extract_column_name(right) {
2335 (col, resolve_expr(left, params)?)
2336 } else {
2337 return Ok(vec![]);
2338 }
2339 }
2340 _ => return Ok(vec![]),
2341 };
2342
2343 let snapshot = db.snapshot();
2344 let mut uuids = Vec::new();
2345 for table_name in db.table_names() {
2346 let meta = match db.table_meta(&table_name) {
2347 Some(m) => m,
2348 None => continue,
2349 };
2350 let has_col = meta.columns.iter().any(|c| c.name == col_name);
2352 let has_id = meta.columns.iter().any(|c| c.name == "id");
2353 if !has_col || !has_id {
2354 continue;
2355 }
2356 let rows = db.scan_filter(&table_name, snapshot, &|row| {
2357 row.values.get(&col_name) == Some(&expected_value)
2358 })?;
2359 for row in rows {
2360 if let Some(Value::Uuid(id)) = row.values.get("id") {
2361 uuids.push(*id);
2362 }
2363 }
2364 }
2365 Ok(uuids)
2366}
2367
2368fn resolve_graph_start_nodes_from_plan(
2369 db: &Database,
2370 plan: &PhysicalPlan,
2371 params: &HashMap<String, Value>,
2372 tx: Option<TxId>,
2373) -> Result<Vec<uuid::Uuid>> {
2374 let result = execute_plan(db, plan, params, tx)?;
2375 result
2376 .rows
2377 .into_iter()
2378 .filter_map(|row| row.into_iter().next())
2379 .map(|value| match value {
2380 Value::Uuid(id) => Ok(id),
2381 Value::Text(text) => uuid::Uuid::parse_str(&text)
2382 .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
2383 other => Err(Error::PlanError(format!(
2384 "invalid graph start identifier from plan: {other:?}"
2385 ))),
2386 })
2387 .collect()
2388}
2389
2390fn resolve_graph_start_ids_from_filter(
2391 filter: &Expr,
2392 params: &HashMap<String, Value>,
2393) -> Result<Option<Vec<uuid::Uuid>>> {
2394 match filter {
2395 Expr::BinaryOp {
2396 left,
2397 op: BinOp::Eq,
2398 right,
2399 } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
2400 let value = if is_graph_id_ref(left) {
2401 resolve_expr(right, params)?
2402 } else {
2403 resolve_expr(left, params)?
2404 };
2405 let id = match value {
2406 Value::Uuid(id) => id,
2407 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2408 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2409 })?,
2410 other => {
2411 return Err(Error::PlanError(format!(
2412 "invalid graph start identifier in filter: {other:?}"
2413 )));
2414 }
2415 };
2416 Ok(Some(vec![id]))
2417 }
2418 Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
2419 let ids = list
2420 .iter()
2421 .map(|item| resolve_expr(item, params))
2422 .map(|value| match value? {
2423 Value::Uuid(id) => Ok(id),
2424 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2425 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2426 }),
2427 other => Err(Error::PlanError(format!(
2428 "invalid graph start identifier in filter: {other:?}"
2429 ))),
2430 })
2431 .collect::<Result<Vec<_>>>()?;
2432 Ok(Some(ids))
2433 }
2434 Expr::BinaryOp { left, right, .. } => {
2435 if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
2436 return Ok(Some(ids));
2437 }
2438 resolve_graph_start_ids_from_filter(right, params)
2439 }
2440 Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
2441 _ => Ok(None),
2442 }
2443}
2444
2445fn is_graph_id_ref(expr: &Expr) -> bool {
2446 matches!(
2447 expr,
2448 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
2449 )
2450}
2451
2452fn extract_column_name(expr: &Expr) -> Option<String> {
2454 match expr {
2455 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
2456 _ => None,
2457 }
2458}
2459
2460fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
2461 match resolve_expr(expr, params)? {
2462 Value::Vector(v) => Ok(v),
2463 Value::Text(name) => match params.get(&name) {
2464 Some(Value::Vector(v)) => Ok(v.clone()),
2465 _ => Err(Error::PlanError("vector parameter missing".to_string())),
2466 },
2467 _ => Err(Error::PlanError(
2468 "invalid vector query expression".to_string(),
2469 )),
2470 }
2471}
2472
2473fn validate_vector_columns(
2474 db: &Database,
2475 table: &str,
2476 values: &HashMap<String, Value>,
2477) -> Result<()> {
2478 let Some(meta) = db.table_meta(table) else {
2479 return Ok(());
2480 };
2481
2482 for column in &meta.columns {
2483 if let contextdb_core::ColumnType::Vector(expected) = column.column_type
2484 && let Some(Value::Vector(vector)) = values.get(&column.name)
2485 {
2486 let got = vector.len();
2487 if got != expected {
2488 return Err(Error::VectorDimensionMismatch { expected, got });
2489 }
2490 }
2491 }
2492
2493 Ok(())
2494}
2495
2496fn vector_value_for_table<'a>(
2497 db: &Database,
2498 table: &str,
2499 values: &'a HashMap<String, Value>,
2500) -> Option<&'a Vec<f32>> {
2501 let meta = db.table_meta(table)?;
2502 meta.columns
2503 .iter()
2504 .find_map(|column| match column.column_type {
2505 contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
2506 Some(Value::Vector(vector)) => Some(vector),
2507 _ => None,
2508 },
2509 _ => None,
2510 })
2511}
2512
2513fn coerce_value_for_column(db: &Database, table: &str, col: &str, v: Value) -> Result<Value> {
2514 let Some(meta) = db.table_meta(table) else {
2515 return Ok(coerce_uuid_if_needed(col, v));
2516 };
2517 let Some(column) = meta.columns.iter().find(|c| c.name == col) else {
2518 return Ok(coerce_uuid_if_needed(col, v));
2519 };
2520
2521 match column.column_type {
2522 contextdb_core::ColumnType::Uuid => coerce_uuid_value(v),
2523 contextdb_core::ColumnType::Timestamp => coerce_timestamp_value(v),
2524 contextdb_core::ColumnType::Vector(dim) => coerce_vector_value(v, dim),
2525 _ => Ok(coerce_uuid_if_needed(col, v)),
2526 }
2527}
2528
2529fn coerce_uuid_value(v: Value) -> Result<Value> {
2530 match v {
2531 Value::Null => Ok(Value::Null),
2532 Value::Uuid(id) => Ok(Value::Uuid(id)),
2533 Value::Text(text) => uuid::Uuid::parse_str(&text)
2534 .map(Value::Uuid)
2535 .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
2536 other => Err(Error::Other(format!(
2537 "UUID column requires UUID or text literal, got {other:?}"
2538 ))),
2539 }
2540}
2541
2542fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
2543 if (col == "id" || col.ends_with("_id"))
2544 && let Value::Text(s) = &v
2545 && let Ok(u) = uuid::Uuid::parse_str(s)
2546 {
2547 return Value::Uuid(u);
2548 }
2549 v
2550}
2551
2552fn coerce_timestamp_value(v: Value) -> Result<Value> {
2553 match v {
2554 Value::Null => Ok(Value::Null),
2555 Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
2556 Ok(Value::Timestamp(i64::MAX))
2557 }
2558 Value::Text(text) => {
2559 let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
2560 Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
2561 })?;
2562 Ok(Value::Timestamp(
2563 parsed.unix_timestamp_nanos() as i64 / 1_000_000,
2564 ))
2565 }
2566 other => Ok(other),
2567 }
2568}
2569
2570fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
2571 let vector = match v {
2572 Value::Null => return Ok(Value::Null),
2573 Value::Vector(vector) => vector,
2574 Value::Text(text) => parse_text_vector_literal(&text)?,
2575 other => return Ok(other),
2576 };
2577
2578 if vector.len() != expected_dim {
2579 return Err(Error::VectorDimensionMismatch {
2580 expected: expected_dim,
2581 got: vector.len(),
2582 });
2583 }
2584
2585 Ok(Value::Vector(vector))
2586}
2587
2588fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
2589 let trimmed = text.trim();
2590 let inner = trimmed
2591 .strip_prefix('[')
2592 .and_then(|s| s.strip_suffix(']'))
2593 .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
2594
2595 if inner.trim().is_empty() {
2596 return Ok(Vec::new());
2597 }
2598
2599 inner
2600 .split(',')
2601 .map(|part| {
2602 part.trim().parse::<f32>().map_err(|err| {
2603 Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
2604 })
2605 })
2606 .collect()
2607}
2608
2609fn apply_missing_column_defaults(
2610 db: &Database,
2611 table: &str,
2612 values: &mut HashMap<String, Value>,
2613) -> Result<()> {
2614 let Some(meta) = db.table_meta(table) else {
2615 return Ok(());
2616 };
2617
2618 for column in &meta.columns {
2619 if values.contains_key(&column.name) {
2620 continue;
2621 }
2622 let Some(default) = &column.default else {
2623 continue;
2624 };
2625 let value = evaluate_stored_default_expr(default)?;
2626 values.insert(
2627 column.name.clone(),
2628 coerce_value_for_column(db, table, &column.name, value)?,
2629 );
2630 }
2631
2632 Ok(())
2633}
2634
2635fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
2636 if default.eq_ignore_ascii_case("NOW()") {
2637 return eval_function("now", &[]);
2638 }
2639 if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
2640 return eval_function("now", &[]);
2641 }
2642 if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
2643 return Ok(Value::Null);
2644 }
2645 if default.eq_ignore_ascii_case("TRUE") {
2646 return Ok(Value::Bool(true));
2647 }
2648 if default.eq_ignore_ascii_case("FALSE") {
2649 return Ok(Value::Bool(false));
2650 }
2651 if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
2652 return Ok(Value::Text(
2653 default[1..default.len() - 1].replace("''", "'"),
2654 ));
2655 }
2656 if let Some(text) = default
2657 .strip_prefix("Literal(Text(\"")
2658 .and_then(|value| value.strip_suffix("\"))"))
2659 {
2660 return Ok(Value::Text(text.to_string()));
2661 }
2662 if let Some(value) = default
2663 .strip_prefix("Literal(Integer(")
2664 .and_then(|value| value.strip_suffix("))"))
2665 {
2666 let parsed = value.parse::<i64>().map_err(|err| {
2667 Error::Other(format!("invalid stored integer default '{value}': {err}"))
2668 })?;
2669 return Ok(Value::Int64(parsed));
2670 }
2671 if let Some(value) = default
2672 .strip_prefix("Literal(Real(")
2673 .and_then(|value| value.strip_suffix("))"))
2674 {
2675 let parsed = value
2676 .parse::<f64>()
2677 .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
2678 return Ok(Value::Float64(parsed));
2679 }
2680 if let Some(value) = default
2681 .strip_prefix("Literal(Bool(")
2682 .and_then(|value| value.strip_suffix("))"))
2683 {
2684 let parsed = value
2685 .parse::<bool>()
2686 .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
2687 return Ok(Value::Bool(parsed));
2688 }
2689
2690 Err(Error::Other(format!(
2691 "unsupported stored DEFAULT expression: {default}"
2692 )))
2693}
2694
2695pub(crate) fn stored_default_expr(expr: &Expr) -> String {
2696 match expr {
2697 Expr::Literal(Literal::Null) => "NULL".to_string(),
2698 Expr::Literal(Literal::Bool(value)) => {
2699 if *value {
2700 "TRUE".to_string()
2701 } else {
2702 "FALSE".to_string()
2703 }
2704 }
2705 Expr::Literal(Literal::Integer(value)) => value.to_string(),
2706 Expr::Literal(Literal::Real(value)) => value.to_string(),
2707 Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
2708 Expr::FunctionCall { name, args }
2709 if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
2710 {
2711 "NOW()".to_string()
2712 }
2713 _ => format!("{expr:?}"),
2714 }
2715}
2716
2717fn should_route_insert_to_graph(db: &Database, table: &str) -> bool {
2718 table.eq_ignore_ascii_case("edges")
2719 || db
2720 .table_meta(table)
2721 .is_some_and(|table_meta| !table_meta.dag_edge_types.is_empty())
2722}
2723
2724fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
2725 if col.expires && !matches!(col.data_type, DataType::Timestamp) {
2726 return Err(Error::Other(
2727 "EXPIRES is only valid on TIMESTAMP columns".to_string(),
2728 ));
2729 }
2730 Ok(())
2731}
2732
2733fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
2734 let mut expires_column = None;
2735 for col in columns {
2736 validate_expires_column(col)?;
2737 if col.expires {
2738 if expires_column.is_some() {
2739 return Err(Error::Other(
2740 "only one EXPIRES column is supported per table".to_string(),
2741 ));
2742 }
2743 expires_column = Some(col.name.clone());
2744 }
2745 }
2746 Ok(expires_column)
2747}
2748
2749pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
2750 match dtype {
2751 DataType::Uuid => contextdb_core::ColumnType::Uuid,
2752 DataType::Text => contextdb_core::ColumnType::Text,
2753 DataType::Integer => contextdb_core::ColumnType::Integer,
2754 DataType::Real => contextdb_core::ColumnType::Real,
2755 DataType::Boolean => contextdb_core::ColumnType::Boolean,
2756 DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
2757 DataType::Json => contextdb_core::ColumnType::Json,
2758 DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
2759 }
2760}
2761
2762fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
2763 match s {
2764 "latest_wins" => Ok(ConflictPolicy::LatestWins),
2765 "server_wins" => Ok(ConflictPolicy::ServerWins),
2766 "edge_wins" => Ok(ConflictPolicy::EdgeWins),
2767 "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
2768 _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
2769 }
2770}
2771
2772fn conflict_policy_to_string(p: ConflictPolicy) -> String {
2773 match p {
2774 ConflictPolicy::LatestWins => "latest_wins".to_string(),
2775 ConflictPolicy::ServerWins => "server_wins".to_string(),
2776 ConflictPolicy::EdgeWins => "edge_wins".to_string(),
2777 ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
2778 }
2779}
2780
2781fn project_graph_frontier_rows(
2782 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2783 start_alias: &str,
2784 steps: &[GraphStepPlan],
2785) -> Result<Vec<Vec<Value>>> {
2786 frontier
2787 .into_iter()
2788 .map(|(bindings, id, depth)| {
2789 let mut row = Vec::with_capacity(steps.len() + 3);
2790 let start_id = bindings.get(start_alias).ok_or_else(|| {
2791 Error::PlanError(format!(
2792 "graph frontier missing required start alias binding '{start_alias}'"
2793 ))
2794 })?;
2795 row.push(Value::Uuid(*start_id));
2796 for step in steps {
2797 let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
2798 Error::PlanError(format!(
2799 "graph frontier missing required target alias binding '{}'",
2800 step.target_alias
2801 ))
2802 })?;
2803 row.push(Value::Uuid(*target_id));
2804 }
2805 row.push(Value::Uuid(id));
2806 row.push(Value::Int64(depth as i64));
2807 Ok(row)
2808 })
2809 .collect()
2810}
2811
#[cfg(test)]
mod tests {
    use super::*;
    use contextdb_planner::GraphStepPlan;
    use uuid::Uuid;

    /// Builds a one-step traversal plan that binds its target to `target_alias`.
    fn single_outgoing_step(target_alias: &str) -> Vec<GraphStepPlan> {
        vec![GraphStepPlan {
            edge_types: vec!["EDGE".to_string()],
            direction: Direction::Outgoing,
            min_depth: 1,
            max_depth: 1,
            target_alias: target_alias.to_string(),
        }]
    }

    /// Frontier projection must fail with a plan error, not panic or emit a
    /// partial row, when a required alias binding is absent.
    #[test]
    fn graph_01_frontier_projection_requires_complete_bindings() {
        let steps = single_outgoing_step("b");

        // No bindings at all: the start alias "a" cannot be resolved.
        let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];
        let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
        assert!(
            matches!(start_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
        );

        // Start alias bound, but the step's target alias "b" is not.
        let mut start_only = HashMap::new();
        start_only.insert("a".to_string(), Uuid::new_v4());
        let missing_target = vec![(start_only, Uuid::new_v4(), 0)];
        let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
        assert!(
            matches!(target_result, Err(Error::PlanError(_))),
            "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
        );
    }
}