1use nodedb_types::DatabaseId;
6use sqlparser::ast::{self};
7
8use super::dml_helpers::{
9 build_kv_insert_plan, build_vector_primary_insert_plan, convert_value_rows,
10};
11use crate::engine_rules::{self, InsertParams};
12use crate::error::{Result, SqlError};
13use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
14use crate::resolver::expr::convert_expr;
15use crate::types::*;
16
17pub use dml_update_delete::{plan_delete, plan_truncate_stmt, plan_update};
18
19#[path = "dml_update_delete.rs"]
20mod dml_update_delete;
21
22enum OnConflict {
24 None,
26 DoNothing,
28 DoUpdate(Vec<(String, SqlExpr)>),
31}
32
33fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
34 let Some(on) = ins.on.as_ref() else {
35 return Ok(OnConflict::None);
36 };
37 let ast::OnInsert::OnConflict(oc) = on else {
38 return Ok(OnConflict::None);
39 };
40 match &oc.action {
41 ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
42 ast::OnConflictAction::DoUpdate(do_update) => {
43 let mut pairs = Vec::with_capacity(do_update.assignments.len());
44 for a in &do_update.assignments {
45 let name = match &a.target {
46 ast::AssignmentTarget::ColumnName(obj) => normalize_object_name_checked(obj)?,
47 _ => {
48 return Err(SqlError::Unsupported {
49 detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
50 });
51 }
52 };
53 let expr = convert_expr(&a.value)?;
54 pairs.push((name, expr));
55 }
56 Ok(OnConflict::DoUpdate(pairs))
57 }
58 }
59}
60
61pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
63 let if_absent = match classify_on_conflict(ins)? {
67 OnConflict::None => false,
68 OnConflict::DoNothing => true,
69 OnConflict::DoUpdate(updates) => {
70 return plan_upsert_with_on_conflict(ins, catalog, updates);
71 }
72 };
73 let table_name = match &ins.table {
74 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
75 ast::TableObject::TableFunction(_) => {
76 return Err(SqlError::Unsupported {
77 detail: "INSERT INTO table function not supported".into(),
78 });
79 }
80 };
81 let info = catalog
82 .get_collection(DatabaseId::DEFAULT, &table_name)?
83 .ok_or_else(|| SqlError::UnknownTable {
84 name: table_name.clone(),
85 })?;
86
87 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
88
89 if let Some(source) = &ins.source
91 && let ast::SetExpr::Select(_select) = &*source.body
92 {
93 let source_plan = super::select::plan_query(
94 source,
95 catalog,
96 &crate::functions::registry::FunctionRegistry::new(),
97 crate::TemporalScope::default(),
98 )?;
99 return Ok(vec![SqlPlan::InsertSelect {
100 target: table_name,
101 source: Box::new(source_plan),
102 limit: 0,
103 }]);
104 }
105
106 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
108 detail: "INSERT requires VALUES or SELECT".into(),
109 })?;
110
111 let rows_ast = match &*source.body {
112 ast::SetExpr::Values(values) => &values.rows,
113 _ => {
114 return Err(SqlError::Unsupported {
115 detail: "INSERT source must be VALUES or SELECT".into(),
116 });
117 }
118 };
119
120 if info.engine == EngineType::KeyValue {
122 let intent = if if_absent {
123 KvInsertIntent::InsertIfAbsent
124 } else {
125 KvInsertIntent::Insert
126 };
127 return build_kv_insert_plan(
128 table_name,
129 &columns,
130 rows_ast,
131 intent,
132 Vec::new(),
133 info.primary_key.as_deref(),
134 );
135 }
136
137 if info.primary == nodedb_types::PrimaryEngine::Vector
139 && let Some(ref vpc) = info.vector_primary
140 {
141 let rows_parsed = convert_value_rows(&columns, rows_ast)?;
142 return build_vector_primary_insert_plan(&table_name, vpc, &columns, rows_parsed);
143 }
144
145 let rows = convert_value_rows(&columns, rows_ast)?;
147 let column_defaults: Vec<(String, String)> = info
148 .columns
149 .iter()
150 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
151 .collect();
152 let rules = engine_rules::resolve_engine_rules(info.engine);
153 rules.plan_insert(InsertParams {
154 collection: table_name,
155 columns,
156 rows,
157 column_defaults,
158 if_absent,
159 })
160}
161
162pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
166 let table_name = match &ins.table {
167 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
168 ast::TableObject::TableFunction(_) => {
169 return Err(SqlError::Unsupported {
170 detail: "UPSERT INTO table function not supported".into(),
171 });
172 }
173 };
174 let info = catalog
175 .get_collection(DatabaseId::DEFAULT, &table_name)?
176 .ok_or_else(|| SqlError::UnknownTable {
177 name: table_name.clone(),
178 })?;
179
180 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
181
182 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
183 detail: "UPSERT requires VALUES".into(),
184 })?;
185
186 let rows_ast = match &*source.body {
187 ast::SetExpr::Values(values) => &values.rows,
188 _ => {
189 return Err(SqlError::Unsupported {
190 detail: "UPSERT source must be VALUES".into(),
191 });
192 }
193 };
194
195 if info.engine == EngineType::KeyValue {
197 return build_kv_insert_plan(
198 table_name,
199 &columns,
200 rows_ast,
201 KvInsertIntent::Put,
202 Vec::new(),
203 info.primary_key.as_deref(),
204 );
205 }
206
207 let rows = convert_value_rows(&columns, rows_ast)?;
208 let column_defaults: Vec<(String, String)> = info
209 .columns
210 .iter()
211 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
212 .collect();
213 let rules = engine_rules::resolve_engine_rules(info.engine);
214 rules.plan_upsert(engine_rules::UpsertParams {
215 collection: table_name,
216 columns,
217 rows,
218 column_defaults,
219 on_conflict_updates: Vec::new(),
220 })
221}
222
223fn plan_upsert_with_on_conflict(
225 ins: &ast::Insert,
226 catalog: &dyn SqlCatalog,
227 on_conflict_updates: Vec<(String, SqlExpr)>,
228) -> Result<Vec<SqlPlan>> {
229 let table_name = match &ins.table {
230 ast::TableObject::TableName(name) => normalize_object_name_checked(name)?,
231 ast::TableObject::TableFunction(_) => {
232 return Err(SqlError::Unsupported {
233 detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
234 });
235 }
236 };
237 let info = catalog
238 .get_collection(DatabaseId::DEFAULT, &table_name)?
239 .ok_or_else(|| SqlError::UnknownTable {
240 name: table_name.clone(),
241 })?;
242
243 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
244
245 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
246 detail: "INSERT ... ON CONFLICT requires VALUES".into(),
247 })?;
248 let rows_ast = match &*source.body {
249 ast::SetExpr::Values(values) => &values.rows,
250 _ => {
251 return Err(SqlError::Unsupported {
252 detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
253 });
254 }
255 };
256
257 if info.engine == EngineType::KeyValue {
262 return build_kv_insert_plan(
263 table_name,
264 &columns,
265 rows_ast,
266 KvInsertIntent::Put,
267 on_conflict_updates,
268 info.primary_key.as_deref(),
269 );
270 }
271
272 let rows = convert_value_rows(&columns, rows_ast)?;
273 let column_defaults: Vec<(String, String)> = info
274 .columns
275 .iter()
276 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
277 .collect();
278 let rules = engine_rules::resolve_engine_rules(info.engine);
279 rules.plan_upsert(engine_rules::UpsertParams {
280 collection: table_name,
281 columns,
282 rows,
283 column_defaults,
284 on_conflict_updates,
285 })
286}