Skip to main content

nodedb_sql/planner/
dml.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! INSERT, UPDATE, DELETE planning.
4
5use nodedb_types::DatabaseId;
6use sqlparser::ast::{self};
7
8use super::dml_helpers::{
9    build_kv_insert_plan, build_vector_primary_insert_plan, convert_value_rows,
10};
11use crate::engine_rules::{self, InsertParams};
12use crate::error::{Result, SqlError};
13use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
14use crate::resolver::expr::convert_expr;
15use crate::types::*;
16
17pub use dml_update_delete::{plan_delete, plan_truncate_stmt, plan_update};
18
19#[path = "dml_update_delete.rs"]
20mod dml_update_delete;
21
22/// Classification of an `ON CONFLICT` clause attached to an INSERT.
23enum OnConflict {
24    /// No `ON CONFLICT` clause — plain INSERT (error on duplicate PK).
25    None,
26    /// `ON CONFLICT DO NOTHING` — skip rows that would conflict, no error.
27    DoNothing,
28    /// `ON CONFLICT (...) DO UPDATE SET ...` — apply the assignments against
29    /// the existing row on conflict.
30    DoUpdate(Vec<(String, SqlExpr)>),
31}
32
33fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
34    let Some(on) = ins.on.as_ref() else {
35        return Ok(OnConflict::None);
36    };
37    let ast::OnInsert::OnConflict(oc) = on else {
38        return Ok(OnConflict::None);
39    };
40    match &oc.action {
41        ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
42        ast::OnConflictAction::DoUpdate(do_update) => {
43            let mut pairs = Vec::with_capacity(do_update.assignments.len());
44            for a in &do_update.assignments {
45                let name = match &a.target {
46                    ast::AssignmentTarget::ColumnName(obj) => normalize_object_name_checked(obj)?,
47                    _ => {
48                        return Err(SqlError::Unsupported {
49                            detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
50                        });
51                    }
52                };
53                let expr = convert_expr(&a.value)?;
54                pairs.push((name, expr));
55            }
56            Ok(OnConflict::DoUpdate(pairs))
57        }
58    }
59}
60
61/// Plan an INSERT statement.
62pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
63    // `INSERT ... ON CONFLICT DO UPDATE SET` reroutes to the upsert path
64    // with the assignments carried through. `DO NOTHING` stays on the
65    // INSERT path with `if_absent=true`.
66    let if_absent = match classify_on_conflict(ins)? {
67        OnConflict::None => false,
68        OnConflict::DoNothing => true,
69        OnConflict::DoUpdate(updates) => {
70            return plan_upsert_with_on_conflict(ins, catalog, updates);
71        }
72    };
73    let table_name = match &ins.table {
74        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
75        ast::TableObject::TableFunction(_) => {
76            return Err(SqlError::Unsupported {
77                detail: "INSERT INTO table function not supported".into(),
78            });
79        }
80    };
81    let info = catalog
82        .get_collection(DatabaseId::DEFAULT, &table_name)?
83        .ok_or_else(|| SqlError::UnknownTable {
84            name: table_name.clone(),
85        })?;
86
87    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
88
89    // Check for INSERT...SELECT.
90    if let Some(source) = &ins.source
91        && let ast::SetExpr::Select(_select) = &*source.body
92    {
93        let source_plan = super::select::plan_query(
94            source,
95            catalog,
96            &crate::functions::registry::FunctionRegistry::new(),
97            crate::TemporalScope::default(),
98        )?;
99        return Ok(vec![SqlPlan::InsertSelect {
100            target: table_name,
101            source: Box::new(source_plan),
102            limit: 0,
103        }]);
104    }
105
106    // VALUES clause.
107    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
108        detail: "INSERT requires VALUES or SELECT".into(),
109    })?;
110
111    let rows_ast = match &*source.body {
112        ast::SetExpr::Values(values) => &values.rows,
113        _ => {
114            return Err(SqlError::Unsupported {
115                detail: "INSERT source must be VALUES or SELECT".into(),
116            });
117        }
118    };
119
120    // KV engine: key and value are fundamentally separate — handle directly.
121    if info.engine == EngineType::KeyValue {
122        let intent = if if_absent {
123            KvInsertIntent::InsertIfAbsent
124        } else {
125            KvInsertIntent::Insert
126        };
127        return build_kv_insert_plan(
128            table_name,
129            &columns,
130            rows_ast,
131            intent,
132            Vec::new(),
133            info.primary_key.as_deref(),
134        );
135    }
136
137    // Vector-primary collection: bypass document encoding.
138    if info.primary == nodedb_types::PrimaryEngine::Vector
139        && let Some(ref vpc) = info.vector_primary
140    {
141        let rows_parsed = convert_value_rows(&columns, rows_ast)?;
142        return build_vector_primary_insert_plan(&table_name, vpc, &columns, rows_parsed);
143    }
144
145    // All other engines: delegate to engine rules.
146    let rows = convert_value_rows(&columns, rows_ast)?;
147    let column_defaults: Vec<(String, String)> = info
148        .columns
149        .iter()
150        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
151        .collect();
152    let column_schema: Vec<(String, String)> = info
153        .columns
154        .iter()
155        .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
156        .collect();
157    let rules = engine_rules::resolve_engine_rules(info.engine);
158    rules.plan_insert(InsertParams {
159        collection: table_name,
160        columns,
161        rows,
162        column_defaults,
163        if_absent,
164        column_schema,
165    })
166}
167
168/// Plan an UPSERT statement (pre-processed from `UPSERT INTO` to `INSERT INTO`).
169///
170/// Same parsing as INSERT but routes through `engine_rules.plan_upsert()`.
171pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
172    let table_name = match &ins.table {
173        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
174        ast::TableObject::TableFunction(_) => {
175            return Err(SqlError::Unsupported {
176                detail: "UPSERT INTO table function not supported".into(),
177            });
178        }
179    };
180    let info = catalog
181        .get_collection(DatabaseId::DEFAULT, &table_name)?
182        .ok_or_else(|| SqlError::UnknownTable {
183            name: table_name.clone(),
184        })?;
185
186    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
187
188    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
189        detail: "UPSERT requires VALUES".into(),
190    })?;
191
192    let rows_ast = match &*source.body {
193        ast::SetExpr::Values(values) => &values.rows,
194        _ => {
195            return Err(SqlError::Unsupported {
196                detail: "UPSERT source must be VALUES".into(),
197            });
198        }
199    };
200
201    // KV: upsert is just a PUT (natural overwrite).
202    if info.engine == EngineType::KeyValue {
203        return build_kv_insert_plan(
204            table_name,
205            &columns,
206            rows_ast,
207            KvInsertIntent::Put,
208            Vec::new(),
209            info.primary_key.as_deref(),
210        );
211    }
212
213    let rows = convert_value_rows(&columns, rows_ast)?;
214    let column_defaults: Vec<(String, String)> = info
215        .columns
216        .iter()
217        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
218        .collect();
219    let column_schema: Vec<(String, String)> = info
220        .columns
221        .iter()
222        .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
223        .collect();
224    let rules = engine_rules::resolve_engine_rules(info.engine);
225    rules.plan_upsert(engine_rules::UpsertParams {
226        collection: table_name,
227        columns,
228        rows,
229        column_defaults,
230        on_conflict_updates: Vec::new(),
231        column_schema,
232    })
233}
234
235/// Plan an `INSERT ... ON CONFLICT DO UPDATE SET` statement.
236fn plan_upsert_with_on_conflict(
237    ins: &ast::Insert,
238    catalog: &dyn SqlCatalog,
239    on_conflict_updates: Vec<(String, SqlExpr)>,
240) -> Result<Vec<SqlPlan>> {
241    let table_name = match &ins.table {
242        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
243        ast::TableObject::TableFunction(_) => {
244            return Err(SqlError::Unsupported {
245                detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
246            });
247        }
248    };
249    let info = catalog
250        .get_collection(DatabaseId::DEFAULT, &table_name)?
251        .ok_or_else(|| SqlError::UnknownTable {
252            name: table_name.clone(),
253        })?;
254
255    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
256
257    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
258        detail: "INSERT ... ON CONFLICT requires VALUES".into(),
259    })?;
260    let rows_ast = match &*source.body {
261        ast::SetExpr::Values(values) => &values.rows,
262        _ => {
263            return Err(SqlError::Unsupported {
264                detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
265            });
266        }
267    };
268
269    // KV: `INSERT ... ON CONFLICT (key) DO UPDATE SET ...` is an opt-in
270    // overwrite — same physical semantics as UPSERT, with the optional
271    // per-row assignments carried through for the Data Plane to apply
272    // against the existing row.
273    if info.engine == EngineType::KeyValue {
274        return build_kv_insert_plan(
275            table_name,
276            &columns,
277            rows_ast,
278            KvInsertIntent::Put,
279            on_conflict_updates,
280            info.primary_key.as_deref(),
281        );
282    }
283
284    let rows = convert_value_rows(&columns, rows_ast)?;
285    let column_defaults: Vec<(String, String)> = info
286        .columns
287        .iter()
288        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
289        .collect();
290    let column_schema: Vec<(String, String)> = info
291        .columns
292        .iter()
293        .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
294        .collect();
295    let rules = engine_rules::resolve_engine_rules(info.engine);
296    rules.plan_upsert(engine_rules::UpsertParams {
297        collection: table_name,
298        columns,
299        rows,
300        column_defaults,
301        on_conflict_updates,
302        column_schema,
303    })
304}