1use nodedb_types::DatabaseId;
10use sqlparser::ast::{self, MergeAction, MergeClauseKind as AstMergeClauseKind, MergeInsertKind};
11
12use super::ast_helpers::{qualified_ident_pair, strip_and_convert_filters};
13use crate::engine_rules::{self, MergeParams, ScanParams};
14use crate::error::{Result, SqlError};
15use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
16use crate::resolver::expr::convert_expr;
17use crate::temporal::TemporalScope;
18use crate::types::*;
19use crate::types::{MergeClauseKind, MergePlanAction, MergePlanClause, SqlPlan};
20
21pub fn plan_merge(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
23 let ast::Statement::Merge(merge) = stmt else {
24 return Err(SqlError::Parse {
25 detail: "expected MERGE statement".into(),
26 });
27 };
28
29 if merge.clauses.is_empty() {
30 return Err(SqlError::Parse {
31 detail: "MERGE statement requires at least one WHEN arm".into(),
32 });
33 }
34
35 let (target_name, target_alias) = extract_table_factor_name_alias(&merge.table)?;
37 let target_ref = target_alias.as_deref().unwrap_or(target_name.as_str());
39 let target_info = catalog
40 .get_collection(DatabaseId::DEFAULT, &target_name)?
41 .ok_or_else(|| SqlError::UnknownTable {
42 name: target_name.clone(),
43 })?;
44
45 let source_plan = plan_merge_source(&merge.source, catalog)?;
47 let source_alias = merge_source_alias(&merge.source, &source_plan);
48
49 let (target_join_col, source_join_col) =
51 extract_merge_equijoin(&merge.on, target_ref, &source_alias)?;
52
53 let clauses = convert_merge_clauses(&merge.clauses, target_ref, &source_alias)?;
55
56 let rules = engine_rules::resolve_engine_rules(target_info.engine);
58 rules.plan_merge(MergeParams {
59 collection: target_name,
60 source: Box::new(source_plan),
61 target_join_col,
62 source_join_col,
63 source_alias,
64 clauses,
65 returning: false,
66 })
67}
68
69fn plan_merge_source(factor: &ast::TableFactor, catalog: &dyn SqlCatalog) -> Result<SqlPlan> {
78 match factor {
79 ast::TableFactor::Table { name, alias, .. } => {
80 let source_name = normalize_object_name_checked(name)?;
81 let source_info = catalog
82 .get_collection(DatabaseId::DEFAULT, &source_name)?
83 .ok_or_else(|| SqlError::UnknownTable {
84 name: source_name.clone(),
85 })?;
86 let alias_str = alias.as_ref().map(|a| normalize_ident(&a.name));
87 let source_rules = engine_rules::resolve_engine_rules(source_info.engine);
88 source_rules.plan_scan(ScanParams {
89 collection: source_name,
90 alias: alias_str,
91 filters: Vec::new(),
92 projection: Vec::new(),
93 sort_keys: Vec::new(),
94 limit: None,
95 offset: 0,
96 distinct: false,
97 window_functions: Vec::new(),
98 indexes: Vec::new(),
99 temporal: TemporalScope::default(),
100 bitemporal: source_info.bitemporal,
101 })
102 }
103 ast::TableFactor::Derived {
104 lateral: _,
105 subquery,
106 alias,
107 sample: _,
108 } => {
109 use crate::functions::registry::FunctionRegistry;
110 let alias_name = alias
111 .as_ref()
112 .map(|a| normalize_ident(&a.name))
113 .unwrap_or_else(|| "source".to_string());
114 let functions = FunctionRegistry::new();
115 let plan = crate::planner::select::plan_query(
116 subquery,
117 catalog,
118 &functions,
119 TemporalScope::default(),
120 )?;
121 let _ = alias_name;
124 Ok(plan)
125 }
126 other => Err(SqlError::Unsupported {
127 detail: format!(
128 "MERGE USING source type not supported: {other}; \
129 use a table name or a subquery"
130 ),
131 }),
132 }
133}
134
135fn merge_source_alias(factor: &ast::TableFactor, source_plan: &SqlPlan) -> String {
137 match factor {
138 ast::TableFactor::Table { name, alias, .. } => alias
139 .as_ref()
140 .map(|a| normalize_ident(&a.name))
141 .unwrap_or_else(|| {
142 normalize_object_name_checked(name).unwrap_or_else(|_| "source".to_string())
143 }),
144 ast::TableFactor::Derived { alias, .. } => alias
145 .as_ref()
146 .map(|a| normalize_ident(&a.name))
147 .unwrap_or_else(|| "source".to_string()),
148 _ => match source_plan {
149 SqlPlan::Scan {
150 collection: _,
151 alias: Some(a),
152 ..
153 } => a.clone(),
154 SqlPlan::Scan { collection, .. } => collection.clone(),
155 _ => "source".to_string(),
156 },
157 }
158}
159
160fn extract_merge_equijoin(
165 on: &ast::Expr,
166 target_ref: &str,
167 source_ref: &str,
168) -> Result<(String, String)> {
169 if let ast::Expr::BinaryOp {
170 left,
171 op: ast::BinaryOperator::Eq,
172 right,
173 } = on
174 {
175 let lhs = qualified_ident_pair(left);
176 let rhs = qualified_ident_pair(right);
177 match (lhs, rhs) {
178 (Some((lt, lc)), Some((rt, rc))) => {
179 if lt == target_ref && rt == source_ref {
180 return Ok((lc, rc));
181 }
182 if lt == source_ref && rt == target_ref {
183 return Ok((rc, lc));
184 }
185 }
186 (Some((t, c)), None) if t == source_ref => {
189 if let ast::Expr::Identifier(ident) = right.as_ref() {
190 return Ok((normalize_ident(ident), c));
191 }
192 }
193 (None, Some((t, c))) if t == source_ref => {
194 if let ast::Expr::Identifier(ident) = left.as_ref() {
195 return Ok((normalize_ident(ident), c));
196 }
197 }
198 _ => {}
199 }
200 }
201 Err(SqlError::Unsupported {
202 detail: format!(
203 "MERGE ON clause must be a single equi-join predicate of the form \
204 `{target_ref}.col = {source_ref}.col`; complex ON expressions are not \
205 yet supported"
206 ),
207 })
208}
209
210fn convert_merge_clauses(
213 clauses: &[ast::MergeClause],
214 target_ref: &str,
215 source_ref: &str,
216) -> Result<Vec<MergePlanClause>> {
217 clauses
218 .iter()
219 .map(|c| convert_one_clause(c, target_ref, source_ref))
220 .collect()
221}
222
223fn convert_one_clause(
224 clause: &ast::MergeClause,
225 target_ref: &str,
226 source_ref: &str,
227) -> Result<MergePlanClause> {
228 let kind = match clause.clause_kind {
229 AstMergeClauseKind::Matched => MergeClauseKind::Matched,
230 AstMergeClauseKind::NotMatched | AstMergeClauseKind::NotMatchedByTarget => {
231 MergeClauseKind::NotMatched
232 }
233 AstMergeClauseKind::NotMatchedBySource => MergeClauseKind::NotMatchedBySource,
234 };
235
236 let extra_predicate = match &clause.predicate {
237 Some(expr) => strip_and_convert_filters(vec![expr.clone()], target_ref)?,
238 None => Vec::new(),
239 };
240
241 let action = convert_merge_action(&clause.action, source_ref)?;
242
243 Ok(MergePlanClause {
244 kind,
245 extra_predicate,
246 action,
247 })
248}
249
250fn convert_merge_action(action: &MergeAction, source_ref: &str) -> Result<MergePlanAction> {
251 match action {
252 MergeAction::Update(update_expr) => {
253 let assignments = update_expr
254 .assignments
255 .iter()
256 .map(|a| {
257 let col = match &a.target {
258 ast::AssignmentTarget::ColumnName(name) => {
259 normalize_object_name_checked(name)
260 }
261 ast::AssignmentTarget::Tuple(_) => Err(SqlError::Unsupported {
262 detail: "tuple assignment target in MERGE UPDATE is not supported"
263 .into(),
264 }),
265 }?;
266 let val = convert_expr(&a.value)?;
267 Ok((col, val))
268 })
269 .collect::<Result<Vec<_>>>()?;
270 Ok(MergePlanAction::Update { assignments })
271 }
272 MergeAction::Delete { .. } => Ok(MergePlanAction::Delete),
273 MergeAction::Insert(insert_expr) => {
274 let columns: Vec<String> = insert_expr
275 .columns
276 .iter()
277 .map(normalize_object_name_checked)
278 .collect::<Result<Vec<_>>>()?;
279
280 let values: Vec<crate::types_expr::SqlExpr> = match &insert_expr.kind {
281 MergeInsertKind::Values(vals) => {
282 if vals.rows.len() != 1 {
283 return Err(SqlError::Unsupported {
284 detail: format!(
285 "MERGE INSERT VALUES must have exactly one row; got {}",
286 vals.rows.len()
287 ),
288 });
289 }
290 vals.rows[0]
291 .iter()
292 .map(convert_expr)
293 .collect::<Result<Vec<_>>>()?
294 }
295 MergeInsertKind::Row => {
296 return Err(SqlError::Unsupported {
297 detail: "MERGE INSERT ROW is not supported; use explicit VALUES".into(),
298 });
299 }
300 };
301
302 if !columns.is_empty() && columns.len() != values.len() {
303 return Err(SqlError::Parse {
304 detail: format!(
305 "MERGE INSERT column list ({}) and VALUES ({}) lengths do not match",
306 columns.len(),
307 values.len()
308 ),
309 });
310 }
311
312 let _ = source_ref; Ok(MergePlanAction::Insert { columns, values })
314 }
315 }
316}
317
318pub(super) fn extract_table_factor_name_alias(
321 factor: &ast::TableFactor,
322) -> Result<(String, Option<String>)> {
323 match factor {
324 ast::TableFactor::Table { name, alias, .. } => {
325 let table_name = normalize_object_name_checked(name)?;
326 let alias_str = alias.as_ref().map(|a| normalize_ident(&a.name));
327 Ok((table_name, alias_str))
328 }
329 other => Err(SqlError::Unsupported {
330 detail: format!("MERGE target must be a plain table name, not: {other}"),
331 }),
332 }
333}