1use crate::database::{Database, QueryResult};
2use crate::sync_types::ConflictPolicy;
3use contextdb_core::*;
4use contextdb_parser::ast::{
5 AlterAction, BinOp, ColumnRef, Cte, DataType, Expr, Literal, SelectStatement,
6 SetDiskLimitValue, SetMemoryLimitValue, SortDirection, Statement, UnaryOp,
7};
8use contextdb_planner::{
9 DeletePlan, GraphStepPlan, InsertPlan, OnConflictPlan, PhysicalPlan, UpdatePlan, plan,
10};
11use roaring::RoaringTreemap;
12use std::cmp::Ordering;
13use std::collections::{BTreeSet, HashMap, HashSet};
14use std::time::{SystemTime, UNIX_EPOCH};
15use time::OffsetDateTime;
16use time::format_description::well_known::Rfc3339;
17
18pub(crate) fn execute_plan(
19 db: &Database,
20 plan: &PhysicalPlan,
21 params: &HashMap<String, Value>,
22 tx: Option<TxId>,
23) -> Result<QueryResult> {
24 match plan {
25 PhysicalPlan::CreateTable(p) => {
26 db.check_disk_budget("CREATE TABLE")?;
27 let expires_column = expires_column_name(&p.columns)?;
28 let meta = TableMeta {
29 columns: p
30 .columns
31 .iter()
32 .map(|c| contextdb_core::ColumnDef {
33 name: c.name.clone(),
34 column_type: map_column_type(&c.data_type),
35 nullable: c.nullable,
36 primary_key: c.primary_key,
37 unique: c.unique,
38 default: c.default.as_ref().map(stored_default_expr),
39 expires: c.expires,
40 })
41 .collect(),
42 immutable: p.immutable,
43 state_machine: p.state_machine.as_ref().map(|sm| StateMachineConstraint {
44 column: sm.column.clone(),
45 transitions: sm
46 .transitions
47 .iter()
48 .map(|(from, tos)| (from.clone(), tos.clone()))
49 .collect(),
50 }),
51 dag_edge_types: p.dag_edge_types.clone(),
52 natural_key_column: None,
53 propagation_rules: p.propagation_rules.clone(),
54 default_ttl_seconds: p.retain.as_ref().map(|r| r.duration_seconds),
55 sync_safe: p.retain.as_ref().is_some_and(|r| r.sync_safe),
56 expires_column,
57 };
58 let metadata_bytes = meta.estimated_bytes();
59 db.accountant().try_allocate_for(
60 metadata_bytes,
61 "ddl",
62 "create_table",
63 "Reduce schema size or raise MEMORY_LIMIT before creating more tables.",
64 )?;
65 db.relational_store().create_table(&p.name, meta);
66 if let Some(table_meta) = db.table_meta(&p.name) {
67 db.persist_table_meta(&p.name, &table_meta)?;
68 db.allocate_ddl_lsn(|lsn| db.log_create_table_ddl(&p.name, &table_meta, lsn));
69 }
70 Ok(QueryResult::empty_with_affected(0))
71 }
72 PhysicalPlan::DropTable(name) => {
73 let bytes_to_release = estimate_drop_table_bytes(db, name);
74 db.drop_table_aux_state(name);
75 db.relational_store().drop_table(name);
76 db.remove_persisted_table(name)?;
77 db.allocate_ddl_lsn(|lsn| db.log_drop_table_ddl(name, lsn));
78 db.accountant().release(bytes_to_release);
79 Ok(QueryResult::empty_with_affected(0))
80 }
81 PhysicalPlan::AlterTable(p) => {
82 db.check_disk_budget("ALTER TABLE")?;
83 let store = db.relational_store();
84 match &p.action {
85 AlterAction::AddColumn(col) => {
86 if col.primary_key {
87 return Err(Error::Other(
88 "adding a primary key column via ALTER TABLE is not supported"
89 .to_string(),
90 ));
91 }
92 validate_expires_column(col)?;
93 let core_col = contextdb_core::ColumnDef {
94 name: col.name.clone(),
95 column_type: map_column_type(&col.data_type),
96 nullable: col.nullable,
97 primary_key: col.primary_key,
98 unique: col.unique,
99 default: col.default.as_ref().map(stored_default_expr),
100 expires: col.expires,
101 };
102 store
103 .alter_table_add_column(&p.table, core_col)
104 .map_err(Error::Other)?;
105 if col.expires {
106 let mut meta = store.table_meta.write();
107 let table_meta = meta.get_mut(&p.table).ok_or_else(|| {
108 Error::Other(format!("table '{}' not found", p.table))
109 })?;
110 table_meta.expires_column = Some(col.name.clone());
111 }
112 }
113 AlterAction::DropColumn(name) => {
114 store
115 .alter_table_drop_column(&p.table, name)
116 .map_err(Error::Other)?;
117 let mut meta = store.table_meta.write();
118 if let Some(table_meta) = meta.get_mut(&p.table)
119 && table_meta.expires_column.as_deref() == Some(name.as_str())
120 {
121 table_meta.expires_column = None;
122 }
123 }
124 AlterAction::RenameColumn { from, to } => {
125 store
126 .alter_table_rename_column(&p.table, from, to)
127 .map_err(Error::Other)?;
128 let mut meta = store.table_meta.write();
129 if let Some(table_meta) = meta.get_mut(&p.table)
130 && table_meta.expires_column.as_deref() == Some(from.as_str())
131 {
132 table_meta.expires_column = Some(to.clone());
133 }
134 }
135 AlterAction::SetRetain {
136 duration_seconds,
137 sync_safe,
138 } => {
139 let mut meta = store.table_meta.write();
140 let table_meta = meta
141 .get_mut(&p.table)
142 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
143 if table_meta.immutable {
144 return Err(Error::Other(
145 "IMMUTABLE and RETAIN are mutually exclusive".to_string(),
146 ));
147 }
148 table_meta.default_ttl_seconds = Some(*duration_seconds);
149 table_meta.sync_safe = *sync_safe;
150 }
151 AlterAction::DropRetain => {
152 let mut meta = store.table_meta.write();
153 let table_meta = meta
154 .get_mut(&p.table)
155 .ok_or_else(|| Error::Other(format!("table '{}' not found", p.table)))?;
156 table_meta.default_ttl_seconds = None;
157 table_meta.sync_safe = false;
158 }
159 AlterAction::SetSyncConflictPolicy(policy) => {
160 let cp = parse_conflict_policy(policy)?;
161 db.set_table_conflict_policy(&p.table, cp);
162 }
163 AlterAction::DropSyncConflictPolicy => {
164 db.drop_table_conflict_policy(&p.table);
165 }
166 }
167 if let Some(table_meta) = db.table_meta(&p.table) {
168 db.persist_table_meta(&p.table, &table_meta)?;
169 if !matches!(
170 p.action,
171 AlterAction::AddColumn(_)
172 | AlterAction::SetRetain { .. }
173 | AlterAction::DropRetain
174 | AlterAction::SetSyncConflictPolicy(_)
175 | AlterAction::DropSyncConflictPolicy
176 ) {
177 db.persist_table_rows(&p.table)?;
178 }
179 db.allocate_ddl_lsn(|lsn| db.log_alter_table_ddl(&p.table, &table_meta, lsn));
180 }
181 Ok(QueryResult::empty_with_affected(0))
182 }
183 PhysicalPlan::Insert(p) => exec_insert(db, p, params, tx),
184 PhysicalPlan::Delete(p) => exec_delete(db, p, params, tx),
185 PhysicalPlan::Update(p) => exec_update(db, p, params, tx),
186 PhysicalPlan::Scan { table, filter, .. } => {
187 if table == "dual" {
188 return Ok(QueryResult {
189 columns: vec![],
190 rows: vec![vec![]],
191 rows_affected: 0,
192 });
193 }
194 let snapshot = db.snapshot();
195 let rows = db.scan(table, snapshot)?;
196 let schema_columns = db.table_meta(table).map(|meta| {
197 meta.columns
198 .into_iter()
199 .map(|column| column.name)
200 .collect::<Vec<_>>()
201 });
202 let resolved_filter = filter
203 .as_ref()
204 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
205 .transpose()?;
206 materialize_rows(
207 rows,
208 resolved_filter.as_ref(),
209 params,
210 schema_columns.as_deref(),
211 )
212 }
213 PhysicalPlan::GraphBfs {
214 start_alias,
215 start_expr,
216 start_candidates,
217 steps,
218 filter,
219 } => {
220 let start_uuids = match resolve_uuid(start_expr, params) {
221 Ok(start) => vec![start],
222 Err(Error::PlanError(_))
223 if matches!(
224 start_expr,
225 Expr::Column(contextdb_parser::ast::ColumnRef { table: None, .. })
226 ) =>
227 {
228 if let Some(candidate_plan) = start_candidates {
230 resolve_graph_start_nodes_from_plan(db, candidate_plan, params, tx)?
231 } else if let Some(filter_expr) = filter {
232 let resolved_filter = resolve_in_subqueries(db, filter_expr, params, tx)?;
233 resolve_graph_start_nodes_from_filter(db, &resolved_filter, params)?
234 } else {
235 vec![]
236 }
237 }
238 Err(err) => return Err(err),
239 };
240 if start_uuids.is_empty() {
241 return Ok(QueryResult {
242 columns: vec!["id".to_string(), "depth".to_string()],
243 rows: vec![],
244 rows_affected: 0,
245 });
246 }
247 let snapshot = db.snapshot();
248 let mut frontier = start_uuids
249 .into_iter()
250 .map(|id| (HashMap::from([(start_alias.clone(), id)]), id, 0_u32))
251 .collect::<Vec<_>>();
252 let bfs_bytes = estimate_bfs_working_bytes(&frontier, steps);
253 db.accountant().try_allocate_for(
254 bfs_bytes,
255 "bfs_frontier",
256 "graph_bfs",
257 "Reduce traversal depth/fan-out or raise MEMORY_LIMIT before running BFS.",
258 )?;
259
260 let result = (|| {
261 for step in steps {
262 let edge_types_ref = if step.edge_types.is_empty() {
263 None
264 } else {
265 Some(step.edge_types.as_slice())
266 };
267 let mut next = Vec::new();
268
269 for (bindings, start, base_depth) in &frontier {
270 let res = db.graph().bfs(
271 *start,
272 edge_types_ref,
273 step.direction,
274 step.min_depth,
275 step.max_depth,
276 snapshot,
277 )?;
278 for node in res.nodes {
279 let total_depth = base_depth.saturating_add(node.depth);
280 let mut next_bindings = bindings.clone();
281 next_bindings.insert(step.target_alias.clone(), node.id);
282 next.push((next_bindings, node.id, total_depth));
283 }
284 }
285
286 frontier = dedupe_graph_frontier(next, steps);
287 if frontier.is_empty() {
288 break;
289 }
290 }
291
292 let mut columns =
293 steps
294 .iter()
295 .fold(vec![format!("{start_alias}.id")], |mut cols, step| {
296 cols.push(format!("{}.id", step.target_alias));
297 cols
298 });
299 columns.push("id".to_string());
300 columns.push("depth".to_string());
301
302 Ok(QueryResult {
303 columns,
304 rows: project_graph_frontier_rows(frontier, start_alias, steps)?,
305 rows_affected: 0,
306 })
307 })();
308 db.accountant().release(bfs_bytes);
309
310 result
311 }
312 PhysicalPlan::VectorSearch {
313 table,
314 query_expr,
315 k,
316 candidates,
317 ..
318 }
319 | PhysicalPlan::HnswSearch {
320 table,
321 query_expr,
322 k,
323 candidates,
324 ..
325 } => {
326 let query_vec = resolve_vector_from_expr(query_expr, params)?;
327 let snapshot = db.snapshot();
328 let all_rows = db.scan(table, snapshot)?;
329 let candidate_bitmap = if let Some(cands_plan) = candidates {
330 let qr = execute_plan(db, cands_plan, params, tx)?;
331 let mut bm = RoaringTreemap::new();
332 let row_id_idx = qr.columns.iter().position(|column| {
333 column == "row_id" || column.rsplit('.').next() == Some("row_id")
334 });
335 let id_idx = qr
336 .columns
337 .iter()
338 .position(|column| column == "id" || column.rsplit('.').next() == Some("id"));
339
340 if let Some(idx) = row_id_idx {
341 for row in qr.rows {
342 if let Some(Value::Int64(id)) = row.get(idx) {
343 bm.insert(*id as u64);
344 }
345 }
346 } else if let Some(idx) = id_idx {
347 let uuid_to_row_id: HashMap<uuid::Uuid, u64> = all_rows
348 .iter()
349 .filter_map(|row| match row.values.get("id") {
350 Some(Value::Uuid(uuid)) => Some((*uuid, row.row_id)),
351 _ => None,
352 })
353 .collect();
354 for row in qr.rows {
355 if let Some(Value::Uuid(uuid)) = row.get(idx)
356 && let Some(row_id) = uuid_to_row_id.get(uuid)
357 {
358 bm.insert(*row_id);
359 }
360 }
361 }
362 Some(bm)
363 } else {
364 None
365 };
366
367 let vector_bytes = estimate_vector_search_bytes(query_vec.len(), *k as usize);
368 db.accountant().try_allocate_for(
369 vector_bytes,
370 "vector_search",
371 "search",
372 "Reduce LIMIT/dimensionality or raise MEMORY_LIMIT before vector search.",
373 )?;
374 let res = db.query_vector(
375 &query_vec,
376 *k as usize,
377 candidate_bitmap.as_ref(),
378 db.snapshot(),
379 );
380 db.accountant().release(vector_bytes);
381 let res = res?;
382
383 let schema_columns = db.table_meta(table).map(|meta| {
385 meta.columns
386 .into_iter()
387 .map(|column| column.name)
388 .collect::<Vec<_>>()
389 });
390 let keys = if let Some(ref sc) = schema_columns {
391 sc.clone()
392 } else {
393 let mut ks = BTreeSet::new();
394 for r in &all_rows {
395 for k in r.values.keys() {
396 ks.insert(k.clone());
397 }
398 }
399 ks.into_iter().collect::<Vec<_>>()
400 };
401
402 let row_map: HashMap<u64, &VersionedRow> =
403 all_rows.iter().map(|r| (r.row_id, r)).collect();
404
405 let mut columns = vec!["row_id".to_string()];
406 columns.extend(keys.iter().cloned());
407 columns.push("score".to_string());
408
409 let rows = res
410 .into_iter()
411 .filter_map(|(rid, score)| {
412 row_map.get(&rid).map(|row| {
413 let mut out = vec![Value::Int64(rid as i64)];
414 for k in &keys {
415 out.push(row.values.get(k).cloned().unwrap_or(Value::Null));
416 }
417 out.push(Value::Float64(score as f64));
418 out
419 })
420 })
421 .collect();
422
423 Ok(QueryResult {
424 columns,
425 rows,
426 rows_affected: 0,
427 })
428 }
429 PhysicalPlan::MaterializeCte { input, .. } => execute_plan(db, input, params, tx),
430 PhysicalPlan::Project { input, columns } => {
431 let input_result = execute_plan(db, input, params, tx)?;
432 let has_aggregate = columns.iter().any(|column| {
433 matches!(
434 &column.expr,
435 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
436 )
437 });
438 if has_aggregate {
439 if columns.iter().any(|column| {
440 !matches!(
441 &column.expr,
442 Expr::FunctionCall { name, .. } if name.eq_ignore_ascii_case("count")
443 )
444 }) {
445 return Err(Error::PlanError(
446 "mixed aggregate and non-aggregate columns without GROUP BY".to_string(),
447 ));
448 }
449
450 let output_columns = columns
451 .iter()
452 .map(|column| {
453 column.alias.clone().unwrap_or_else(|| match &column.expr {
454 Expr::FunctionCall { name, .. } => name.clone(),
455 _ => "expr".to_string(),
456 })
457 })
458 .collect::<Vec<_>>();
459
460 let aggregate_row = columns
461 .iter()
462 .map(|column| match &column.expr {
463 Expr::FunctionCall { name: _, args } => {
464 let count = if matches!(
465 args.as_slice(),
466 [Expr::Column(contextdb_parser::ast::ColumnRef { table: None, column })]
467 if column == "*"
468 ) {
469 input_result.rows.len() as i64
470 } else {
471 input_result
472 .rows
473 .iter()
474 .filter_map(|row| {
475 args.first().map(|arg| {
476 eval_query_result_expr(
477 arg,
478 row,
479 &input_result.columns,
480 params,
481 )
482 })
483 })
484 .collect::<Result<Vec<_>>>()?
485 .into_iter()
486 .filter(|value| *value != Value::Null)
487 .count() as i64
488 };
489 Ok(Value::Int64(count))
490 }
491 _ => Err(Error::PlanError(
492 "mixed aggregate and non-aggregate columns without GROUP BY"
493 .to_string(),
494 )),
495 })
496 .collect::<Result<Vec<_>>>()?;
497
498 return Ok(QueryResult {
499 columns: output_columns,
500 rows: vec![aggregate_row],
501 rows_affected: 0,
502 });
503 }
504
505 let output_columns = columns
506 .iter()
507 .map(|c| {
508 c.alias.clone().unwrap_or_else(|| match &c.expr {
509 Expr::Column(col) => col.column.clone(),
510 _ => "expr".to_string(),
511 })
512 })
513 .collect::<Vec<_>>();
514
515 let mut output_rows = Vec::with_capacity(input_result.rows.len());
516 for row in &input_result.rows {
517 let mut projected = Vec::with_capacity(columns.len());
518 for col in columns {
519 projected.push(eval_project_expr(
520 &col.expr,
521 row,
522 &input_result.columns,
523 params,
524 )?);
525 }
526 output_rows.push(projected);
527 }
528
529 Ok(QueryResult {
530 columns: output_columns,
531 rows: output_rows,
532 rows_affected: 0,
533 })
534 }
535 PhysicalPlan::Sort { input, keys } => {
536 let mut input_result = execute_plan(db, input, params, tx)?;
537 input_result.rows.sort_by(|left, right| {
538 for key in keys {
539 let Expr::Column(column_ref) = &key.expr else {
540 return Ordering::Equal;
541 };
542 let left_value =
543 match lookup_query_result_column(left, &input_result.columns, column_ref) {
544 Ok(value) => value,
545 Err(_) => return Ordering::Equal,
546 };
547 let right_value = match lookup_query_result_column(
548 right,
549 &input_result.columns,
550 column_ref,
551 ) {
552 Ok(value) => value,
553 Err(_) => return Ordering::Equal,
554 };
555 let ordering = compare_sort_values(&left_value, &right_value, key.direction);
556 if ordering != Ordering::Equal {
557 return ordering;
558 }
559 }
560 Ordering::Equal
561 });
562 Ok(input_result)
563 }
564 PhysicalPlan::Limit { input, count } => {
565 let mut input_result = execute_plan(db, input, params, tx)?;
566 input_result.rows.truncate(*count as usize);
567 Ok(input_result)
568 }
569 PhysicalPlan::Filter { input, predicate } => {
570 let mut input_result = execute_plan(db, input, params, tx)?;
571 input_result.rows.retain(|row| {
572 query_result_row_matches(row, &input_result.columns, predicate, params)
573 .unwrap_or(false)
574 });
575 Ok(input_result)
576 }
577 PhysicalPlan::Distinct { input } => {
578 let input_result = execute_plan(db, input, params, tx)?;
579 let mut seen = HashSet::<Vec<u8>>::new();
580 let rows = input_result
581 .rows
582 .into_iter()
583 .filter(|row| seen.insert(distinct_row_key(row)))
584 .collect();
585 Ok(QueryResult {
586 columns: input_result.columns,
587 rows,
588 rows_affected: input_result.rows_affected,
589 })
590 }
591 PhysicalPlan::Join {
592 left,
593 right,
594 condition,
595 join_type,
596 left_alias,
597 right_alias,
598 } => {
599 let left_result = execute_plan(db, left, params, tx)?;
600 let right_result = execute_plan(db, right, params, tx)?;
601 let right_duplicate_names =
602 duplicate_column_names(&left_result.columns, &right_result.columns);
603 let right_prefix = right_alias
604 .clone()
605 .unwrap_or_else(|| right_table_name(right));
606 let right_columns = right_result
607 .columns
608 .iter()
609 .map(|column| {
610 if right_duplicate_names.contains(column) {
611 format!("{right_prefix}.{column}")
612 } else {
613 column.clone()
614 }
615 })
616 .collect::<Vec<_>>();
617
618 let mut columns = left_result.columns.clone();
619 columns.extend(right_columns);
620
621 let mut rows = Vec::new();
622 for left_row in &left_result.rows {
623 let mut matched = false;
624 for right_row in &right_result.rows {
625 let combined = concatenate_rows(left_row, right_row);
626 if query_result_row_matches(&combined, &columns, condition, params)? {
627 matched = true;
628 rows.push(combined);
629 }
630 }
631
632 if !matched && matches!(join_type, contextdb_planner::JoinType::Left) {
633 let mut combined = left_row.clone();
634 combined.extend(std::iter::repeat_n(Value::Null, right_result.columns.len()));
635 rows.push(combined);
636 }
637 }
638
639 let output_columns = qualify_join_columns(
640 &columns,
641 &left_result.columns,
642 &right_result.columns,
643 left_alias,
644 &right_prefix,
645 );
646
647 Ok(QueryResult {
648 columns: output_columns,
649 rows,
650 rows_affected: 0,
651 })
652 }
653 PhysicalPlan::CreateIndex(_) => Ok(QueryResult::empty_with_affected(0)),
654 PhysicalPlan::SetMemoryLimit(val) => {
655 let limit = match val {
656 SetMemoryLimitValue::Bytes(bytes) => Some(*bytes),
657 SetMemoryLimitValue::None => None,
658 };
659 db.accountant().set_budget(limit)?;
660 db.persist_memory_limit(limit)?;
661 Ok(QueryResult::empty())
662 }
663 PhysicalPlan::ShowMemoryLimit => {
664 let usage = db.accountant().usage();
665 Ok(QueryResult {
666 columns: vec![
667 "limit".to_string(),
668 "used".to_string(),
669 "available".to_string(),
670 "startup_ceiling".to_string(),
671 ],
672 rows: vec![vec![
673 usage
674 .limit
675 .map(|value| Value::Int64(value as i64))
676 .unwrap_or_else(|| Value::Text("none".to_string())),
677 Value::Int64(usage.used as i64),
678 usage
679 .available
680 .map(|value| Value::Int64(value as i64))
681 .unwrap_or_else(|| Value::Text("none".to_string())),
682 usage
683 .startup_ceiling
684 .map(|value| Value::Int64(value as i64))
685 .unwrap_or_else(|| Value::Text("none".to_string())),
686 ]],
687 rows_affected: 0,
688 })
689 }
690 PhysicalPlan::SetDiskLimit(val) => {
691 let limit = match val {
692 SetDiskLimitValue::Bytes(bytes) => Some(*bytes),
693 SetDiskLimitValue::None => None,
694 };
695 db.set_disk_limit(limit)?;
696 db.persist_disk_limit(limit)?;
697 Ok(QueryResult::empty())
698 }
699 PhysicalPlan::ShowDiskLimit => {
700 let limit = db.disk_limit();
701 let used = db.disk_file_size();
702 let startup_ceiling = db.disk_limit_startup_ceiling();
703 Ok(QueryResult {
704 columns: vec![
705 "limit".to_string(),
706 "used".to_string(),
707 "available".to_string(),
708 "startup_ceiling".to_string(),
709 ],
710 rows: vec![vec![
711 limit
712 .map(|value| Value::Int64(value as i64))
713 .unwrap_or_else(|| Value::Text("none".to_string())),
714 used.map(|value| Value::Int64(value as i64))
715 .unwrap_or(Value::Null),
716 match (limit, used) {
717 (Some(limit), Some(used)) => {
718 Value::Int64(limit.saturating_sub(used) as i64)
719 }
720 _ => Value::Null,
721 },
722 startup_ceiling
723 .map(|value| Value::Int64(value as i64))
724 .unwrap_or_else(|| Value::Text("none".to_string())),
725 ]],
726 rows_affected: 0,
727 })
728 }
729 PhysicalPlan::SetSyncConflictPolicy(policy) => {
730 let cp = parse_conflict_policy(policy)?;
731 db.set_default_conflict_policy(cp);
732 Ok(QueryResult::empty())
733 }
734 PhysicalPlan::ShowSyncConflictPolicy => {
735 let policies = db.conflict_policies();
736 let default_str = conflict_policy_to_string(policies.default);
737 let mut rows = vec![vec![Value::Text(default_str)]];
738 for (table, policy) in &policies.per_table {
739 rows.push(vec![Value::Text(format!(
740 "{}={}",
741 table,
742 conflict_policy_to_string(*policy)
743 ))]);
744 }
745 Ok(QueryResult {
746 columns: vec!["policy".to_string()],
747 rows,
748 rows_affected: 0,
749 })
750 }
751 PhysicalPlan::Pipeline(plans) => {
752 let mut last = QueryResult::empty();
753 for p in plans {
754 last = execute_plan(db, p, params, tx)?;
755 }
756 Ok(last)
757 }
758 _ => Err(Error::PlanError(
759 "unsupported plan node in executor".to_string(),
760 )),
761 }
762}
763
764fn eval_project_expr(
765 expr: &Expr,
766 row: &[Value],
767 input_columns: &[String],
768 params: &HashMap<String, Value>,
769) -> Result<Value> {
770 match expr {
771 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
772 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
773 Expr::Parameter(name) => params
774 .get(name)
775 .cloned()
776 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
777 Expr::BinaryOp { left, op, right } => {
778 let left = eval_query_result_expr(left, row, input_columns, params)?;
779 let right = eval_query_result_expr(right, row, input_columns, params)?;
780 eval_binary_op(op, &left, &right)
781 }
782 Expr::UnaryOp { op, operand } => {
783 let value = eval_query_result_expr(operand, row, input_columns, params)?;
784 match op {
785 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
786 UnaryOp::Neg => match value {
787 Value::Int64(v) => Ok(Value::Int64(-v)),
788 Value::Float64(v) => Ok(Value::Float64(-v)),
789 _ => Err(Error::PlanError(
790 "cannot negate non-numeric value".to_string(),
791 )),
792 },
793 }
794 }
795 Expr::FunctionCall { name, args } => {
796 let values = args
797 .iter()
798 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
799 .collect::<Result<Vec<_>>>()?;
800 eval_function(name, &values)
801 }
802 Expr::IsNull { expr, negated } => {
803 let is_null = eval_query_result_expr(expr, row, input_columns, params)? == Value::Null;
804 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
805 }
806 Expr::InList {
807 expr,
808 list,
809 negated,
810 } => {
811 let needle = eval_query_result_expr(expr, row, input_columns, params)?;
812 let matched = list.iter().try_fold(false, |found, item| {
813 if found {
814 Ok(true)
815 } else {
816 let candidate = eval_query_result_expr(item, row, input_columns, params)?;
817 Ok(
818 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
819 || (needle != Value::Null
820 && candidate != Value::Null
821 && needle == candidate),
822 )
823 }
824 })?;
825 Ok(Value::Bool(if *negated { !matched } else { matched }))
826 }
827 Expr::Like {
828 expr,
829 pattern,
830 negated,
831 } => {
832 let matches = match (
833 eval_query_result_expr(expr, row, input_columns, params)?,
834 eval_query_result_expr(pattern, row, input_columns, params)?,
835 ) {
836 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
837 _ => false,
838 };
839 Ok(Value::Bool(if *negated { !matches } else { matches }))
840 }
841 _ => resolve_expr(expr, params),
842 }
843}
844
845fn eval_query_result_expr(
846 expr: &Expr,
847 row: &[Value],
848 input_columns: &[String],
849 params: &HashMap<String, Value>,
850) -> Result<Value> {
851 match expr {
852 Expr::Column(c) => lookup_query_result_column(row, input_columns, c),
853 Expr::Literal(lit) => resolve_expr(&Expr::Literal(lit.clone()), params),
854 Expr::Parameter(name) => params
855 .get(name)
856 .cloned()
857 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", name))),
858 Expr::FunctionCall { name, args } => {
859 let values = args
860 .iter()
861 .map(|arg| eval_query_result_expr(arg, row, input_columns, params))
862 .collect::<Result<Vec<_>>>()?;
863 eval_function(name, &values)
864 }
865 _ => resolve_expr(expr, params),
866 }
867}
868
869fn exec_insert(
870 db: &Database,
871 p: &InsertPlan,
872 params: &HashMap<String, Value>,
873 tx: Option<TxId>,
874) -> Result<QueryResult> {
875 db.check_disk_budget("INSERT")?;
876 let txid = tx.ok_or_else(|| Error::Other("missing tx for insert".to_string()))?;
877
878 let columns: Vec<String> = if p.columns.is_empty() {
881 let meta = db
882 .table_meta(&p.table)
883 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
884 meta.columns.iter().map(|c| c.name.clone()).collect()
885 } else {
886 p.columns.clone()
887 };
888
889 let mut rows_affected = 0;
890 for row in &p.values {
891 let mut values = HashMap::new();
892 for (idx, expr) in row.iter().enumerate() {
893 let col = columns
894 .get(idx)
895 .ok_or_else(|| Error::PlanError("column/value count mismatch".to_string()))?;
896 let v = resolve_expr(expr, params)?;
897 values.insert(col.clone(), coerce_value_for_column(db, &p.table, col, v)?);
898 }
899
900 apply_missing_column_defaults(db, &p.table, &mut values)?;
901
902 validate_vector_columns(db, &p.table, &values)?;
903 let row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
904 db.accountant().try_allocate_for(
905 row_bytes,
906 "insert",
907 "row_insert",
908 "Reduce row size or raise MEMORY_LIMIT before inserting more data.",
909 )?;
910 let checkpoint = db.write_set_checkpoint(txid)?;
911
912 let row_id = if let Some(on_conflict) = &p.on_conflict {
913 let conflict_col = &on_conflict.columns[0];
914 let conflict_value = values
915 .get(conflict_col)
916 .ok_or_else(|| Error::Other("conflict column not in values".to_string()))?;
917 let existing =
918 db.point_lookup(&p.table, conflict_col, conflict_value, db.snapshot())?;
919 let existing_row_id = existing.as_ref().map(|row| row.row_id);
920 let existing_has_vector = existing
921 .as_ref()
922 .is_some_and(|row| db.has_live_vector(row.row_id, db.snapshot()));
923 let upsert_values = if let Some(existing_row) = existing.as_ref() {
924 apply_on_conflict_updates(
925 db,
926 &p.table,
927 values.clone(),
928 existing_row,
929 on_conflict,
930 params,
931 )?
932 } else {
933 values.clone()
934 };
935
936 match db.upsert_row(txid, &p.table, conflict_col, upsert_values) {
937 Ok(UpsertResult::Inserted) => {
938 db.point_lookup_in_tx(
939 txid,
940 &p.table,
941 conflict_col,
942 conflict_value,
943 db.snapshot(),
944 )?
945 .ok_or_else(|| {
946 Error::Other("inserted upsert row not visible in tx".to_string())
947 })?
948 .row_id
949 }
950 Ok(UpsertResult::Updated) => {
951 if existing_has_vector && let Some(existing_row_id) = existing_row_id {
952 db.delete_vector(txid, existing_row_id)?;
953 }
954 db.point_lookup_in_tx(
955 txid,
956 &p.table,
957 conflict_col,
958 conflict_value,
959 db.snapshot(),
960 )?
961 .ok_or_else(|| {
962 Error::Other("updated upsert row not visible in tx".to_string())
963 })?
964 .row_id
965 }
966 Ok(UpsertResult::NoOp) => {
967 db.accountant().release(row_bytes);
968 0
969 }
970 Err(err) => {
971 db.accountant().release(row_bytes);
972 return Err(err);
973 }
974 }
975 } else {
976 match db.insert_row(txid, &p.table, values.clone()) {
977 Ok(row_id) => row_id,
978 Err(err) => {
979 db.accountant().release(row_bytes);
980 return Err(err);
981 }
982 }
983 };
984
985 if should_route_insert_to_graph(db, &p.table)
986 && let (
987 Some(Value::Uuid(source)),
988 Some(Value::Uuid(target)),
989 Some(Value::Text(edge_type)),
990 ) = (
991 values.get("source_id"),
992 values.get("target_id"),
993 values.get("edge_type"),
994 )
995 {
996 match db.insert_edge(txid, *source, *target, edge_type.clone(), HashMap::new()) {
997 Ok(true) => {}
998 Ok(false) => {
999 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1000 db.accountant().release(row_bytes);
1001 continue;
1002 }
1003 Err(err) => {
1004 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1005 db.accountant().release(row_bytes);
1006 return Err(err);
1007 }
1008 }
1009 }
1010
1011 if let Some(v) = vector_value_for_table(db, &p.table, &values)
1012 && row_id != 0
1013 && let Err(err) = db.insert_vector(txid, row_id, v.clone())
1014 {
1015 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1016 db.accountant().release(row_bytes);
1017 return Err(err);
1018 }
1019
1020 rows_affected += 1;
1021 }
1022
1023 Ok(QueryResult::empty_with_affected(rows_affected))
1024}
1025
1026fn exec_delete(
1027 db: &Database,
1028 p: &DeletePlan,
1029 params: &HashMap<String, Value>,
1030 tx: Option<TxId>,
1031) -> Result<QueryResult> {
1032 let txid = tx.ok_or_else(|| Error::Other("missing tx for delete".to_string()))?;
1033 let snapshot = db.snapshot();
1034 let rows = db.scan(&p.table, snapshot)?;
1035 let resolved_where = p
1036 .where_clause
1037 .as_ref()
1038 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1039 .transpose()?;
1040 let matched: Vec<_> = rows
1041 .into_iter()
1042 .filter(|r| {
1043 resolved_where
1044 .as_ref()
1045 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1046 })
1047 .collect();
1048
1049 for row in &matched {
1050 if db.has_live_vector(row.row_id, snapshot) {
1051 db.delete_vector(txid, row.row_id)?;
1052 }
1053 db.delete_row(txid, &p.table, row.row_id)?;
1054 }
1055
1056 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1057}
1058
1059fn exec_update(
1060 db: &Database,
1061 p: &UpdatePlan,
1062 params: &HashMap<String, Value>,
1063 tx: Option<TxId>,
1064) -> Result<QueryResult> {
1065 db.check_disk_budget("UPDATE")?;
1066 let txid = tx.ok_or_else(|| Error::Other("missing tx for update".to_string()))?;
1067 let snapshot = db.snapshot();
1068 let rows = db.scan(&p.table, snapshot)?;
1069 let resolved_where = p
1070 .where_clause
1071 .as_ref()
1072 .map(|expr| resolve_in_subqueries(db, expr, params, tx))
1073 .transpose()?;
1074 let matched: Vec<_> = rows
1075 .into_iter()
1076 .filter(|r| {
1077 resolved_where
1078 .as_ref()
1079 .is_none_or(|w| row_matches(r, w, params).unwrap_or(false))
1080 })
1081 .collect();
1082
1083 for row in &matched {
1084 let mut values = row.values.clone();
1085 for (k, vexpr) in &p.assignments {
1086 let value = eval_assignment_expr(vexpr, &row.values, params)?;
1087 values.insert(k.clone(), coerce_value_for_column(db, &p.table, k, value)?);
1088 }
1089 validate_update_state_transition(db, &p.table, row, &values)?;
1090 let row_uuid = values.get("id").and_then(Value::as_uuid).copied();
1091 let new_state = db
1092 .table_meta(&p.table)
1093 .as_ref()
1094 .and_then(|meta| meta.state_machine.as_ref())
1095 .and_then(|sm| values.get(&sm.column))
1096 .and_then(Value::as_text)
1097 .map(std::borrow::ToOwned::to_owned);
1098
1099 let old_has_vector = db.has_live_vector(row.row_id, snapshot);
1100 validate_vector_columns(db, &p.table, &values)?;
1101 let new_vector = vector_value_for_table(db, &p.table, &values).cloned();
1102 let new_row_bytes = estimate_table_row_bytes(db, &p.table, &values)?;
1103 db.accountant().try_allocate_for(
1104 new_row_bytes,
1105 "update",
1106 "row_replace",
1107 "Reduce row growth or raise MEMORY_LIMIT before updating this row.",
1108 )?;
1109 let checkpoint = db.write_set_checkpoint(txid)?;
1110
1111 if let Err(err) = db.delete_row(txid, &p.table, row.row_id) {
1112 db.accountant().release(new_row_bytes);
1113 return Err(err);
1114 }
1115 if old_has_vector && let Err(err) = db.delete_vector(txid, row.row_id) {
1116 db.accountant().release(new_row_bytes);
1117 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1118 return Err(err);
1119 }
1120
1121 let new_row_id = match db.insert_row(txid, &p.table, values) {
1122 Ok(row_id) => row_id,
1123 Err(err) => {
1124 db.accountant().release(new_row_bytes);
1125 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1126 return Err(err);
1127 }
1128 };
1129 if let Some(vector) = new_vector
1130 && let Err(err) = db.insert_vector(txid, new_row_id, vector)
1131 {
1132 db.accountant().release(new_row_bytes);
1133 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1134 return Err(err);
1135 }
1136 if let Err(err) =
1137 db.propagate_state_change_if_needed(txid, &p.table, row_uuid, new_state.as_deref())
1138 {
1139 db.accountant().release(new_row_bytes);
1140 let _ = db.restore_write_set_checkpoint(txid, checkpoint);
1141 return Err(err);
1142 }
1143 }
1144
1145 Ok(QueryResult::empty_with_affected(matched.len() as u64))
1146}
1147
1148fn estimate_table_row_bytes(
1149 db: &Database,
1150 table: &str,
1151 values: &HashMap<String, Value>,
1152) -> Result<usize> {
1153 let meta = db
1154 .table_meta(table)
1155 .ok_or_else(|| Error::TableNotFound(table.to_string()))?;
1156 Ok(estimate_row_bytes_for_meta(values, &meta, false))
1157}
1158
1159fn validate_update_state_transition(
1160 db: &Database,
1161 table: &str,
1162 existing: &VersionedRow,
1163 next_values: &HashMap<String, Value>,
1164) -> Result<()> {
1165 let Some(meta) = db.table_meta(table) else {
1166 return Ok(());
1167 };
1168 let Some(state_machine) = meta.state_machine else {
1169 return Ok(());
1170 };
1171
1172 let old_state = existing
1173 .values
1174 .get(&state_machine.column)
1175 .and_then(Value::as_text);
1176 let new_state = next_values
1177 .get(&state_machine.column)
1178 .and_then(Value::as_text);
1179
1180 let (Some(old_state), Some(new_state)) = (old_state, new_state) else {
1181 return Ok(());
1182 };
1183
1184 if old_state == new_state
1185 || db.relational_store().validate_state_transition(
1186 table,
1187 &state_machine.column,
1188 old_state,
1189 new_state,
1190 )
1191 {
1192 return Ok(());
1193 }
1194
1195 Err(Error::InvalidStateTransition(format!(
1196 "{old_state} -> {new_state}"
1197 )))
1198}
1199
1200fn estimate_row_bytes_for_meta(
1201 values: &HashMap<String, Value>,
1202 meta: &TableMeta,
1203 include_vectors: bool,
1204) -> usize {
1205 let mut bytes = 96usize;
1206 for column in &meta.columns {
1207 let Some(value) = values.get(&column.name) else {
1208 continue;
1209 };
1210 if !include_vectors && matches!(column.column_type, ColumnType::Vector(_)) {
1211 continue;
1212 }
1213 bytes = bytes.saturating_add(32 + column.name.len() * 8 + value.estimated_bytes());
1214 }
1215 bytes
1216}
1217
1218fn estimate_vector_search_bytes(dimension: usize, k: usize) -> usize {
1219 k.saturating_mul(3)
1220 .saturating_mul(dimension)
1221 .saturating_mul(std::mem::size_of::<f32>())
1222}
1223
1224fn estimate_bfs_working_bytes<T>(
1225 frontier: &[T],
1226 steps: &[contextdb_planner::GraphStepPlan],
1227) -> usize {
1228 let max_hops = steps.iter().fold(0usize, |acc, step| {
1229 acc.saturating_add(step.max_depth as usize)
1230 });
1231 frontier
1232 .len()
1233 .saturating_mul(2048)
1234 .saturating_mul(max_hops.max(1))
1235}
1236
1237fn dedupe_graph_frontier(
1238 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
1239 steps: &[contextdb_planner::GraphStepPlan],
1240) -> Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)> {
1241 let mut best =
1242 HashMap::<Vec<uuid::Uuid>, (HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>::new();
1243
1244 for (bindings, current_id, depth) in frontier {
1245 let mut key = Vec::with_capacity(steps.len());
1246 for step in steps {
1247 if let Some(id) = bindings.get(&step.target_alias) {
1248 key.push(*id);
1249 }
1250 }
1251
1252 best.entry(key)
1253 .and_modify(|existing| {
1254 if depth < existing.2 {
1255 *existing = (bindings.clone(), current_id, depth);
1256 }
1257 })
1258 .or_insert((bindings, current_id, depth));
1259 }
1260
1261 best.into_values().collect()
1262}
1263
1264fn estimate_drop_table_bytes(db: &Database, table: &str) -> usize {
1265 let meta = db.table_meta(table);
1266 let metadata_bytes = meta.as_ref().map(TableMeta::estimated_bytes).unwrap_or(0);
1267 let snapshot = db.snapshot();
1268 let rows = db.scan(table, snapshot).unwrap_or_default();
1269 let row_bytes = rows.iter().fold(0usize, |acc, row| {
1270 acc.saturating_add(meta.as_ref().map_or_else(
1271 || row.estimated_bytes(),
1272 |meta| estimate_row_bytes_for_meta(&row.values, meta, false),
1273 ))
1274 });
1275 let vector_bytes = rows
1276 .iter()
1277 .filter_map(|row| db.live_vector_entry(row.row_id, snapshot))
1278 .fold(0usize, |acc, entry| {
1279 acc.saturating_add(entry.estimated_bytes())
1280 });
1281 let edge_bytes = rows.iter().fold(0usize, |acc, row| {
1282 match (
1283 row.values.get("source_id").and_then(Value::as_uuid),
1284 row.values.get("target_id").and_then(Value::as_uuid),
1285 row.values.get("edge_type").and_then(Value::as_text),
1286 ) {
1287 (Some(_), Some(_), Some(edge_type)) => acc.saturating_add(
1288 96 + edge_type.len().saturating_mul(16) + estimate_row_value_bytes(&HashMap::new()),
1289 ),
1290 _ => acc,
1291 }
1292 });
1293 metadata_bytes
1294 .saturating_add(row_bytes)
1295 .saturating_add(vector_bytes)
1296 .saturating_add(edge_bytes)
1297}
1298
1299fn materialize_rows(
1300 rows: Vec<VersionedRow>,
1301 filter: Option<&Expr>,
1302 params: &HashMap<String, Value>,
1303 schema_columns: Option<&[String]>,
1304) -> Result<QueryResult> {
1305 let filtered: Vec<VersionedRow> = rows
1306 .into_iter()
1307 .filter(|r| filter.is_none_or(|f| row_matches(r, f, params).unwrap_or(false)))
1308 .collect();
1309
1310 let keys = if let Some(schema_columns) = schema_columns {
1311 schema_columns.to_vec()
1312 } else {
1313 let mut keys = BTreeSet::new();
1314 for r in &filtered {
1315 for k in r.values.keys() {
1316 keys.insert(k.clone());
1317 }
1318 }
1319 keys.into_iter().collect::<Vec<_>>()
1320 };
1321
1322 let mut columns = vec!["row_id".to_string()];
1323 columns.extend(keys.iter().cloned());
1324
1325 let rows = filtered
1326 .into_iter()
1327 .map(|r| {
1328 let mut out = vec![Value::Int64(r.row_id as i64)];
1329 for k in &keys {
1330 out.push(r.values.get(k).cloned().unwrap_or(Value::Null));
1331 }
1332 out
1333 })
1334 .collect();
1335
1336 Ok(QueryResult {
1337 columns,
1338 rows,
1339 rows_affected: 0,
1340 })
1341}
1342
1343fn row_matches(row: &VersionedRow, expr: &Expr, params: &HashMap<String, Value>) -> Result<bool> {
1344 Ok(eval_bool_expr(row, expr, params)?.unwrap_or(false))
1345}
1346
1347fn eval_expr_value(
1348 row: &VersionedRow,
1349 expr: &Expr,
1350 params: &HashMap<String, Value>,
1351) -> Result<Value> {
1352 match expr {
1353 Expr::Column(c) => {
1354 if c.column == "row_id" {
1355 Ok(Value::Int64(row.row_id as i64))
1356 } else {
1357 Ok(row.values.get(&c.column).cloned().unwrap_or(Value::Null))
1358 }
1359 }
1360 Expr::BinaryOp { left, op, right } => {
1361 let left = eval_expr_value(row, left, params)?;
1362 let right = eval_expr_value(row, right, params)?;
1363 eval_binary_op(op, &left, &right)
1364 }
1365 Expr::UnaryOp { op, operand } => {
1366 let value = eval_expr_value(row, operand, params)?;
1367 match op {
1368 UnaryOp::Not => Ok(Value::Bool(!value_to_bool(&value))),
1369 UnaryOp::Neg => match value {
1370 Value::Int64(v) => Ok(Value::Int64(-v)),
1371 Value::Float64(v) => Ok(Value::Float64(-v)),
1372 _ => Err(Error::PlanError(
1373 "cannot negate non-numeric value".to_string(),
1374 )),
1375 },
1376 }
1377 }
1378 Expr::FunctionCall { name, args } => eval_function_in_row_context(row, name, args, params),
1379 Expr::IsNull { expr, negated } => {
1380 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1381 Ok(Value::Bool(if *negated { !is_null } else { is_null }))
1382 }
1383 Expr::InList {
1384 expr,
1385 list,
1386 negated,
1387 } => {
1388 let needle = eval_expr_value(row, expr, params)?;
1389 let matched = list.iter().try_fold(false, |found, item| {
1390 if found {
1391 Ok(true)
1392 } else {
1393 let candidate = eval_expr_value(row, item, params)?;
1394 Ok(
1395 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1396 || (needle != Value::Null
1397 && candidate != Value::Null
1398 && needle == candidate),
1399 )
1400 }
1401 })?;
1402 Ok(Value::Bool(if *negated { !matched } else { matched }))
1403 }
1404 Expr::Like {
1405 expr,
1406 pattern,
1407 negated,
1408 } => {
1409 let matches = match (
1410 eval_expr_value(row, expr, params)?,
1411 eval_expr_value(row, pattern, params)?,
1412 ) {
1413 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1414 _ => false,
1415 };
1416 Ok(Value::Bool(if *negated { !matches } else { matches }))
1417 }
1418 _ => resolve_expr(expr, params),
1419 }
1420}
1421
1422pub fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
1423 match expr {
1424 Expr::Literal(l) => Ok(match l {
1425 Literal::Null => Value::Null,
1426 Literal::Bool(v) => Value::Bool(*v),
1427 Literal::Integer(v) => Value::Int64(*v),
1428 Literal::Real(v) => Value::Float64(*v),
1429 Literal::Text(v) => Value::Text(v.clone()),
1430 Literal::Vector(v) => Value::Vector(v.clone()),
1431 }),
1432 Expr::Parameter(p) => params
1433 .get(p)
1434 .cloned()
1435 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
1436 Expr::Column(c) => Ok(Value::Text(c.column.clone())),
1437 Expr::UnaryOp { op, operand } => match op {
1438 UnaryOp::Neg => match resolve_expr(operand, params)? {
1439 Value::Int64(v) => Ok(Value::Int64(-v)),
1440 Value::Float64(v) => Ok(Value::Float64(-v)),
1441 _ => Err(Error::PlanError(
1442 "cannot negate non-numeric value".to_string(),
1443 )),
1444 },
1445 UnaryOp::Not => Err(Error::PlanError(
1446 "boolean NOT requires row context".to_string(),
1447 )),
1448 },
1449 Expr::FunctionCall { name, args } => {
1450 let values = args
1451 .iter()
1452 .map(|arg| resolve_expr(arg, params))
1453 .collect::<Result<Vec<_>>>()?;
1454 eval_function(name, &values)
1455 }
1456 Expr::CosineDistance { right, .. } => resolve_expr(right, params),
1457 _ => Err(Error::PlanError("unsupported expression".to_string())),
1458 }
1459}
1460
1461fn compare_values(a: &Value, b: &Value) -> Option<Ordering> {
1462 match (a, b) {
1463 (Value::Int64(left), Value::Int64(right)) => Some(left.cmp(right)),
1464 (Value::Float64(left), Value::Float64(right)) => Some(left.total_cmp(right)),
1465 (Value::Text(left), Value::Text(right)) => Some(left.cmp(right)),
1466 (Value::Timestamp(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1467 (Value::Int64(left), Value::Float64(right)) => Some((*left as f64).total_cmp(right)),
1468 (Value::Float64(left), Value::Int64(right)) => Some(left.total_cmp(&(*right as f64))),
1469 (Value::Timestamp(left), Value::Int64(right)) => Some(left.cmp(right)),
1470 (Value::Int64(left), Value::Timestamp(right)) => Some(left.cmp(right)),
1471 (Value::Bool(left), Value::Bool(right)) => Some(left.cmp(right)),
1472 (Value::Uuid(left), Value::Uuid(right)) => Some(left.cmp(right)),
1473 (Value::Uuid(u), Value::Text(t)) => {
1474 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1475 Some(u.cmp(&parsed))
1476 } else {
1477 None
1478 }
1479 }
1480 (Value::Text(t), Value::Uuid(u)) => {
1481 if let Ok(parsed) = t.parse::<uuid::Uuid>() {
1482 Some(parsed.cmp(u))
1483 } else {
1484 None
1485 }
1486 }
1487 (Value::Null, _) | (_, Value::Null) => None,
1488 _ => None,
1489 }
1490}
1491
1492fn eval_bool_expr(
1493 row: &VersionedRow,
1494 expr: &Expr,
1495 params: &HashMap<String, Value>,
1496) -> Result<Option<bool>> {
1497 match expr {
1498 Expr::BinaryOp { left, op, right } => match op {
1499 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1500 let left = eval_expr_value(row, left, params)?;
1501 let right = eval_expr_value(row, right, params)?;
1502 if left == Value::Null || right == Value::Null {
1503 return Ok(None);
1504 }
1505
1506 let result = match op {
1507 BinOp::Eq => {
1508 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
1509 }
1510 BinOp::Neq => {
1511 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
1512 }
1513 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
1514 BinOp::Lte => matches!(
1515 compare_values(&left, &right),
1516 Some(Ordering::Less | Ordering::Equal)
1517 ),
1518 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
1519 BinOp::Gte => matches!(
1520 compare_values(&left, &right),
1521 Some(Ordering::Greater | Ordering::Equal)
1522 ),
1523 BinOp::And | BinOp::Or => unreachable!(),
1524 };
1525 Ok(Some(result))
1526 }
1527 BinOp::And => {
1528 let left = eval_bool_expr(row, left, params)?;
1529 if left == Some(false) {
1530 return Ok(Some(false));
1531 }
1532 let right = eval_bool_expr(row, right, params)?;
1533 Ok(match (left, right) {
1534 (Some(true), Some(true)) => Some(true),
1535 (Some(true), other) => other,
1536 (None, Some(false)) => Some(false),
1537 (None, Some(true)) | (None, None) => None,
1538 (Some(false), _) => Some(false),
1539 })
1540 }
1541 BinOp::Or => {
1542 let left = eval_bool_expr(row, left, params)?;
1543 if left == Some(true) {
1544 return Ok(Some(true));
1545 }
1546 let right = eval_bool_expr(row, right, params)?;
1547 Ok(match (left, right) {
1548 (Some(false), Some(false)) => Some(false),
1549 (Some(false), other) => other,
1550 (None, Some(true)) => Some(true),
1551 (None, Some(false)) | (None, None) => None,
1552 (Some(true), _) => Some(true),
1553 })
1554 }
1555 },
1556 Expr::UnaryOp {
1557 op: UnaryOp::Not,
1558 operand,
1559 } => Ok(eval_bool_expr(row, operand, params)?.map(|value| !value)),
1560 Expr::InList {
1561 expr,
1562 list,
1563 negated,
1564 } => {
1565 let needle = eval_expr_value(row, expr, params)?;
1566 if needle == Value::Null {
1567 return Ok(None);
1568 }
1569
1570 let matched = list.iter().try_fold(false, |found, item| {
1571 if found {
1572 Ok(true)
1573 } else {
1574 let candidate = eval_expr_value(row, item, params)?;
1575 Ok(
1576 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
1577 || (candidate != Value::Null && needle == candidate),
1578 )
1579 }
1580 })?;
1581 Ok(Some(if *negated { !matched } else { matched }))
1582 }
1583 Expr::InSubquery { .. } => Err(Error::PlanError(
1584 "IN (subquery) must be resolved before execution".to_string(),
1585 )),
1586 Expr::Like {
1587 expr,
1588 pattern,
1589 negated,
1590 } => {
1591 let left = eval_expr_value(row, expr, params)?;
1592 let right = eval_expr_value(row, pattern, params)?;
1593 let matched = match (left, right) {
1594 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
1595 _ => false,
1596 };
1597 Ok(Some(if *negated { !matched } else { matched }))
1598 }
1599 Expr::IsNull { expr, negated } => {
1600 let is_null = eval_expr_value(row, expr, params)? == Value::Null;
1601 Ok(Some(if *negated { !is_null } else { is_null }))
1602 }
1603 Expr::FunctionCall { .. } => match eval_expr_value(row, expr, params)? {
1604 Value::Bool(value) => Ok(Some(value)),
1605 Value::Null => Ok(None),
1606 _ => Err(Error::PlanError(format!(
1607 "unsupported WHERE expression: {:?}",
1608 expr
1609 ))),
1610 },
1611 _ => Err(Error::PlanError(format!(
1612 "unsupported WHERE expression: {:?}",
1613 expr
1614 ))),
1615 }
1616}
1617
1618fn eval_binary_op(op: &BinOp, left: &Value, right: &Value) -> Result<Value> {
1619 let bool_value = match op {
1620 BinOp::Eq => {
1621 if left == &Value::Null || right == &Value::Null {
1622 false
1623 } else {
1624 compare_values(left, right) == Some(Ordering::Equal) || left == right
1625 }
1626 }
1627 BinOp::Neq => {
1628 if left == &Value::Null || right == &Value::Null {
1629 false
1630 } else {
1631 !(compare_values(left, right) == Some(Ordering::Equal) || left == right)
1632 }
1633 }
1634 BinOp::Lt => compare_values(left, right) == Some(Ordering::Less),
1635 BinOp::Lte => matches!(
1636 compare_values(left, right),
1637 Some(Ordering::Less | Ordering::Equal)
1638 ),
1639 BinOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
1640 BinOp::Gte => matches!(
1641 compare_values(left, right),
1642 Some(Ordering::Greater | Ordering::Equal)
1643 ),
1644 BinOp::And => value_to_bool(left) && value_to_bool(right),
1645 BinOp::Or => value_to_bool(left) || value_to_bool(right),
1646 };
1647 Ok(Value::Bool(bool_value))
1648}
1649
1650fn value_to_bool(value: &Value) -> bool {
1651 matches!(value, Value::Bool(true))
1652}
1653
1654fn compare_sort_values(left: &Value, right: &Value, direction: SortDirection) -> Ordering {
1655 match (left, right) {
1656 (Value::Null, Value::Null) => Ordering::Equal,
1657 (Value::Null, _) => match direction {
1658 SortDirection::Asc => Ordering::Greater,
1659 SortDirection::Desc => Ordering::Less,
1660 SortDirection::CosineDistance => Ordering::Equal,
1661 },
1662 (_, Value::Null) => match direction {
1663 SortDirection::Asc => Ordering::Less,
1664 SortDirection::Desc => Ordering::Greater,
1665 SortDirection::CosineDistance => Ordering::Equal,
1666 },
1667 _ => {
1668 let ordering = compare_values(left, right).unwrap_or(Ordering::Equal);
1669 match direction {
1670 SortDirection::Asc => ordering,
1671 SortDirection::Desc => ordering.reverse(),
1672 SortDirection::CosineDistance => ordering,
1673 }
1674 }
1675 }
1676}
1677
1678fn eval_assignment_expr(
1679 expr: &Expr,
1680 row_values: &HashMap<String, Value>,
1681 params: &HashMap<String, Value>,
1682) -> Result<Value> {
1683 match expr {
1684 Expr::Literal(lit) => literal_to_value(lit),
1685 Expr::Parameter(name) => params
1686 .get(name)
1687 .cloned()
1688 .ok_or_else(|| Error::Other(format!("unknown parameter: {}", name))),
1689 Expr::Column(col_ref) => row_values
1690 .get(&col_ref.column)
1691 .cloned()
1692 .ok_or_else(|| Error::Other(format!("column not found: {}", col_ref.column))),
1693 Expr::BinaryOp { left, op, right } => {
1694 let left = eval_assignment_expr(left, row_values, params)?;
1695 let right = eval_assignment_expr(right, row_values, params)?;
1696 eval_binary_op(op, &left, &right)
1697 }
1698 Expr::UnaryOp { op, operand } => match op {
1699 UnaryOp::Neg => match eval_assignment_expr(operand, row_values, params)? {
1700 Value::Int64(value) => Ok(Value::Int64(-value)),
1701 Value::Float64(value) => Ok(Value::Float64(-value)),
1702 _ => Err(Error::Other(format!(
1703 "unsupported expression in UPDATE SET: {:?}",
1704 expr
1705 ))),
1706 },
1707 UnaryOp::Not => Err(Error::Other(format!(
1708 "unsupported expression in UPDATE SET: {:?}",
1709 expr
1710 ))),
1711 },
1712 Expr::FunctionCall { name, args } => {
1713 let evaluated = args
1714 .iter()
1715 .map(|arg| eval_assignment_expr(arg, row_values, params))
1716 .collect::<Result<Vec<_>>>()?;
1717 eval_function(name, &evaluated)
1718 }
1719 _ => Err(Error::Other(format!(
1720 "unsupported expression in UPDATE SET: {:?}",
1721 expr
1722 ))),
1723 }
1724}
1725
1726fn apply_on_conflict_updates(
1727 db: &Database,
1728 table: &str,
1729 mut insert_values: HashMap<String, Value>,
1730 existing_row: &VersionedRow,
1731 on_conflict: &OnConflictPlan,
1732 params: &HashMap<String, Value>,
1733) -> Result<HashMap<String, Value>> {
1734 if on_conflict.update_columns.is_empty() {
1735 return Ok(insert_values);
1736 }
1737
1738 let mut merged = existing_row.values.clone();
1739 for (column, expr) in &on_conflict.update_columns {
1740 let value = eval_assignment_expr(expr, &existing_row.values, params)?;
1741 merged.insert(
1742 column.clone(),
1743 coerce_value_for_column(db, table, column, value)?,
1744 );
1745 }
1746
1747 for (column, value) in insert_values.drain() {
1748 merged.entry(column).or_insert(value);
1749 }
1750
1751 Ok(merged)
1752}
1753
1754fn literal_to_value(lit: &Literal) -> Result<Value> {
1755 Ok(match lit {
1756 Literal::Null => Value::Null,
1757 Literal::Bool(v) => Value::Bool(*v),
1758 Literal::Integer(v) => Value::Int64(*v),
1759 Literal::Real(v) => Value::Float64(*v),
1760 Literal::Text(v) => Value::Text(v.clone()),
1761 Literal::Vector(v) => Value::Vector(v.clone()),
1762 })
1763}
1764
1765fn eval_arithmetic(name: &str, args: &[Value]) -> Result<Value> {
1766 let [left, right] = args else {
1767 return Err(Error::PlanError(format!(
1768 "function {} expects 2 arguments",
1769 name
1770 )));
1771 };
1772
1773 match (left, right) {
1774 (Value::Int64(left), Value::Int64(right)) => match name {
1775 "__add" => Ok(Value::Int64(left + right)),
1776 "__sub" => Ok(Value::Int64(left - right)),
1777 "__mul" => Ok(Value::Int64(left * right)),
1778 "__div" => Ok(Value::Int64(left / right)),
1779 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1780 },
1781 (Value::Float64(left), Value::Float64(right)) => match name {
1782 "__add" => Ok(Value::Float64(left + right)),
1783 "__sub" => Ok(Value::Float64(left - right)),
1784 "__mul" => Ok(Value::Float64(left * right)),
1785 "__div" => Ok(Value::Float64(left / right)),
1786 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1787 },
1788 (Value::Int64(left), Value::Float64(right)) => match name {
1789 "__add" => Ok(Value::Float64(*left as f64 + right)),
1790 "__sub" => Ok(Value::Float64(*left as f64 - right)),
1791 "__mul" => Ok(Value::Float64(*left as f64 * right)),
1792 "__div" => Ok(Value::Float64(*left as f64 / right)),
1793 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1794 },
1795 (Value::Float64(left), Value::Int64(right)) => match name {
1796 "__add" => Ok(Value::Float64(left + *right as f64)),
1797 "__sub" => Ok(Value::Float64(left - *right as f64)),
1798 "__mul" => Ok(Value::Float64(left * *right as f64)),
1799 "__div" => Ok(Value::Float64(left / *right as f64)),
1800 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1801 },
1802 _ => Err(Error::PlanError(format!(
1803 "function {} expects numeric arguments",
1804 name
1805 ))),
1806 }
1807}
1808
1809fn eval_function_in_row_context(
1810 row: &VersionedRow,
1811 name: &str,
1812 args: &[Expr],
1813 params: &HashMap<String, Value>,
1814) -> Result<Value> {
1815 let values = args
1816 .iter()
1817 .map(|arg| eval_expr_value(row, arg, params))
1818 .collect::<Result<Vec<_>>>()?;
1819 eval_function(name, &values)
1820}
1821
1822fn eval_function(name: &str, args: &[Value]) -> Result<Value> {
1823 match name.to_ascii_lowercase().as_str() {
1824 "__add" | "__sub" | "__mul" | "__div" => eval_arithmetic(name, args),
1825 "coalesce" => Ok(args
1826 .iter()
1827 .find(|value| **value != Value::Null)
1828 .cloned()
1829 .unwrap_or(Value::Null)),
1830 "now" => Ok(Value::Timestamp(
1831 SystemTime::now()
1832 .duration_since(UNIX_EPOCH)
1833 .map_err(|err| Error::PlanError(err.to_string()))?
1834 .as_secs() as i64,
1835 )),
1836 _ => Err(Error::PlanError(format!("unknown function: {}", name))),
1837 }
1838}
1839
1840fn like_matches(value: &str, pattern: &str) -> bool {
1841 let value_chars = value.chars().collect::<Vec<_>>();
1842 let pattern_chars = pattern.chars().collect::<Vec<_>>();
1843 let (mut vi, mut pi) = (0usize, 0usize);
1844 let (mut star_idx, mut match_idx) = (None, 0usize);
1845
1846 while vi < value_chars.len() {
1847 if pi < pattern_chars.len()
1848 && (pattern_chars[pi] == '_' || pattern_chars[pi] == value_chars[vi])
1849 {
1850 vi += 1;
1851 pi += 1;
1852 } else if pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1853 star_idx = Some(pi);
1854 match_idx = vi;
1855 pi += 1;
1856 } else if let Some(star) = star_idx {
1857 pi = star + 1;
1858 match_idx += 1;
1859 vi = match_idx;
1860 } else {
1861 return false;
1862 }
1863 }
1864
1865 while pi < pattern_chars.len() && pattern_chars[pi] == '%' {
1866 pi += 1;
1867 }
1868
1869 pi == pattern_chars.len()
1870}
1871
1872fn resolve_in_subqueries(
1873 db: &Database,
1874 expr: &Expr,
1875 params: &HashMap<String, Value>,
1876 tx: Option<TxId>,
1877) -> Result<Expr> {
1878 resolve_in_subqueries_with_ctes(db, expr, params, tx, &[])
1879}
1880
1881pub(crate) fn resolve_in_subqueries_with_ctes(
1882 db: &Database,
1883 expr: &Expr,
1884 params: &HashMap<String, Value>,
1885 tx: Option<TxId>,
1886 ctes: &[Cte],
1887) -> Result<Expr> {
1888 match expr {
1889 Expr::InSubquery {
1890 expr,
1891 subquery,
1892 negated,
1893 } => {
1894 let mut subquery_tables: std::collections::HashSet<String> = subquery
1896 .from
1897 .iter()
1898 .filter_map(|item| match item {
1899 contextdb_parser::ast::FromItem::Table { name, .. } => Some(name.clone()),
1900 _ => None,
1901 })
1902 .collect();
1903 for cte in ctes {
1905 match cte {
1906 Cte::SqlCte { name, .. } | Cte::MatchCte { name, .. } => {
1907 subquery_tables.insert(name.clone());
1908 }
1909 }
1910 }
1911 if let Some(where_clause) = &subquery.where_clause
1912 && has_outer_table_ref(where_clause, &subquery_tables)
1913 {
1914 return Err(Error::Other(
1915 "correlated subqueries are not supported".to_string(),
1916 ));
1917 }
1918
1919 let query_plan = plan(&Statement::Select(SelectStatement {
1920 ctes: ctes.to_vec(),
1921 body: (**subquery).clone(),
1922 }))?;
1923 let result = execute_plan(db, &query_plan, params, tx)?;
1924 let select_expr = subquery
1925 .columns
1926 .first()
1927 .map(|column| column.expr.clone())
1928 .ok_or_else(|| Error::PlanError("subquery must select one column".to_string()))?;
1929 let list = result
1930 .rows
1931 .iter()
1932 .map(|row| eval_project_expr(&select_expr, row, &result.columns, params))
1933 .collect::<Result<Vec<_>>>()?
1934 .into_iter()
1935 .map(value_to_literal)
1936 .collect::<Result<Vec<_>>>()?;
1937 Ok(Expr::InList {
1938 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1939 list,
1940 negated: *negated,
1941 })
1942 }
1943 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1944 left: Box::new(resolve_in_subqueries_with_ctes(db, left, params, tx, ctes)?),
1945 op: *op,
1946 right: Box::new(resolve_in_subqueries_with_ctes(
1947 db, right, params, tx, ctes,
1948 )?),
1949 }),
1950 Expr::UnaryOp { op, operand } => Ok(Expr::UnaryOp {
1951 op: *op,
1952 operand: Box::new(resolve_in_subqueries_with_ctes(
1953 db, operand, params, tx, ctes,
1954 )?),
1955 }),
1956 Expr::InList {
1957 expr,
1958 list,
1959 negated,
1960 } => Ok(Expr::InList {
1961 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1962 list: list
1963 .iter()
1964 .map(|item| resolve_in_subqueries_with_ctes(db, item, params, tx, ctes))
1965 .collect::<Result<Vec<_>>>()?,
1966 negated: *negated,
1967 }),
1968 Expr::Like {
1969 expr,
1970 pattern,
1971 negated,
1972 } => Ok(Expr::Like {
1973 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1974 pattern: Box::new(resolve_in_subqueries_with_ctes(
1975 db, pattern, params, tx, ctes,
1976 )?),
1977 negated: *negated,
1978 }),
1979 Expr::IsNull { expr, negated } => Ok(Expr::IsNull {
1980 expr: Box::new(resolve_in_subqueries_with_ctes(db, expr, params, tx, ctes)?),
1981 negated: *negated,
1982 }),
1983 Expr::FunctionCall { name, args } => Ok(Expr::FunctionCall {
1984 name: name.clone(),
1985 args: args
1986 .iter()
1987 .map(|arg| resolve_in_subqueries_with_ctes(db, arg, params, tx, ctes))
1988 .collect::<Result<Vec<_>>>()?,
1989 }),
1990 _ => Ok(expr.clone()),
1991 }
1992}
1993
1994fn has_outer_table_ref(expr: &Expr, subquery_tables: &std::collections::HashSet<String>) -> bool {
1995 match expr {
1996 Expr::Column(ColumnRef {
1997 table: Some(table), ..
1998 }) => !subquery_tables.contains(table),
1999 Expr::BinaryOp { left, right, .. } => {
2000 has_outer_table_ref(left, subquery_tables)
2001 || has_outer_table_ref(right, subquery_tables)
2002 }
2003 Expr::UnaryOp { operand, .. } => has_outer_table_ref(operand, subquery_tables),
2004 Expr::InList { expr, list, .. } => {
2005 has_outer_table_ref(expr, subquery_tables)
2006 || list
2007 .iter()
2008 .any(|item| has_outer_table_ref(item, subquery_tables))
2009 }
2010 Expr::IsNull { expr, .. } => has_outer_table_ref(expr, subquery_tables),
2011 Expr::Like { expr, pattern, .. } => {
2012 has_outer_table_ref(expr, subquery_tables)
2013 || has_outer_table_ref(pattern, subquery_tables)
2014 }
2015 Expr::FunctionCall { args, .. } => args
2016 .iter()
2017 .any(|arg| has_outer_table_ref(arg, subquery_tables)),
2018 _ => false,
2019 }
2020}
2021
2022fn value_to_literal(value: Value) -> Result<Expr> {
2023 Ok(Expr::Literal(match value {
2024 Value::Null => Literal::Null,
2025 Value::Bool(v) => Literal::Bool(v),
2026 Value::Int64(v) => Literal::Integer(v),
2027 Value::Float64(v) => Literal::Real(v),
2028 Value::Text(v) => Literal::Text(v),
2029 Value::Uuid(v) => Literal::Text(v.to_string()),
2030 Value::Timestamp(v) => Literal::Integer(v),
2031 other => {
2032 return Err(Error::PlanError(format!(
2033 "unsupported subquery result value: {:?}",
2034 other
2035 )));
2036 }
2037 }))
2038}
2039
2040fn query_result_row_matches(
2041 row: &[Value],
2042 columns: &[String],
2043 expr: &Expr,
2044 params: &HashMap<String, Value>,
2045) -> Result<bool> {
2046 Ok(eval_query_result_bool_expr(row, columns, expr, params)?.unwrap_or(false))
2047}
2048
2049fn eval_query_result_bool_expr(
2050 row: &[Value],
2051 columns: &[String],
2052 expr: &Expr,
2053 params: &HashMap<String, Value>,
2054) -> Result<Option<bool>> {
2055 match expr {
2056 Expr::BinaryOp { left, op, right } => match op {
2057 BinOp::Eq | BinOp::Neq | BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
2058 let left = eval_query_result_expr(left, row, columns, params)?;
2059 let right = eval_query_result_expr(right, row, columns, params)?;
2060 if left == Value::Null || right == Value::Null {
2061 return Ok(None);
2062 }
2063
2064 let result = match op {
2065 BinOp::Eq => {
2066 compare_values(&left, &right) == Some(Ordering::Equal) || left == right
2067 }
2068 BinOp::Neq => {
2069 !(compare_values(&left, &right) == Some(Ordering::Equal) || left == right)
2070 }
2071 BinOp::Lt => compare_values(&left, &right) == Some(Ordering::Less),
2072 BinOp::Lte => matches!(
2073 compare_values(&left, &right),
2074 Some(Ordering::Less | Ordering::Equal)
2075 ),
2076 BinOp::Gt => compare_values(&left, &right) == Some(Ordering::Greater),
2077 BinOp::Gte => matches!(
2078 compare_values(&left, &right),
2079 Some(Ordering::Greater | Ordering::Equal)
2080 ),
2081 BinOp::And | BinOp::Or => unreachable!(),
2082 };
2083 Ok(Some(result))
2084 }
2085 BinOp::And => {
2086 let left = eval_query_result_bool_expr(row, columns, left, params)?;
2087 if left == Some(false) {
2088 return Ok(Some(false));
2089 }
2090 let right = eval_query_result_bool_expr(row, columns, right, params)?;
2091 Ok(match (left, right) {
2092 (Some(true), Some(true)) => Some(true),
2093 (Some(true), other) => other,
2094 (None, Some(false)) => Some(false),
2095 (None, Some(true)) | (None, None) => None,
2096 (Some(false), _) => Some(false),
2097 })
2098 }
2099 BinOp::Or => {
2100 let left = eval_query_result_bool_expr(row, columns, left, params)?;
2101 if left == Some(true) {
2102 return Ok(Some(true));
2103 }
2104 let right = eval_query_result_bool_expr(row, columns, right, params)?;
2105 Ok(match (left, right) {
2106 (Some(false), Some(false)) => Some(false),
2107 (Some(false), other) => other,
2108 (None, Some(true)) => Some(true),
2109 (None, Some(false)) | (None, None) => None,
2110 (Some(true), _) => Some(true),
2111 })
2112 }
2113 },
2114 Expr::UnaryOp {
2115 op: UnaryOp::Not,
2116 operand,
2117 } => Ok(eval_query_result_bool_expr(row, columns, operand, params)?.map(|value| !value)),
2118 Expr::InList {
2119 expr,
2120 list,
2121 negated,
2122 } => {
2123 let needle = eval_query_result_expr(expr, row, columns, params)?;
2124 if needle == Value::Null {
2125 return Ok(None);
2126 }
2127
2128 let matched = list.iter().try_fold(false, |found, item| {
2129 if found {
2130 Ok(true)
2131 } else {
2132 let candidate = eval_query_result_expr(item, row, columns, params)?;
2133 Ok(
2134 matches!(compare_values(&needle, &candidate), Some(Ordering::Equal))
2135 || (candidate != Value::Null && needle == candidate),
2136 )
2137 }
2138 })?;
2139 Ok(Some(if *negated { !matched } else { matched }))
2140 }
2141 Expr::InSubquery { .. } => Err(Error::PlanError(
2142 "IN (subquery) must be resolved before execution".to_string(),
2143 )),
2144 Expr::Like {
2145 expr,
2146 pattern,
2147 negated,
2148 } => {
2149 let left = eval_query_result_expr(expr, row, columns, params)?;
2150 let right = eval_query_result_expr(pattern, row, columns, params)?;
2151 let matched = match (left, right) {
2152 (Value::Text(value), Value::Text(pattern)) => like_matches(&value, &pattern),
2153 _ => false,
2154 };
2155 Ok(Some(if *negated { !matched } else { matched }))
2156 }
2157 Expr::IsNull { expr, negated } => {
2158 let is_null = eval_query_result_expr(expr, row, columns, params)? == Value::Null;
2159 Ok(Some(if *negated { !is_null } else { is_null }))
2160 }
2161 Expr::FunctionCall { .. } => match eval_query_result_expr(expr, row, columns, params)? {
2162 Value::Bool(value) => Ok(Some(value)),
2163 Value::Null => Ok(None),
2164 _ => Err(Error::PlanError(format!(
2165 "unsupported WHERE expression: {:?}",
2166 expr
2167 ))),
2168 },
2169 _ => Err(Error::PlanError(format!(
2170 "unsupported WHERE expression: {:?}",
2171 expr
2172 ))),
2173 }
2174}
2175
2176fn lookup_query_result_column(
2177 row: &[Value],
2178 input_columns: &[String],
2179 column_ref: &ColumnRef,
2180) -> Result<Value> {
2181 if let Some(table) = &column_ref.table {
2182 let qualified = format!("{table}.{}", column_ref.column);
2183 let idx = input_columns
2186 .iter()
2187 .position(|name| name == &qualified)
2188 .or_else(|| {
2189 input_columns
2190 .iter()
2191 .position(|name| name == &column_ref.column)
2192 })
2193 .ok_or_else(|| Error::PlanError(format!("project column not found: {}", qualified)))?;
2194 return Ok(row.get(idx).cloned().unwrap_or(Value::Null));
2195 }
2196
2197 let matches = input_columns
2198 .iter()
2199 .enumerate()
2200 .filter_map(|(idx, name)| {
2201 (name == &column_ref.column
2202 || name.rsplit('.').next() == Some(column_ref.column.as_str()))
2203 .then_some(idx)
2204 })
2205 .collect::<Vec<_>>();
2206
2207 match matches.as_slice() {
2208 [] => Err(Error::PlanError(format!(
2209 "project column not found: {}",
2210 column_ref.column
2211 ))),
2212 [idx] => Ok(row.get(*idx).cloned().unwrap_or(Value::Null)),
2213 _ => Err(Error::PlanError(format!(
2214 "ambiguous column reference: {}",
2215 column_ref.column
2216 ))),
2217 }
2218}
2219
2220fn concatenate_rows(left: &[Value], right: &[Value]) -> Vec<Value> {
2221 let mut combined = Vec::with_capacity(left.len() + right.len());
2222 combined.extend_from_slice(left);
2223 combined.extend_from_slice(right);
2224 combined
2225}
2226
2227fn duplicate_column_names(left: &[String], right: &[String]) -> BTreeSet<String> {
2228 let left_names = left
2229 .iter()
2230 .map(|column| column.rsplit('.').next().unwrap_or(column.as_str()))
2231 .collect::<BTreeSet<_>>();
2232 right
2233 .iter()
2234 .filter_map(|column| {
2235 let bare = column.rsplit('.').next().unwrap_or(column.as_str());
2236 left_names.contains(bare).then(|| bare.to_string())
2237 })
2238 .collect()
2239}
2240
2241fn qualify_join_columns(
2242 columns: &[String],
2243 left_columns: &[String],
2244 right_columns: &[String],
2245 left_alias: &Option<String>,
2246 right_prefix: &str,
2247) -> Vec<String> {
2248 let left_prefix = left_alias.as_deref();
2249 columns
2250 .iter()
2251 .enumerate()
2252 .map(|(idx, column)| {
2253 if idx < left_columns.len() {
2254 if let Some(prefix) = left_prefix {
2255 format!(
2256 "{prefix}.{}",
2257 left_columns[idx].rsplit('.').next().unwrap_or(column)
2258 )
2259 } else {
2260 left_columns[idx].clone()
2261 }
2262 } else {
2263 let right_idx = idx - left_columns.len();
2264 let bare = right_columns[right_idx]
2265 .rsplit('.')
2266 .next()
2267 .unwrap_or(right_columns[right_idx].as_str());
2268 if column == bare {
2269 format!("{right_prefix}.{bare}")
2270 } else {
2271 column.clone()
2272 }
2273 }
2274 })
2275 .collect()
2276}
2277
2278fn right_table_name(plan: &PhysicalPlan) -> String {
2279 match plan {
2280 PhysicalPlan::Scan { table, alias, .. } => alias.clone().unwrap_or_else(|| table.clone()),
2281 _ => "right".to_string(),
2282 }
2283}
2284
2285fn distinct_row_key(row: &[Value]) -> Vec<u8> {
2286 bincode::serde::encode_to_vec(row, bincode::config::standard())
2287 .expect("query rows should serialize for DISTINCT")
2288}
2289
2290fn resolve_uuid(expr: &Expr, params: &HashMap<String, Value>) -> Result<uuid::Uuid> {
2291 match resolve_expr(expr, params)? {
2292 Value::Uuid(u) => Ok(u),
2293 Value::Text(t) => uuid::Uuid::parse_str(&t)
2294 .map_err(|e| Error::PlanError(format!("invalid uuid '{}': {}", t, e))),
2295 _ => Err(Error::PlanError(
2296 "graph start node must be UUID".to_string(),
2297 )),
2298 }
2299}
2300
2301fn resolve_graph_start_nodes_from_filter(
2304 db: &Database,
2305 filter: &Expr,
2306 params: &HashMap<String, Value>,
2307) -> Result<Vec<uuid::Uuid>> {
2308 if let Some(ids) = resolve_graph_start_ids_from_filter(filter, params)? {
2309 return Ok(ids);
2310 }
2311
2312 let (col_name, expected_value) = match filter {
2314 Expr::BinaryOp {
2315 left,
2316 op: BinOp::Eq,
2317 right,
2318 } => {
2319 if let Some(col) = extract_column_name(left) {
2320 (col, resolve_expr(right, params)?)
2321 } else if let Some(col) = extract_column_name(right) {
2322 (col, resolve_expr(left, params)?)
2323 } else {
2324 return Ok(vec![]);
2325 }
2326 }
2327 _ => return Ok(vec![]),
2328 };
2329
2330 let snapshot = db.snapshot();
2331 let mut uuids = Vec::new();
2332 for table_name in db.table_names() {
2333 let meta = match db.table_meta(&table_name) {
2334 Some(m) => m,
2335 None => continue,
2336 };
2337 let has_col = meta.columns.iter().any(|c| c.name == col_name);
2339 let has_id = meta.columns.iter().any(|c| c.name == "id");
2340 if !has_col || !has_id {
2341 continue;
2342 }
2343 let rows = db.scan_filter(&table_name, snapshot, &|row| {
2344 row.values.get(&col_name) == Some(&expected_value)
2345 })?;
2346 for row in rows {
2347 if let Some(Value::Uuid(id)) = row.values.get("id") {
2348 uuids.push(*id);
2349 }
2350 }
2351 }
2352 Ok(uuids)
2353}
2354
2355fn resolve_graph_start_nodes_from_plan(
2356 db: &Database,
2357 plan: &PhysicalPlan,
2358 params: &HashMap<String, Value>,
2359 tx: Option<TxId>,
2360) -> Result<Vec<uuid::Uuid>> {
2361 let result = execute_plan(db, plan, params, tx)?;
2362 result
2363 .rows
2364 .into_iter()
2365 .filter_map(|row| row.into_iter().next())
2366 .map(|value| match value {
2367 Value::Uuid(id) => Ok(id),
2368 Value::Text(text) => uuid::Uuid::parse_str(&text)
2369 .map_err(|_| Error::PlanError(format!("invalid UUID in graph start plan: {text}"))),
2370 other => Err(Error::PlanError(format!(
2371 "invalid graph start identifier from plan: {other:?}"
2372 ))),
2373 })
2374 .collect()
2375}
2376
2377fn resolve_graph_start_ids_from_filter(
2378 filter: &Expr,
2379 params: &HashMap<String, Value>,
2380) -> Result<Option<Vec<uuid::Uuid>>> {
2381 match filter {
2382 Expr::BinaryOp {
2383 left,
2384 op: BinOp::Eq,
2385 right,
2386 } if is_graph_id_ref(left) || is_graph_id_ref(right) => {
2387 let value = if is_graph_id_ref(left) {
2388 resolve_expr(right, params)?
2389 } else {
2390 resolve_expr(left, params)?
2391 };
2392 let id = match value {
2393 Value::Uuid(id) => id,
2394 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2395 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2396 })?,
2397 other => {
2398 return Err(Error::PlanError(format!(
2399 "invalid graph start identifier in filter: {other:?}"
2400 )));
2401 }
2402 };
2403 Ok(Some(vec![id]))
2404 }
2405 Expr::InList { expr, list, .. } if is_graph_id_ref(expr) => {
2406 let ids = list
2407 .iter()
2408 .map(|item| resolve_expr(item, params))
2409 .map(|value| match value? {
2410 Value::Uuid(id) => Ok(id),
2411 Value::Text(text) => uuid::Uuid::parse_str(&text).map_err(|_| {
2412 Error::PlanError(format!("invalid UUID in graph filter: {text}"))
2413 }),
2414 other => Err(Error::PlanError(format!(
2415 "invalid graph start identifier in filter: {other:?}"
2416 ))),
2417 })
2418 .collect::<Result<Vec<_>>>()?;
2419 Ok(Some(ids))
2420 }
2421 Expr::BinaryOp { left, right, .. } => {
2422 if let Some(ids) = resolve_graph_start_ids_from_filter(left, params)? {
2423 return Ok(Some(ids));
2424 }
2425 resolve_graph_start_ids_from_filter(right, params)
2426 }
2427 Expr::UnaryOp { operand, .. } => resolve_graph_start_ids_from_filter(operand, params),
2428 _ => Ok(None),
2429 }
2430}
2431
2432fn is_graph_id_ref(expr: &Expr) -> bool {
2433 matches!(
2434 expr,
2435 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) if column == "id"
2436 )
2437}
2438
2439fn extract_column_name(expr: &Expr) -> Option<String> {
2441 match expr {
2442 Expr::Column(contextdb_parser::ast::ColumnRef { column, .. }) => Some(column.clone()),
2443 _ => None,
2444 }
2445}
2446
2447fn resolve_vector_from_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<f32>> {
2448 match resolve_expr(expr, params)? {
2449 Value::Vector(v) => Ok(v),
2450 Value::Text(name) => match params.get(&name) {
2451 Some(Value::Vector(v)) => Ok(v.clone()),
2452 _ => Err(Error::PlanError("vector parameter missing".to_string())),
2453 },
2454 _ => Err(Error::PlanError(
2455 "invalid vector query expression".to_string(),
2456 )),
2457 }
2458}
2459
2460fn validate_vector_columns(
2461 db: &Database,
2462 table: &str,
2463 values: &HashMap<String, Value>,
2464) -> Result<()> {
2465 let Some(meta) = db.table_meta(table) else {
2466 return Ok(());
2467 };
2468
2469 for column in &meta.columns {
2470 if let contextdb_core::ColumnType::Vector(expected) = column.column_type
2471 && let Some(Value::Vector(vector)) = values.get(&column.name)
2472 {
2473 let got = vector.len();
2474 if got != expected {
2475 return Err(Error::VectorDimensionMismatch { expected, got });
2476 }
2477 }
2478 }
2479
2480 Ok(())
2481}
2482
2483fn vector_value_for_table<'a>(
2484 db: &Database,
2485 table: &str,
2486 values: &'a HashMap<String, Value>,
2487) -> Option<&'a Vec<f32>> {
2488 let meta = db.table_meta(table)?;
2489 meta.columns
2490 .iter()
2491 .find_map(|column| match column.column_type {
2492 contextdb_core::ColumnType::Vector(_) => match values.get(&column.name) {
2493 Some(Value::Vector(vector)) => Some(vector),
2494 _ => None,
2495 },
2496 _ => None,
2497 })
2498}
2499
2500fn coerce_value_for_column(db: &Database, table: &str, col: &str, v: Value) -> Result<Value> {
2501 let Some(meta) = db.table_meta(table) else {
2502 return Ok(coerce_uuid_if_needed(col, v));
2503 };
2504 let Some(column) = meta.columns.iter().find(|c| c.name == col) else {
2505 return Ok(coerce_uuid_if_needed(col, v));
2506 };
2507
2508 match column.column_type {
2509 contextdb_core::ColumnType::Uuid => coerce_uuid_value(v),
2510 contextdb_core::ColumnType::Timestamp => coerce_timestamp_value(v),
2511 contextdb_core::ColumnType::Vector(dim) => coerce_vector_value(v, dim),
2512 _ => Ok(coerce_uuid_if_needed(col, v)),
2513 }
2514}
2515
2516fn coerce_uuid_value(v: Value) -> Result<Value> {
2517 match v {
2518 Value::Null => Ok(Value::Null),
2519 Value::Uuid(id) => Ok(Value::Uuid(id)),
2520 Value::Text(text) => uuid::Uuid::parse_str(&text)
2521 .map(Value::Uuid)
2522 .map_err(|err| Error::Other(format!("invalid UUID literal '{text}': {err}"))),
2523 other => Err(Error::Other(format!(
2524 "UUID column requires UUID or text literal, got {other:?}"
2525 ))),
2526 }
2527}
2528
2529fn coerce_uuid_if_needed(col: &str, v: Value) -> Value {
2530 if (col == "id" || col.ends_with("_id"))
2531 && let Value::Text(s) = &v
2532 && let Ok(u) = uuid::Uuid::parse_str(s)
2533 {
2534 return Value::Uuid(u);
2535 }
2536 v
2537}
2538
2539fn coerce_timestamp_value(v: Value) -> Result<Value> {
2540 match v {
2541 Value::Null => Ok(Value::Null),
2542 Value::Text(text) if text.eq_ignore_ascii_case("infinity") => {
2543 Ok(Value::Timestamp(i64::MAX))
2544 }
2545 Value::Text(text) => {
2546 let parsed = OffsetDateTime::parse(&text, &Rfc3339).map_err(|err| {
2547 Error::Other(format!("invalid TIMESTAMP literal '{text}': {err}"))
2548 })?;
2549 Ok(Value::Timestamp(
2550 parsed.unix_timestamp_nanos() as i64 / 1_000_000,
2551 ))
2552 }
2553 other => Ok(other),
2554 }
2555}
2556
2557fn coerce_vector_value(v: Value, expected_dim: usize) -> Result<Value> {
2558 let vector = match v {
2559 Value::Null => return Ok(Value::Null),
2560 Value::Vector(vector) => vector,
2561 Value::Text(text) => parse_text_vector_literal(&text)?,
2562 other => return Ok(other),
2563 };
2564
2565 if vector.len() != expected_dim {
2566 return Err(Error::VectorDimensionMismatch {
2567 expected: expected_dim,
2568 got: vector.len(),
2569 });
2570 }
2571
2572 Ok(Value::Vector(vector))
2573}
2574
2575fn parse_text_vector_literal(text: &str) -> Result<Vec<f32>> {
2576 let trimmed = text.trim();
2577 let inner = trimmed
2578 .strip_prefix('[')
2579 .and_then(|s| s.strip_suffix(']'))
2580 .ok_or_else(|| Error::Other(format!("invalid VECTOR literal '{text}'")))?;
2581
2582 if inner.trim().is_empty() {
2583 return Ok(Vec::new());
2584 }
2585
2586 inner
2587 .split(',')
2588 .map(|part| {
2589 part.trim().parse::<f32>().map_err(|err| {
2590 Error::Other(format!("invalid VECTOR component '{}': {err}", part.trim()))
2591 })
2592 })
2593 .collect()
2594}
2595
2596fn apply_missing_column_defaults(
2597 db: &Database,
2598 table: &str,
2599 values: &mut HashMap<String, Value>,
2600) -> Result<()> {
2601 let Some(meta) = db.table_meta(table) else {
2602 return Ok(());
2603 };
2604
2605 for column in &meta.columns {
2606 if values.contains_key(&column.name) {
2607 continue;
2608 }
2609 let Some(default) = &column.default else {
2610 continue;
2611 };
2612 let value = evaluate_stored_default_expr(default)?;
2613 values.insert(
2614 column.name.clone(),
2615 coerce_value_for_column(db, table, &column.name, value)?,
2616 );
2617 }
2618
2619 Ok(())
2620}
2621
2622fn evaluate_stored_default_expr(default: &str) -> Result<Value> {
2623 if default.eq_ignore_ascii_case("NOW()") {
2624 return eval_function("now", &[]);
2625 }
2626 if default.contains("FunctionCall") && default.contains("name: \"NOW\"") {
2627 return eval_function("now", &[]);
2628 }
2629 if default == "Literal(Null)" || default.eq_ignore_ascii_case("NULL") {
2630 return Ok(Value::Null);
2631 }
2632 if default.eq_ignore_ascii_case("TRUE") {
2633 return Ok(Value::Bool(true));
2634 }
2635 if default.eq_ignore_ascii_case("FALSE") {
2636 return Ok(Value::Bool(false));
2637 }
2638 if default.starts_with('\'') && default.ends_with('\'') && default.len() >= 2 {
2639 return Ok(Value::Text(
2640 default[1..default.len() - 1].replace("''", "'"),
2641 ));
2642 }
2643 if let Some(text) = default
2644 .strip_prefix("Literal(Text(\"")
2645 .and_then(|value| value.strip_suffix("\"))"))
2646 {
2647 return Ok(Value::Text(text.to_string()));
2648 }
2649 if let Some(value) = default
2650 .strip_prefix("Literal(Integer(")
2651 .and_then(|value| value.strip_suffix("))"))
2652 {
2653 let parsed = value.parse::<i64>().map_err(|err| {
2654 Error::Other(format!("invalid stored integer default '{value}': {err}"))
2655 })?;
2656 return Ok(Value::Int64(parsed));
2657 }
2658 if let Some(value) = default
2659 .strip_prefix("Literal(Real(")
2660 .and_then(|value| value.strip_suffix("))"))
2661 {
2662 let parsed = value
2663 .parse::<f64>()
2664 .map_err(|err| Error::Other(format!("invalid stored real default '{value}': {err}")))?;
2665 return Ok(Value::Float64(parsed));
2666 }
2667 if let Some(value) = default
2668 .strip_prefix("Literal(Bool(")
2669 .and_then(|value| value.strip_suffix("))"))
2670 {
2671 let parsed = value
2672 .parse::<bool>()
2673 .map_err(|err| Error::Other(format!("invalid stored bool default '{value}': {err}")))?;
2674 return Ok(Value::Bool(parsed));
2675 }
2676
2677 Err(Error::Other(format!(
2678 "unsupported stored DEFAULT expression: {default}"
2679 )))
2680}
2681
2682pub(crate) fn stored_default_expr(expr: &Expr) -> String {
2683 match expr {
2684 Expr::Literal(Literal::Null) => "NULL".to_string(),
2685 Expr::Literal(Literal::Bool(value)) => {
2686 if *value {
2687 "TRUE".to_string()
2688 } else {
2689 "FALSE".to_string()
2690 }
2691 }
2692 Expr::Literal(Literal::Integer(value)) => value.to_string(),
2693 Expr::Literal(Literal::Real(value)) => value.to_string(),
2694 Expr::Literal(Literal::Text(value)) => format!("'{}'", value.replace('\'', "''")),
2695 Expr::FunctionCall { name, args }
2696 if name.eq_ignore_ascii_case("NOW") && args.is_empty() =>
2697 {
2698 "NOW()".to_string()
2699 }
2700 _ => format!("{expr:?}"),
2701 }
2702}
2703
2704fn should_route_insert_to_graph(db: &Database, table: &str) -> bool {
2705 table.eq_ignore_ascii_case("edges")
2706 || db
2707 .table_meta(table)
2708 .is_some_and(|table_meta| !table_meta.dag_edge_types.is_empty())
2709}
2710
2711fn validate_expires_column(col: &contextdb_parser::ast::ColumnDef) -> Result<()> {
2712 if col.expires && !matches!(col.data_type, DataType::Timestamp) {
2713 return Err(Error::Other(
2714 "EXPIRES is only valid on TIMESTAMP columns".to_string(),
2715 ));
2716 }
2717 Ok(())
2718}
2719
2720fn expires_column_name(columns: &[contextdb_parser::ast::ColumnDef]) -> Result<Option<String>> {
2721 let mut expires_column = None;
2722 for col in columns {
2723 validate_expires_column(col)?;
2724 if col.expires {
2725 if expires_column.is_some() {
2726 return Err(Error::Other(
2727 "only one EXPIRES column is supported per table".to_string(),
2728 ));
2729 }
2730 expires_column = Some(col.name.clone());
2731 }
2732 }
2733 Ok(expires_column)
2734}
2735
2736pub(crate) fn map_column_type(dtype: &DataType) -> contextdb_core::ColumnType {
2737 match dtype {
2738 DataType::Uuid => contextdb_core::ColumnType::Uuid,
2739 DataType::Text => contextdb_core::ColumnType::Text,
2740 DataType::Integer => contextdb_core::ColumnType::Integer,
2741 DataType::Real => contextdb_core::ColumnType::Real,
2742 DataType::Boolean => contextdb_core::ColumnType::Boolean,
2743 DataType::Timestamp => contextdb_core::ColumnType::Timestamp,
2744 DataType::Json => contextdb_core::ColumnType::Json,
2745 DataType::Vector(dim) => contextdb_core::ColumnType::Vector(*dim as usize),
2746 }
2747}
2748
2749fn parse_conflict_policy(s: &str) -> Result<ConflictPolicy> {
2750 match s {
2751 "latest_wins" => Ok(ConflictPolicy::LatestWins),
2752 "server_wins" => Ok(ConflictPolicy::ServerWins),
2753 "edge_wins" => Ok(ConflictPolicy::EdgeWins),
2754 "insert_if_not_exists" => Ok(ConflictPolicy::InsertIfNotExists),
2755 _ => Err(Error::Other(format!("unknown conflict policy: {s}"))),
2756 }
2757}
2758
2759fn conflict_policy_to_string(p: ConflictPolicy) -> String {
2760 match p {
2761 ConflictPolicy::LatestWins => "latest_wins".to_string(),
2762 ConflictPolicy::ServerWins => "server_wins".to_string(),
2763 ConflictPolicy::EdgeWins => "edge_wins".to_string(),
2764 ConflictPolicy::InsertIfNotExists => "insert_if_not_exists".to_string(),
2765 }
2766}
2767
2768fn project_graph_frontier_rows(
2769 frontier: Vec<(HashMap<String, uuid::Uuid>, uuid::Uuid, u32)>,
2770 start_alias: &str,
2771 steps: &[GraphStepPlan],
2772) -> Result<Vec<Vec<Value>>> {
2773 frontier
2774 .into_iter()
2775 .map(|(bindings, id, depth)| {
2776 let mut row = Vec::with_capacity(steps.len() + 3);
2777 let start_id = bindings.get(start_alias).ok_or_else(|| {
2778 Error::PlanError(format!(
2779 "graph frontier missing required start alias binding '{start_alias}'"
2780 ))
2781 })?;
2782 row.push(Value::Uuid(*start_id));
2783 for step in steps {
2784 let target_id = bindings.get(&step.target_alias).ok_or_else(|| {
2785 Error::PlanError(format!(
2786 "graph frontier missing required target alias binding '{}'",
2787 step.target_alias
2788 ))
2789 })?;
2790 row.push(Value::Uuid(*target_id));
2791 }
2792 row.push(Value::Uuid(id));
2793 row.push(Value::Int64(depth as i64));
2794 Ok(row)
2795 })
2796 .collect()
2797}
2798
2799#[cfg(test)]
2800mod tests {
2801 use super::*;
2802 use contextdb_planner::GraphStepPlan;
2803 use uuid::Uuid;
2804
2805 #[test]
2806 fn graph_01_frontier_projection_requires_complete_bindings() {
2807 let steps = vec![GraphStepPlan {
2808 edge_types: vec!["EDGE".to_string()],
2809 direction: Direction::Outgoing,
2810 min_depth: 1,
2811 max_depth: 1,
2812 target_alias: "b".to_string(),
2813 }];
2814
2815 let missing_start = vec![(HashMap::new(), Uuid::new_v4(), 0)];
2816 let missing_target = vec![(
2817 HashMap::from([("a".to_string(), Uuid::new_v4())]),
2818 Uuid::new_v4(),
2819 0,
2820 )];
2821
2822 let start_result = project_graph_frontier_rows(missing_start, "a", &steps);
2823 assert!(
2824 matches!(start_result, Err(Error::PlanError(_))),
2825 "graph frontier projection should return a plan error on missing start alias binding, got {start_result:?}"
2826 );
2827
2828 let target_result = project_graph_frontier_rows(missing_target, "a", &steps);
2829 assert!(
2830 matches!(target_result, Err(Error::PlanError(_))),
2831 "graph frontier projection should return a plan error on missing target alias binding, got {target_result:?}"
2832 );
2833 }
2834}