1use nodedb_types::DatabaseId;
6use sqlparser::ast::{self};
7
8use super::dml_helpers::{
9 build_kv_insert_plan, build_vector_primary_insert_plan, convert_value_rows,
10};
11use crate::engine_rules::{self, InsertParams};
12use crate::error::{Result, SqlError};
13use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
14use crate::resolver::expr::convert_expr;
15use crate::types::*;
16
17pub use dml_update_delete::{plan_delete, plan_truncate_stmt, plan_update};
18
19#[path = "dml_update_delete.rs"]
20mod dml_update_delete;
21
/// Conflict-handling mode requested by an `INSERT` statement, as produced by
/// `classify_on_conflict`.
enum OnConflict {
    /// No `ON CONFLICT` clause was recognised; `plan_insert` plans a plain
    /// insert (`if_absent = false`).
    None,
    /// `ON CONFLICT DO NOTHING`; `plan_insert` plans the insert with
    /// `if_absent = true`.
    DoNothing,
    /// `ON CONFLICT DO UPDATE SET ...`: one `(column name, value expression)`
    /// pair per assignment, in the order the assignments were written.
    DoUpdate(Vec<(String, SqlExpr)>),
}
32
33fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
34 let Some(on) = ins.on.as_ref() else {
35 return Ok(OnConflict::None);
36 };
37 let ast::OnInsert::OnConflict(oc) = on else {
38 return Ok(OnConflict::None);
39 };
40 match &oc.action {
41 ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
42 ast::OnConflictAction::DoUpdate(do_update) => {
43 let mut pairs = Vec::with_capacity(do_update.assignments.len());
44 for a in &do_update.assignments {
45 let name = match &a.target {
46 ast::AssignmentTarget::ColumnName(obj) => normalize_object_name_checked(obj)?,
47 _ => {
48 return Err(SqlError::Unsupported {
49 detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
50 });
51 }
52 };
53 let expr = convert_expr(&a.value)?;
54 pairs.push((name, expr));
55 }
56 Ok(OnConflict::DoUpdate(pairs))
57 }
58 }
59}
60
61pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
63 let if_absent = match classify_on_conflict(ins)? {
67 OnConflict::None => false,
68 OnConflict::DoNothing => true,
69 OnConflict::DoUpdate(updates) => {
70 return plan_upsert_with_on_conflict(ins, catalog, updates);
71 }
72 };
73 let table_name = match &ins.table {
74 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
75 ast::TableObject::TableFunction(_) => {
76 return Err(SqlError::Unsupported {
77 detail: "INSERT INTO table function not supported".into(),
78 });
79 }
80 };
81 let info = catalog
82 .get_collection(DatabaseId::DEFAULT, &table_name)?
83 .ok_or_else(|| SqlError::UnknownTable {
84 name: table_name.clone(),
85 })?;
86
87 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
88
89 if let Some(source) = &ins.source
91 && let ast::SetExpr::Select(_select) = &*source.body
92 {
93 let source_plan = super::select::plan_query(
94 source,
95 catalog,
96 &crate::functions::registry::FunctionRegistry::new(),
97 crate::TemporalScope::default(),
98 )?;
99 return Ok(vec![SqlPlan::InsertSelect {
100 target: table_name,
101 source: Box::new(source_plan),
102 limit: 0,
103 }]);
104 }
105
106 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
108 detail: "INSERT requires VALUES or SELECT".into(),
109 })?;
110
111 let rows_ast = match &*source.body {
112 ast::SetExpr::Values(values) => &values.rows,
113 _ => {
114 return Err(SqlError::Unsupported {
115 detail: "INSERT source must be VALUES or SELECT".into(),
116 });
117 }
118 };
119
120 if info.engine == EngineType::KeyValue {
122 let intent = if if_absent {
123 KvInsertIntent::InsertIfAbsent
124 } else {
125 KvInsertIntent::Insert
126 };
127 return build_kv_insert_plan(
128 table_name,
129 &columns,
130 rows_ast,
131 intent,
132 Vec::new(),
133 info.primary_key.as_deref(),
134 );
135 }
136
137 if info.primary == nodedb_types::PrimaryEngine::Vector
139 && let Some(ref vpc) = info.vector_primary
140 {
141 let rows_parsed = convert_value_rows(&columns, rows_ast)?;
142 return build_vector_primary_insert_plan(&table_name, vpc, &columns, rows_parsed);
143 }
144
145 let rows = convert_value_rows(&columns, rows_ast)?;
147 let column_defaults: Vec<(String, String)> = info
148 .columns
149 .iter()
150 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
151 .collect();
152 let column_schema: Vec<(String, String)> = info
153 .columns
154 .iter()
155 .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
156 .collect();
157 let rules = engine_rules::resolve_engine_rules(info.engine);
158 rules.plan_insert(InsertParams {
159 collection: table_name,
160 columns,
161 rows,
162 column_defaults,
163 if_absent,
164 column_schema,
165 })
166}
167
168pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
172 let table_name = match &ins.table {
173 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
174 ast::TableObject::TableFunction(_) => {
175 return Err(SqlError::Unsupported {
176 detail: "UPSERT INTO table function not supported".into(),
177 });
178 }
179 };
180 let info = catalog
181 .get_collection(DatabaseId::DEFAULT, &table_name)?
182 .ok_or_else(|| SqlError::UnknownTable {
183 name: table_name.clone(),
184 })?;
185
186 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
187
188 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
189 detail: "UPSERT requires VALUES".into(),
190 })?;
191
192 let rows_ast = match &*source.body {
193 ast::SetExpr::Values(values) => &values.rows,
194 _ => {
195 return Err(SqlError::Unsupported {
196 detail: "UPSERT source must be VALUES".into(),
197 });
198 }
199 };
200
201 if info.engine == EngineType::KeyValue {
203 return build_kv_insert_plan(
204 table_name,
205 &columns,
206 rows_ast,
207 KvInsertIntent::Put,
208 Vec::new(),
209 info.primary_key.as_deref(),
210 );
211 }
212
213 let rows = convert_value_rows(&columns, rows_ast)?;
214 let column_defaults: Vec<(String, String)> = info
215 .columns
216 .iter()
217 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
218 .collect();
219 let column_schema: Vec<(String, String)> = info
220 .columns
221 .iter()
222 .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
223 .collect();
224 let rules = engine_rules::resolve_engine_rules(info.engine);
225 rules.plan_upsert(engine_rules::UpsertParams {
226 collection: table_name,
227 columns,
228 rows,
229 column_defaults,
230 on_conflict_updates: Vec::new(),
231 column_schema,
232 })
233}
234
235fn plan_upsert_with_on_conflict(
237 ins: &ast::Insert,
238 catalog: &dyn SqlCatalog,
239 on_conflict_updates: Vec<(String, SqlExpr)>,
240) -> Result<Vec<SqlPlan>> {
241 let table_name = match &ins.table {
242 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
243 ast::TableObject::TableFunction(_) => {
244 return Err(SqlError::Unsupported {
245 detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
246 });
247 }
248 };
249 let info = catalog
250 .get_collection(DatabaseId::DEFAULT, &table_name)?
251 .ok_or_else(|| SqlError::UnknownTable {
252 name: table_name.clone(),
253 })?;
254
255 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
256
257 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
258 detail: "INSERT ... ON CONFLICT requires VALUES".into(),
259 })?;
260 let rows_ast = match &*source.body {
261 ast::SetExpr::Values(values) => &values.rows,
262 _ => {
263 return Err(SqlError::Unsupported {
264 detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
265 });
266 }
267 };
268
269 if info.engine == EngineType::KeyValue {
274 return build_kv_insert_plan(
275 table_name,
276 &columns,
277 rows_ast,
278 KvInsertIntent::Put,
279 on_conflict_updates,
280 info.primary_key.as_deref(),
281 );
282 }
283
284 let rows = convert_value_rows(&columns, rows_ast)?;
285 let column_defaults: Vec<(String, String)> = info
286 .columns
287 .iter()
288 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
289 .collect();
290 let column_schema: Vec<(String, String)> = info
291 .columns
292 .iter()
293 .filter_map(|c| c.raw_type.as_ref().map(|t| (c.name.clone(), t.clone())))
294 .collect();
295 let rules = engine_rules::resolve_engine_rules(info.engine);
296 rules.plan_upsert(engine_rules::UpsertParams {
297 collection: table_name,
298 columns,
299 rows,
300 column_defaults,
301 on_conflict_updates,
302 column_schema,
303 })
304}