1use sqlparser::ast::{self};
4
5use super::dml_helpers::{
6 convert_value_rows, expr_to_sql_value, extract_point_keys,
7 extract_table_name_from_table_with_joins,
8};
9use crate::engine_rules::{self, DeleteParams, InsertParams, UpdateParams};
10use crate::error::{Result, SqlError};
11use crate::parser::normalize::{normalize_ident, normalize_object_name};
12use crate::resolver::expr::convert_expr;
13use crate::types::*;
14
15enum OnConflict {
17 None,
19 DoNothing,
21 DoUpdate(Vec<(String, SqlExpr)>),
24}
25
26fn classify_on_conflict(ins: &ast::Insert) -> Result<OnConflict> {
27 let Some(on) = ins.on.as_ref() else {
28 return Ok(OnConflict::None);
29 };
30 let ast::OnInsert::OnConflict(oc) = on else {
31 return Ok(OnConflict::None);
32 };
33 match &oc.action {
34 ast::OnConflictAction::DoNothing => Ok(OnConflict::DoNothing),
35 ast::OnConflictAction::DoUpdate(do_update) => {
36 let mut pairs = Vec::with_capacity(do_update.assignments.len());
37 for a in &do_update.assignments {
38 let name = match &a.target {
39 ast::AssignmentTarget::ColumnName(obj) => normalize_object_name(obj),
40 _ => {
41 return Err(SqlError::Unsupported {
42 detail: "ON CONFLICT DO UPDATE SET target must be a column name".into(),
43 });
44 }
45 };
46 let expr = convert_expr(&a.value)?;
47 pairs.push((name, expr));
48 }
49 Ok(OnConflict::DoUpdate(pairs))
50 }
51 }
52}
53
54pub fn plan_insert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
56 let if_absent = match classify_on_conflict(ins)? {
60 OnConflict::None => false,
61 OnConflict::DoNothing => true,
62 OnConflict::DoUpdate(updates) => {
63 return plan_upsert_with_on_conflict(ins, catalog, updates);
64 }
65 };
66 let table_name = match &ins.table {
67 ast::TableObject::TableName(name) => normalize_object_name(name),
68 ast::TableObject::TableFunction(_) => {
69 return Err(SqlError::Unsupported {
70 detail: "INSERT INTO table function not supported".into(),
71 });
72 }
73 };
74 let info = catalog
75 .get_collection(&table_name)?
76 .ok_or_else(|| SqlError::UnknownTable {
77 name: table_name.clone(),
78 })?;
79
80 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
81
82 if let Some(source) = &ins.source
84 && let ast::SetExpr::Select(_select) = &*source.body
85 {
86 let source_plan = super::select::plan_query(
87 source,
88 catalog,
89 &crate::functions::registry::FunctionRegistry::new(),
90 )?;
91 return Ok(vec![SqlPlan::InsertSelect {
92 target: table_name,
93 source: Box::new(source_plan),
94 limit: 0,
95 }]);
96 }
97
98 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
100 detail: "INSERT requires VALUES or SELECT".into(),
101 })?;
102
103 let rows_ast = match &*source.body {
104 ast::SetExpr::Values(values) => &values.rows,
105 _ => {
106 return Err(SqlError::Unsupported {
107 detail: "INSERT source must be VALUES or SELECT".into(),
108 });
109 }
110 };
111
112 if info.engine == EngineType::KeyValue {
114 let intent = if if_absent {
115 KvInsertIntent::InsertIfAbsent
116 } else {
117 KvInsertIntent::Insert
118 };
119 return build_kv_insert_plan(table_name, &columns, rows_ast, intent, Vec::new());
120 }
121
122 let rows = convert_value_rows(&columns, rows_ast)?;
124 let column_defaults: Vec<(String, String)> = info
125 .columns
126 .iter()
127 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
128 .collect();
129 let rules = engine_rules::resolve_engine_rules(info.engine);
130 rules.plan_insert(InsertParams {
131 collection: table_name,
132 columns,
133 rows,
134 column_defaults,
135 if_absent,
136 })
137}
138
139pub fn plan_upsert(ins: &ast::Insert, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
143 let table_name = match &ins.table {
144 ast::TableObject::TableName(name) => normalize_object_name(name),
145 ast::TableObject::TableFunction(_) => {
146 return Err(SqlError::Unsupported {
147 detail: "UPSERT INTO table function not supported".into(),
148 });
149 }
150 };
151 let info = catalog
152 .get_collection(&table_name)?
153 .ok_or_else(|| SqlError::UnknownTable {
154 name: table_name.clone(),
155 })?;
156
157 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
158
159 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
160 detail: "UPSERT requires VALUES".into(),
161 })?;
162
163 let rows_ast = match &*source.body {
164 ast::SetExpr::Values(values) => &values.rows,
165 _ => {
166 return Err(SqlError::Unsupported {
167 detail: "UPSERT source must be VALUES".into(),
168 });
169 }
170 };
171
172 if info.engine == EngineType::KeyValue {
174 return build_kv_insert_plan(
175 table_name,
176 &columns,
177 rows_ast,
178 KvInsertIntent::Put,
179 Vec::new(),
180 );
181 }
182
183 let rows = convert_value_rows(&columns, rows_ast)?;
184 let column_defaults: Vec<(String, String)> = info
185 .columns
186 .iter()
187 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
188 .collect();
189 let rules = engine_rules::resolve_engine_rules(info.engine);
190 rules.plan_upsert(engine_rules::UpsertParams {
191 collection: table_name,
192 columns,
193 rows,
194 column_defaults,
195 on_conflict_updates: Vec::new(),
196 })
197}
198
199fn plan_upsert_with_on_conflict(
204 ins: &ast::Insert,
205 catalog: &dyn SqlCatalog,
206 on_conflict_updates: Vec<(String, SqlExpr)>,
207) -> Result<Vec<SqlPlan>> {
208 let table_name = match &ins.table {
209 ast::TableObject::TableName(name) => normalize_object_name(name),
210 ast::TableObject::TableFunction(_) => {
211 return Err(SqlError::Unsupported {
212 detail: "INSERT ... ON CONFLICT on a table function is not supported".into(),
213 });
214 }
215 };
216 let info = catalog
217 .get_collection(&table_name)?
218 .ok_or_else(|| SqlError::UnknownTable {
219 name: table_name.clone(),
220 })?;
221
222 let columns: Vec<String> = ins.columns.iter().map(normalize_ident).collect();
223
224 let source = ins.source.as_ref().ok_or_else(|| SqlError::Parse {
225 detail: "INSERT ... ON CONFLICT requires VALUES".into(),
226 })?;
227 let rows_ast = match &*source.body {
228 ast::SetExpr::Values(values) => &values.rows,
229 _ => {
230 return Err(SqlError::Unsupported {
231 detail: "INSERT ... ON CONFLICT source must be VALUES".into(),
232 });
233 }
234 };
235
236 if info.engine == EngineType::KeyValue {
241 return build_kv_insert_plan(
242 table_name,
243 &columns,
244 rows_ast,
245 KvInsertIntent::Put,
246 on_conflict_updates,
247 );
248 }
249
250 let rows = convert_value_rows(&columns, rows_ast)?;
251 let column_defaults: Vec<(String, String)> = info
252 .columns
253 .iter()
254 .filter_map(|c| c.default.as_ref().map(|d| (c.name.clone(), d.clone())))
255 .collect();
256 let rules = engine_rules::resolve_engine_rules(info.engine);
257 rules.plan_upsert(engine_rules::UpsertParams {
258 collection: table_name,
259 columns,
260 rows,
261 column_defaults,
262 on_conflict_updates,
263 })
264}
265
266fn build_kv_insert_plan(
271 table_name: String,
272 columns: &[String],
273 rows_ast: &[Vec<ast::Expr>],
274 intent: KvInsertIntent,
275 on_conflict_updates: Vec<(String, SqlExpr)>,
276) -> Result<Vec<SqlPlan>> {
277 let key_idx = columns.iter().position(|c| c == "key");
278 let ttl_idx = columns.iter().position(|c| c == "ttl");
279 let mut entries = Vec::with_capacity(rows_ast.len());
280 let mut ttl_secs: u64 = 0;
281 for row_exprs in rows_ast {
282 let key_val = match key_idx {
283 Some(idx) => expr_to_sql_value(&row_exprs[idx])?,
284 None => SqlValue::String(String::new()),
285 };
286 if let Some(idx) = ttl_idx {
287 match expr_to_sql_value(&row_exprs[idx]) {
288 Ok(SqlValue::Int(n)) => ttl_secs = n.max(0) as u64,
289 Ok(SqlValue::Float(f)) => ttl_secs = f.max(0.0) as u64,
290 _ => {}
291 }
292 }
293 let value_cols: Vec<(String, SqlValue)> = columns
294 .iter()
295 .enumerate()
296 .filter(|(i, _)| Some(*i) != key_idx && Some(*i) != ttl_idx)
297 .map(|(i, col)| {
298 let val = expr_to_sql_value(&row_exprs[i])?;
299 Ok((col.clone(), val))
300 })
301 .collect::<Result<Vec<_>>>()?;
302 entries.push((key_val, value_cols));
303 }
304 Ok(vec![SqlPlan::KvInsert {
305 collection: table_name,
306 entries,
307 ttl_secs,
308 intent,
309 on_conflict_updates,
310 }])
311}
312
313pub fn plan_update(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
315 let ast::Statement::Update(update) = stmt else {
316 return Err(SqlError::Parse {
317 detail: "expected UPDATE statement".into(),
318 });
319 };
320
321 let table_name = extract_table_name_from_table_with_joins(&update.table)?;
322 let info = catalog
323 .get_collection(&table_name)?
324 .ok_or_else(|| SqlError::UnknownTable {
325 name: table_name.clone(),
326 })?;
327
328 let assigns: Vec<(String, SqlExpr)> = update
329 .assignments
330 .iter()
331 .map(|a| {
332 let col = match &a.target {
333 ast::AssignmentTarget::ColumnName(name) => normalize_object_name(name),
334 ast::AssignmentTarget::Tuple(names) => names
335 .iter()
336 .map(normalize_object_name)
337 .collect::<Vec<_>>()
338 .join(","),
339 };
340 let val = convert_expr(&a.value)?;
341 Ok((col, val))
342 })
343 .collect::<Result<_>>()?;
344
345 let filters = match &update.selection {
346 Some(expr) => super::select::convert_where_to_filters(expr)?,
347 None => Vec::new(),
348 };
349
350 let target_keys = extract_point_keys(update.selection.as_ref(), &info);
352
353 let rules = engine_rules::resolve_engine_rules(info.engine);
354 rules.plan_update(UpdateParams {
355 collection: table_name,
356 assignments: assigns,
357 filters,
358 target_keys,
359 returning: update.returning.is_some(),
360 })
361}
362
363pub fn plan_delete(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
365 let ast::Statement::Delete(delete) = stmt else {
366 return Err(SqlError::Parse {
367 detail: "expected DELETE statement".into(),
368 });
369 };
370
371 let from_tables = match &delete.from {
372 ast::FromTable::WithFromKeyword(tables) | ast::FromTable::WithoutKeyword(tables) => tables,
373 };
374 let table_name =
375 extract_table_name_from_table_with_joins(from_tables.first().ok_or_else(|| {
376 SqlError::Parse {
377 detail: "DELETE requires a FROM table".into(),
378 }
379 })?)?;
380 let info = catalog
381 .get_collection(&table_name)?
382 .ok_or_else(|| SqlError::UnknownTable {
383 name: table_name.clone(),
384 })?;
385
386 let filters = match &delete.selection {
387 Some(expr) => super::select::convert_where_to_filters(expr)?,
388 None => Vec::new(),
389 };
390
391 let target_keys = extract_point_keys(delete.selection.as_ref(), &info);
392
393 let rules = engine_rules::resolve_engine_rules(info.engine);
394 rules.plan_delete(DeleteParams {
395 collection: table_name,
396 filters,
397 target_keys,
398 })
399}
400
401pub fn plan_truncate_stmt(stmt: &ast::Statement) -> Result<Vec<SqlPlan>> {
403 let ast::Statement::Truncate(truncate) = stmt else {
404 return Err(SqlError::Parse {
405 detail: "expected TRUNCATE statement".into(),
406 });
407 };
408 let restart_identity = matches!(
409 truncate.identity,
410 Some(sqlparser::ast::TruncateIdentityOption::Restart)
411 );
412 truncate
413 .table_names
414 .iter()
415 .map(|t| {
416 Ok(SqlPlan::Truncate {
417 collection: normalize_object_name(&t.name),
418 restart_identity,
419 })
420 })
421 .collect()
422}
423
424