Skip to main content

nodedb_sql/planner/
dml.rs

1//! INSERT, UPDATE, DELETE planning.
2
3use sqlparser::ast::{self};
4
5use super::dml_helpers::{
6    convert_value_rows, expr_to_sql_value, extract_point_keys,
7    extract_table_name_from_table_with_joins,
8};
9use crate::engine_rules::{self, DeleteParams, InsertParams, UpdateParams};
10use crate::error::{Result, SqlError};
11use crate::parser::normalize::{normalize_ident, normalize_object_name};
12use crate::resolver::expr::convert_expr;
13use crate::types::*;
14
15/// Classification of an `ON CONFLICT` clause attached to an INSERT.
16enum OnConflict {
17    /// No `ON CONFLICT` clause — plain INSERT (error on duplicate PK).
18    None,
19    /// `ON CONFLICT DO NOTHING` — skip rows that would conflict, no error.
20    DoNothing,
21    /// `ON CONFLICT (...) DO UPDATE SET ...` — apply the assignments against
22    /// the existing row on conflict.
23    DoUpdate(Vec<(String, SqlExpr)>),
24}
25
26fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
27    let Some(on) = ins.on.as_ref() else {
28        return Ok(OnConflict::None);
29    };
30    let ast::OnInsert::OnConflict(oc) = on else {
31        return Ok(OnConflict::None);
32    };
33    match &oc.action {
34        ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
35        ast::OnConflictAction::DoUpdate(do_update) => {
36            let mut pairs = Vec::with_capacity(do_update.assignments.len());
37            for a in &do_update.assignments {
38                let name = match &a.target {
39                    ast::AssignmentTarget::ColumnName(obj) => normalize_object_name(obj),
40                    _ => {
41                        return Err(SqlError::Unsupported {
42                            detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
43                        });
44                    }
45                };
46                let expr = convert_expr(&a.value)?;
47                pairs.push((name, expr));
48            }
49            Ok(OnConflict::DoUpdate(pairs))
50        }
51    }
52}
53
54/// Plan an INSERT statement.
55pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
56    // `INSERT ... ON CONFLICT DO UPDATE SET` reroutes to the upsert path
57    // with the assignments carried through. `DO NOTHING` stays on the
58    // INSERT path with `if_absent=true`.
59    let if_absent = match classify_on_conflict(ins)? {
60        OnConflict::None => false,
61        OnConflict::DoNothing => true,
62        OnConflict::DoUpdate(updates) => {
63            return plan_upsert_with_on_conflict(ins, catalog, updates);
64        }
65    };
66    let table_name = match &ins.table {
67        ast::TableObject::TableName(name) => normalize_object_name(name),
68        ast::TableObject::TableFunction(_) => {
69            return Err(SqlError::Unsupported {
70                detail: "INSERT INTO table function not supported".into(),
71            });
72        }
73    };
74    let info = catalog
75        .get_collection(&table_name)?
76        .ok_or_else(|| SqlError::UnknownTable {
77            name: table_name.clone(),
78        })?;
79
80    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
81
82    // Check for INSERT...SELECT.
83    if let Some(source) = &ins.source
84        && let ast::SetExpr::Select(_select) = &*source.body
85    {
86        let source_plan = super::select::plan_query(
87            source,
88            catalog,
89            &crate::functions::registry::FunctionRegistry::new(),
90        )?;
91        return Ok(vec![SqlPlan::InsertSelect {
92            target: table_name,
93            source: Box::new(source_plan),
94            limit: 0,
95        }]);
96    }
97
98    // VALUES clause.
99    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
100        detail: "INSERT requires VALUES or SELECT".into(),
101    })?;
102
103    let rows_ast = match &*source.body {
104        ast::SetExpr::Values(values) => &values.rows,
105        _ => {
106            return Err(SqlError::Unsupported {
107                detail: "INSERT source must be VALUES or SELECT".into(),
108            });
109        }
110    };
111
112    // KV engine: key and value are fundamentally separate — handle directly.
113    if info.engine == EngineType::KeyValue {
114        let intent = if if_absent {
115            KvInsertIntent::InsertIfAbsent
116        } else {
117            KvInsertIntent::Insert
118        };
119        return build_kv_insert_plan(table_name, &columns, rows_ast, intent, Vec::new());
120    }
121
122    // All other engines: delegate to engine rules.
123    let rows = convert_value_rows(&columns, rows_ast)?;
124    let column_defaults: Vec<(String, String)> = info
125        .columns
126        .iter()
127        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
128        .collect();
129    let rules = engine_rules::resolve_engine_rules(info.engine);
130    rules.plan_insert(InsertParams {
131        collection: table_name,
132        columns,
133        rows,
134        column_defaults,
135        if_absent,
136    })
137}
138
139/// Plan an UPSERT statement (pre-processed from `UPSERT INTO` to `INSERT INTO`).
140///
141/// Same parsing as INSERT but routes through `engine_rules.plan_upsert()`.
142pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
143    let table_name = match &ins.table {
144        ast::TableObject::TableName(name) => normalize_object_name(name),
145        ast::TableObject::TableFunction(_) => {
146            return Err(SqlError::Unsupported {
147                detail: "UPSERT INTO table function not supported".into(),
148            });
149        }
150    };
151    let info = catalog
152        .get_collection(&table_name)?
153        .ok_or_else(|| SqlError::UnknownTable {
154            name: table_name.clone(),
155        })?;
156
157    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
158
159    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
160        detail: "UPSERT requires VALUES".into(),
161    })?;
162
163    let rows_ast = match &*source.body {
164        ast::SetExpr::Values(values) => &values.rows,
165        _ => {
166            return Err(SqlError::Unsupported {
167                detail: "UPSERT source must be VALUES".into(),
168            });
169        }
170    };
171
172    // KV: upsert is just a PUT (natural overwrite).
173    if info.engine == EngineType::KeyValue {
174        return build_kv_insert_plan(
175            table_name,
176            &columns,
177            rows_ast,
178            KvInsertIntent::Put,
179            Vec::new(),
180        );
181    }
182
183    let rows = convert_value_rows(&columns, rows_ast)?;
184    let column_defaults: Vec<(String, String)> = info
185        .columns
186        .iter()
187        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
188        .collect();
189    let rules = engine_rules::resolve_engine_rules(info.engine);
190    rules.plan_upsert(engine_rules::UpsertParams {
191        collection: table_name,
192        columns,
193        rows,
194        column_defaults,
195        on_conflict_updates: Vec::new(),
196    })
197}
198
199/// Plan an `INSERT ... ON CONFLICT DO UPDATE SET` statement. Identical to
200/// `plan_upsert` except the assignments are carried onto the upsert plan
201/// so the Data Plane can evaluate them against the existing row instead
202/// of merging the would-be-inserted values.
203fn plan_upsert_with_on_conflict(
204    ins: &ast::Insert,
205    catalog: &dyn SqlCatalog,
206    on_conflict_updates: Vec<(String, SqlExpr)>,
207) -> Result<Vec<SqlPlan>> {
208    let table_name = match &ins.table {
209        ast::TableObject::TableName(name) => normalize_object_name(name),
210        ast::TableObject::TableFunction(_) => {
211            return Err(SqlError::Unsupported {
212                detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
213            });
214        }
215    };
216    let info = catalog
217        .get_collection(&table_name)?
218        .ok_or_else(|| SqlError::UnknownTable {
219            name: table_name.clone(),
220        })?;
221
222    let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
223
224    let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
225        detail: "INSERT ... ON CONFLICT requires VALUES".into(),
226    })?;
227    let rows_ast = match &*source.body {
228        ast::SetExpr::Values(values) => &values.rows,
229        _ => {
230            return Err(SqlError::Unsupported {
231                detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
232            });
233        }
234    };
235
236    // KV: `INSERT ... ON CONFLICT (key) DO UPDATE SET ...` is an opt-in
237    // overwrite — same physical semantics as UPSERT, with the optional
238    // per-row assignments carried through for the Data Plane to apply
239    // against the existing row.
240    if info.engine == EngineType::KeyValue {
241        return build_kv_insert_plan(
242            table_name,
243            &columns,
244            rows_ast,
245            KvInsertIntent::Put,
246            on_conflict_updates,
247        );
248    }
249
250    let rows = convert_value_rows(&columns, rows_ast)?;
251    let column_defaults: Vec<(String, String)> = info
252        .columns
253        .iter()
254        .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
255        .collect();
256    let rules = engine_rules::resolve_engine_rules(info.engine);
257    rules.plan_upsert(engine_rules::UpsertParams {
258        collection: table_name,
259        columns,
260        rows,
261        column_defaults,
262        on_conflict_updates,
263    })
264}
265
266/// Build a `SqlPlan::KvInsert` from a VALUES clause. Shared by plain INSERT,
267/// UPSERT, and `INSERT ... ON CONFLICT (key) DO UPDATE` — the three paths
268/// differ only in `intent` and `on_conflict_updates`, never in how entries
269/// are extracted from the row exprs.
270fn build_kv_insert_plan(
271    table_name: String,
272    columns: &[String],
273    rows_ast: &[Vec<ast::Expr>],
274    intent: KvInsertIntent,
275    on_conflict_updates: Vec<(String, SqlExpr)>,
276) -> Result<Vec<SqlPlan>> {
277    let key_idx = columns.iter().position(|c| c == "key");
278    let ttl_idx = columns.iter().position(|c| c == "ttl");
279    let mut entries = Vec::with_capacity(rows_ast.len());
280    let mut ttl_secs: u64 = 0;
281    for row_exprs in rows_ast {
282        let key_val = match key_idx {
283            Some(idx) => expr_to_sql_value(&row_exprs[idx])?,
284            None => SqlValue::String(String::new()),
285        };
286        if let Some(idx) = ttl_idx {
287            match expr_to_sql_value(&row_exprs[idx]) {
288                Ok(SqlValue::Int(n)) => ttl_secs = n.max(0) as u64,
289                Ok(SqlValue::Float(f)) => ttl_secs = f.max(0.0) as u64,
290                _ => {}
291            }
292        }
293        let value_cols: Vec<(String, SqlValue)> = columns
294            .iter()
295            .enumerate()
296            .filter(|(i, _)| Some(*i) != key_idx && Some(*i) != ttl_idx)
297            .map(|(i, col)| {
298                let val = expr_to_sql_value(&row_exprs[i])?;
299                Ok((col.clone(), val))
300            })
301            .collect::<Result<Vec<_>>>()?;
302        entries.push((key_val, value_cols));
303    }
304    Ok(vec![SqlPlan::KvInsert {
305        collection: table_name,
306        entries,
307        ttl_secs,
308        intent,
309        on_conflict_updates,
310    }])
311}
312
313/// Plan an UPDATE statement.
314pub fn plan_update(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
315    let ast::Statement::Update(update) = stmt else {
316        return Err(SqlError::Parse {
317            detail: "expected UPDATE statement".into(),
318        });
319    };
320
321    let table_name = extract_table_name_from_table_with_joins(&update.table)?;
322    let info = catalog
323        .get_collection(&table_name)?
324        .ok_or_else(|| SqlError::UnknownTable {
325            name: table_name.clone(),
326        })?;
327
328    let assigns: Vec<(String, SqlExpr)> = update
329        .assignments
330        .iter()
331        .map(|a| {
332            let col = match &a.target {
333                ast::AssignmentTarget::ColumnName(name) => normalize_object_name(name),
334                ast::AssignmentTarget::Tuple(names) => names
335                    .iter()
336                    .map(normalize_object_name)
337                    .collect::<Vec<_>>()
338                    .join(","),
339            };
340            let val = convert_expr(&a.value)?;
341            Ok((col, val))
342        })
343        .collect::<Result<_>>()?;
344
345    let filters = match &update.selection {
346        Some(expr) => super::select::convert_where_to_filters(expr)?,
347        None => Vec::new(),
348    };
349
350    // Detect point updates (WHERE pk = literal).
351    let target_keys = extract_point_keys(update.selection.as_ref(), &info);
352
353    let rules = engine_rules::resolve_engine_rules(info.engine);
354    rules.plan_update(UpdateParams {
355        collection: table_name,
356        assignments: assigns,
357        filters,
358        target_keys,
359        returning: update.returning.is_some(),
360    })
361}
362
363/// Plan a DELETE statement.
364pub fn plan_delete(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
365    let ast::Statement::Delete(delete) = stmt else {
366        return Err(SqlError::Parse {
367            detail: "expected DELETE statement".into(),
368        });
369    };
370
371    let from_tables = match &delete.from {
372        ast::FromTable::WithFromKeyword(tables) | ast::FromTable::WithoutKeyword(tables) => tables,
373    };
374    let table_name =
375        extract_table_name_from_table_with_joins(from_tables.first().ok_or_else(|| {
376            SqlError::Parse {
377                detail: "DELETE requires a FROM table".into(),
378            }
379        })?)?;
380    let info = catalog
381        .get_collection(&table_name)?
382        .ok_or_else(|| SqlError::UnknownTable {
383            name: table_name.clone(),
384        })?;
385
386    let filters = match &delete.selection {
387        Some(expr) => super::select::convert_where_to_filters(expr)?,
388        None => Vec::new(),
389    };
390
391    let target_keys = extract_point_keys(delete.selection.as_ref(), &info);
392
393    let rules = engine_rules::resolve_engine_rules(info.engine);
394    rules.plan_delete(DeleteParams {
395        collection: table_name,
396        filters,
397        target_keys,
398    })
399}
400
401/// Plan a TRUNCATE statement.
402pub fn plan_truncate_stmt(stmt: &ast::Statement) -> Result<Vec<SqlPlan>> {
403    let ast::Statement::Truncate(truncate) = stmt else {
404        return Err(SqlError::Parse {
405            detail: "expected TRUNCATE statement".into(),
406        });
407    };
408    let restart_identity = matches!(
409        truncate.identity,
410        Some(sqlparser::ast::TruncateIdentityOption::Restart)
411    );
412    truncate
413        .table_names
414        .iter()
415        .map(|t| {
416            Ok(SqlPlan::Truncate {
417                collection: normalize_object_name(&t.name),
418                restart_identity,
419            })
420        })
421        .collect()
422}
423
424// ── Helpers extracted to `dml_helpers.rs` to keep this file under 500 lines.