contextdb_engine/
schema_enforcer.rs1use crate::database::Database;
2use contextdb_core::{Error, Result, Value};
3use contextdb_parser::ast::{Expr, Literal};
4use contextdb_planner::PhysicalPlan;
5use std::collections::HashMap;
6
7pub fn validate_dml(
8 plan: &PhysicalPlan,
9 db: &Database,
10 params: &HashMap<String, Value>,
11) -> Result<()> {
12 match plan {
13 PhysicalPlan::Update(p) => {
14 let metas = db.relational_store().table_meta.read();
21 let table_meta = metas
22 .get(&p.table)
23 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
24
25 for (col_name, _) in &p.assignments {
27 if !table_meta.columns.iter().any(|c| c.name == *col_name) {
28 return Err(Error::Other(format!(
29 "column '{}' does not exist on table '{}'",
30 col_name, p.table
31 )));
32 }
33 }
34
35 if table_meta.immutable {
37 return Err(Error::ImmutableTable(p.table.clone()));
38 }
39
40 for (col_name, _) in &p.assignments {
42 if let Some(col) = table_meta.columns.iter().find(|c| c.name == *col_name)
43 && col.immutable
44 {
45 return Err(Error::ImmutableColumn {
46 table: p.table.clone(),
47 column: col_name.clone(),
48 });
49 }
50 }
51
52 Ok(())
53 }
54 PhysicalPlan::Delete(p) => {
55 let metas = db.relational_store().table_meta.read();
56 if metas.get(&p.table).is_some_and(|meta| meta.immutable) {
57 Err(Error::ImmutableTable(p.table.clone()))
58 } else {
59 Ok(())
60 }
61 }
62 PhysicalPlan::Insert(p) => {
63 let metas = db.relational_store().table_meta.read();
64 let table_meta = metas
65 .get(&p.table)
66 .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
67
68 let columns: Vec<String> = if p.columns.is_empty() {
70 table_meta.columns.iter().map(|c| c.name.clone()).collect()
71 } else {
72 p.columns.clone()
73 };
74
75 for col_name in &columns {
77 if !table_meta.columns.iter().any(|c| c.name == *col_name) {
78 return Err(Error::Other(format!(
79 "column '{}' does not exist in table '{}'",
80 col_name, p.table
81 )));
82 }
83 }
84
85 for row in &p.values {
86 for column in table_meta
87 .columns
88 .iter()
89 .filter(|column| !column.nullable && !column.primary_key)
90 {
91 if let Some(index) = columns.iter().position(|name| name == &column.name) {
92 let value = row
93 .get(index)
94 .ok_or_else(|| {
95 Error::PlanError("column/value count mismatch".to_string())
96 })
97 .and_then(|expr| resolve_expr(expr, params))?;
98 if value == Value::Null {
99 return Err(Error::Other(format!(
100 "NOT NULL constraint violated: {}.{}",
101 p.table, column.name
102 )));
103 }
104 } else if column.default.is_none() {
105 return Err(Error::Other(format!(
106 "NOT NULL constraint violated: {}.{}",
107 p.table, column.name
108 )));
109 }
110 }
111 }
112
113 Ok(())
114 }
115 _ => Ok(()),
116 }
117}
118
119fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
120 match expr {
121 Expr::Literal(l) => Ok(match l {
122 Literal::Null => Value::Null,
123 Literal::Bool(v) => Value::Bool(*v),
124 Literal::Integer(v) => Value::Int64(*v),
125 Literal::Real(v) => Value::Float64(*v),
126 Literal::Text(v) => {
127 if let Ok(id) = uuid::Uuid::parse_str(v) {
128 Value::Uuid(id)
129 } else {
130 Value::Text(v.clone())
131 }
132 }
133 Literal::Vector(v) => Value::Vector(v.clone()),
134 }),
135 Expr::Parameter(p) => params
136 .get(p)
137 .cloned()
138 .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
139 Expr::Column(c) => Ok(Value::Text(c.column.clone())),
140 _ => Err(Error::PlanError(
141 "unsupported expression in schema enforcer".to_string(),
142 )),
143 }
144}