Skip to main content

contextdb_engine/
schema_enforcer.rs

1use crate::database::Database;
2use contextdb_core::{Error, Result, Value};
3use contextdb_parser::ast::{Expr, Literal};
4use contextdb_planner::PhysicalPlan;
5use std::collections::HashMap;
6
7pub fn validate_dml(
8    plan: &PhysicalPlan,
9    db: &Database,
10    params: &HashMap<String, Value>,
11) -> Result<()> {
12    match plan {
13        PhysicalPlan::Update(p) => {
14            // Error-ordering precedence on UPDATE (per I13):
15            //   1. Table does not exist
16            //   2. Column in SET list does not exist on the table
17            //   3. Table-level ImmutableTable rejection
18            //   4. Column-level ImmutableColumn rejection (first flagged column in SET-list order)
19            //   5. STATE MACHINE / NOT NULL / coerce (handled later in executor)
20            let metas = db.relational_store().table_meta.read();
21            let table_meta = metas
22                .get(&p.table)
23                .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
24
25            // (2) Unknown-column check on every SET target.
26            for (col_name, _) in &p.assignments {
27                if !table_meta.columns.iter().any(|c| c.name == *col_name) {
28                    return Err(Error::Other(format!(
29                        "column '{}' does not exist on table '{}'",
30                        col_name, p.table
31                    )));
32                }
33            }
34
35            // (3) Table-level IMMUTABLE short-circuits column-level checks.
36            if table_meta.immutable {
37                return Err(Error::ImmutableTable(p.table.clone()));
38            }
39
40            // (4) Column-level IMMUTABLE: first flagged column in SET-list order wins.
41            for (col_name, _) in &p.assignments {
42                if let Some(col) = table_meta.columns.iter().find(|c| c.name == *col_name)
43                    && col.immutable
44                {
45                    return Err(Error::ImmutableColumn {
46                        table: p.table.clone(),
47                        column: col_name.clone(),
48                    });
49                }
50            }
51
52            Ok(())
53        }
54        PhysicalPlan::Delete(p) => {
55            let metas = db.relational_store().table_meta.read();
56            if metas.get(&p.table).is_some_and(|meta| meta.immutable) {
57                Err(Error::ImmutableTable(p.table.clone()))
58            } else {
59                Ok(())
60            }
61        }
62        PhysicalPlan::Insert(p) => {
63            let metas = db.relational_store().table_meta.read();
64            let table_meta = metas
65                .get(&p.table)
66                .ok_or_else(|| Error::TableNotFound(p.table.clone()))?;
67
68            // When no column list is provided, infer from table metadata.
69            let columns: Vec<String> = if p.columns.is_empty() {
70                table_meta.columns.iter().map(|c| c.name.clone()).collect()
71            } else {
72                p.columns.clone()
73            };
74
75            // Validate that all INSERT columns exist in the table schema
76            for col_name in &columns {
77                if !table_meta.columns.iter().any(|c| c.name == *col_name) {
78                    return Err(Error::Other(format!(
79                        "column '{}' does not exist in table '{}'",
80                        col_name, p.table
81                    )));
82                }
83            }
84
85            for row in &p.values {
86                for column in table_meta
87                    .columns
88                    .iter()
89                    .filter(|column| !column.nullable && !column.primary_key)
90                {
91                    if let Some(index) = columns.iter().position(|name| name == &column.name) {
92                        let value = row
93                            .get(index)
94                            .ok_or_else(|| {
95                                Error::PlanError("column/value count mismatch".to_string())
96                            })
97                            .and_then(|expr| resolve_expr(expr, params))?;
98                        if value == Value::Null {
99                            return Err(Error::Other(format!(
100                                "NOT NULL constraint violated: {}.{}",
101                                p.table, column.name
102                            )));
103                        }
104                    } else if column.default.is_none() {
105                        return Err(Error::Other(format!(
106                            "NOT NULL constraint violated: {}.{}",
107                            p.table, column.name
108                        )));
109                    }
110                }
111            }
112
113            Ok(())
114        }
115        _ => Ok(()),
116    }
117}
118
119fn resolve_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Value> {
120    match expr {
121        Expr::Literal(l) => Ok(match l {
122            Literal::Null => Value::Null,
123            Literal::Bool(v) => Value::Bool(*v),
124            Literal::Integer(v) => Value::Int64(*v),
125            Literal::Real(v) => Value::Float64(*v),
126            Literal::Text(v) => {
127                if let Ok(id) = uuid::Uuid::parse_str(v) {
128                    Value::Uuid(id)
129                } else {
130                    Value::Text(v.clone())
131                }
132            }
133            Literal::Vector(v) => Value::Vector(v.clone()),
134        }),
135        Expr::Parameter(p) => params
136            .get(p)
137            .cloned()
138            .ok_or_else(|| Error::NotFound(format!("missing parameter: {}", p))),
139        Expr::Column(c) => Ok(Value::Text(c.column.clone())),
140        _ => Err(Error::PlanError(
141            "unsupported expression in schema enforcer".to_string(),
142        )),
143    }
144}