Skip to main content

nodedb_sql/planner/
dml.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! INSERT, UPDATE, DELETE planning.
4
5use nodedb_types::DatabaseId;
6use sqlparser::ast::{self};
7
8use super::dml_helpers::{
9    build_kv_insert_plan, build_vector_primary_insert_plan, convert_value_rows,
10};
11use crate::engine_rules::{self, InsertParams};
12use crate::error::{Result, SqlError};
13use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
14use crate::resolver::expr::convert_expr;
15use crate::types::*;
16
17pub use dml_update_delete::{plan_delete, plan_truncate_stmt, plan_update};
18
19#[path = "dml_update_delete.rs"]
20mod dml_update_delete;
21
22/// Classification of an `ON CONFLICT` clause attached to an INSERT.
23enum OnConflict {
24    /// No `ON CONFLICT` clause — plain INSERT (error on duplicate PK).
25    None,
26    /// `ON CONFLICT DO NOTHING` — skip rows that would conflict, no error.
27    DoNothing,
28    /// `ON CONFLICT (...) DO UPDATE SET ...` — apply the assignments against
29    /// the existing row on conflict.
30    DoUpdate(Vec<(String, SqlExpr)>),
31}
32
33fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
34    let Some(on) = ins.on.as_ref() else {
35        return Ok(OnConflict::None);
36    };
37    let ast::OnInsert::OnConflict(oc) = on else {
38        return Ok(OnConflict::None);
39    };
40    match &oc.action {
41        ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
42        ast::OnConflictAction::DoUpdate(do_update) => {
43            let mut pairs = Vec::with_capacity(do_update.assignments.len());
44            for a in &do_update.assignments {
45                let name = match &a.target {
46                    ast::AssignmentTarget::ColumnName(obj) => normalize_object_name_checked(obj)?,
47                    _ => {
48                        return Err(SqlError::Unsupported {
49                            detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
50                        });
51                    }
52                };
53                let expr = convert_expr(&a.value)?;
54                pairs.push((name, expr));
55            }
56            Ok(OnConflict::DoUpdate(pairs))
57        }
58    }
59}
60
61/// Plan an INSERT statement.
62pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
63    // `INSERT ... ON CONFLICT DO UPDATE SET` reroutes to the upsert path
64    // with the assignments carried through. `DO NOTHING` stays on the
65    // INSERT path with `if_absent=true`.
66    let if_absent = match classify_on_conflict(ins)? {
67        OnConflict::None => false,
68        OnConflict::DoNothing => true,
69        OnConflict::DoUpdate(updates) => {
70            return plan_upsert_with_on_conflict(ins, catalog, updates);
71        }
72    };
73    let table_name = match &ins.table {
74        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
75        ast::TableObject::TableFunction(_) => {
76            return Err(SqlError::Unsupported {
77                detail: "INSERT INTO table function not supported".into(),
78            });
79        }
80    };
81    let info = catalog
82        .get_collection(DatabaseId::DEFAULT, &table_name)?
83        .ok_or_else(|| SqlError::UnknownTable {
84            name: table_name.clone(),
85        })?;
86
87    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
88
89    // Check for INSERT...SELECT.
90    if let Some(source) = &ins.source
91        && let ast::SetExpr::Select(_select) = &*source.body
92    {
93        let source_plan = super::select::plan_query(
94            source,
95            catalog,
96            &crate::functions::registry::FunctionRegistry::new(),
97            crate::TemporalScope::default(),
98        )?;
99        return Ok(vec![SqlPlan::InsertSelect {
100            target: table_name,
101            source: Box::new(source_plan),
102            limit: 0,
103        }]);
104    }
105
106    // VALUES clause.
107    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
108        detail: "INSERT requires VALUES or SELECT".into(),
109    })?;
110
111    let rows_ast = match &*source.body {
112        ast::SetExpr::Values(values) => &values.rows,
113        _ => {
114            return Err(SqlError::Unsupported {
115                detail: "INSERT source must be VALUES or SELECT".into(),
116            });
117        }
118    };
119
120    // KV engine: key and value are fundamentally separate — handle directly.
121    if info.engine == EngineType::KeyValue {
122        let intent = if if_absent {
123            KvInsertIntent::InsertIfAbsent
124        } else {
125            KvInsertIntent::Insert
126        };
127        return build_kv_insert_plan(
128            table_name,
129            &columns,
130            rows_ast,
131            intent,
132            Vec::new(),
133            info.primary_key.as_deref(),
134        );
135    }
136
137    // Vector-primary collection: bypass document encoding.
138    if info.primary == nodedb_types::PrimaryEngine::Vector
139        && let Some(ref vpc) = info.vector_primary
140    {
141        let rows_parsed = convert_value_rows(&columns, rows_ast)?;
142        return build_vector_primary_insert_plan(&table_name, vpc, &columns, rows_parsed);
143    }
144
145    // All other engines: delegate to engine rules.
146    let rows = convert_value_rows(&columns, rows_ast)?;
147    let column_defaults: Vec<(String, String)> = info
148        .columns
149        .iter()
150        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
151        .collect();
152    let rules = engine_rules::resolve_engine_rules(info.engine);
153    rules.plan_insert(InsertParams {
154        collection: table_name,
155        columns,
156        rows,
157        column_defaults,
158        if_absent,
159    })
160}
161
162/// Plan an UPSERT statement (pre-processed from `UPSERT INTO` to `INSERT INTO`).
163///
164/// Same parsing as INSERT but routes through `engine_rules.plan_upsert()`.
165pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
166    let table_name = match &ins.table {
167        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
168        ast::TableObject::TableFunction(_) => {
169            return Err(SqlError::Unsupported {
170                detail: "UPSERT INTO table function not supported".into(),
171            });
172        }
173    };
174    let info = catalog
175        .get_collection(DatabaseId::DEFAULT, &table_name)?
176        .ok_or_else(|| SqlError::UnknownTable {
177            name: table_name.clone(),
178        })?;
179
180    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
181
182    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
183        detail: "UPSERT requires VALUES".into(),
184    })?;
185
186    let rows_ast = match &*source.body {
187        ast::SetExpr::Values(values) => &values.rows,
188        _ => {
189            return Err(SqlError::Unsupported {
190                detail: "UPSERT source must be VALUES".into(),
191            });
192        }
193    };
194
195    // KV: upsert is just a PUT (natural overwrite).
196    if info.engine == EngineType::KeyValue {
197        return build_kv_insert_plan(
198            table_name,
199            &columns,
200            rows_ast,
201            KvInsertIntent::Put,
202            Vec::new(),
203            info.primary_key.as_deref(),
204        );
205    }
206
207    let rows = convert_value_rows(&columns, rows_ast)?;
208    let column_defaults: Vec<(String, String)> = info
209        .columns
210        .iter()
211        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
212        .collect();
213    let rules = engine_rules::resolve_engine_rules(info.engine);
214    rules.plan_upsert(engine_rules::UpsertParams {
215        collection: table_name,
216        columns,
217        rows,
218        column_defaults,
219        on_conflict_updates: Vec::new(),
220    })
221}
222
223/// Plan an `INSERT ... ON CONFLICT DO UPDATE SET` statement.
224fn plan_upsert_with_on_conflict(
225    ins: &ast::Insert,
226    catalog: &dyn SqlCatalog,
227    on_conflict_updates: Vec<(String, SqlExpr)>,
228) -> Result<Vec<SqlPlan>> {
229    let table_name = match &ins.table {
230        ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
231        ast::TableObject::TableFunction(_) => {
232            return Err(SqlError::Unsupported {
233                detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
234            });
235        }
236    };
237    let info = catalog
238        .get_collection(DatabaseId::DEFAULT, &table_name)?
239        .ok_or_else(|| SqlError::UnknownTable {
240            name: table_name.clone(),
241        })?;
242
243    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
244
245    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
246        detail: "INSERT ... ON CONFLICT requires VALUES".into(),
247    })?;
248    let rows_ast = match &*source.body {
249        ast::SetExpr::Values(values) => &values.rows,
250        _ => {
251            return Err(SqlError::Unsupported {
252                detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
253            });
254        }
255    };
256
257    // KV: `INSERT ... ON CONFLICT (key) DO UPDATE SET ...` is an opt-in
258    // overwrite — same physical semantics as UPSERT, with the optional
259    // per-row assignments carried through for the Data Plane to apply
260    // against the existing row.
261    if info.engine == EngineType::KeyValue {
262        return build_kv_insert_plan(
263            table_name,
264            &columns,
265            rows_ast,
266            KvInsertIntent::Put,
267            on_conflict_updates,
268            info.primary_key.as_deref(),
269        );
270    }
271
272    let rows = convert_value_rows(&columns, rows_ast)?;
273    let column_defaults: Vec<(String, String)> = info
274        .columns
275        .iter()
276        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
277        .collect();
278    let rules = engine_rules::resolve_engine_rules(info.engine);
279    rules.plan_upsert(engine_rules::UpsertParams {
280        collection: table_name,
281        columns,
282        rows,
283        column_defaults,
284        on_conflict_updates,
285    })
286}