Skip to main content

nodedb_sql/planner/
merge.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! MERGE statement planning.
4//!
5//! Translates `sqlparser::ast::Statement::Merge` into `SqlPlan::Merge`.
6//! Supported engines: `document_schemaless`, `document_strict`.
7//! All other engines return `SqlError::Unsupported`.
8
9use nodedb_types::DatabaseId;
10use sqlparser::ast::{self, MergeAction, MergeClauseKind as AstMergeClauseKind, MergeInsertKind};
11
12use super::ast_helpers::{qualified_ident_pair, strip_and_convert_filters};
13use crate::engine_rules::{self, MergeParams, ScanParams};
14use crate::error::{Result, SqlError};
15use crate::parser::normalize::{normalize_ident, normalize_object_name_checked};
16use crate::resolver::expr::convert_expr;
17use crate::temporal::TemporalScope;
18use crate::types::*;
19use crate::types::{MergeClauseKind, MergePlanAction, MergePlanClause, SqlPlan};
20
21/// Plan a `MERGE INTO target USING source ON ... WHEN ... THEN ...` statement.
22pub fn plan_merge(stmt: &ast::Statement, catalog: &dyn SqlCatalog) -> Result<Vec<SqlPlan>> {
23    let ast::Statement::Merge(merge) = stmt else {
24        return Err(SqlError::Parse {
25            detail: "expected MERGE statement".into(),
26        });
27    };
28
29    if merge.clauses.is_empty() {
30        return Err(SqlError::Parse {
31            detail: "MERGE statement requires at least one WHEN arm".into(),
32        });
33    }
34
35    // ── Resolve target ──
36    let (target_name, target_alias) = extract_table_factor_name_alias(&merge.table)?;
37    // The ON clause uses the alias (or table name if no alias) as the qualifier.
38    let target_ref = target_alias.as_deref().unwrap_or(target_name.as_str());
39    let target_info = catalog
40        .get_collection(DatabaseId::DEFAULT, &target_name)?
41        .ok_or_else(|| SqlError::UnknownTable {
42            name: target_name.clone(),
43        })?;
44
45    // ── Resolve source ──
46    let source_plan = plan_merge_source(&merge.source, catalog)?;
47    let source_alias = merge_source_alias(&merge.source, &source_plan);
48
49    // ── Parse ON clause into equi-join columns ──
50    let (target_join_col, source_join_col) =
51        extract_merge_equijoin(&merge.on, target_ref, &source_alias)?;
52
53    // ── Convert WHEN clauses ──
54    let clauses = convert_merge_clauses(&merge.clauses, target_ref, &source_alias)?;
55
56    // ── Dispatch to engine rules ──
57    let rules = engine_rules::resolve_engine_rules(target_info.engine);
58    rules.plan_merge(MergeParams {
59        collection: target_name,
60        source: Box::new(source_plan),
61        target_join_col,
62        source_join_col,
63        source_alias,
64        clauses,
65        returning: false,
66    })
67}
68
69// ── Source planning ────────────────────────────────────────────────────────
70
71/// Plan the USING <source> clause.
72///
73/// Supports:
74/// - Table name: `USING src_table ON ...`
75/// - Derived subquery: `USING (SELECT ...) AS alias ON ...`
76/// - VALUES constructor: treated as a subquery alias.
77fn plan_merge_source(factor: &ast::TableFactor, catalog: &dyn SqlCatalog) -> Result<SqlPlan> {
78    match factor {
79        ast::TableFactor::Table { name, alias, .. } => {
80            let source_name = normalize_object_name_checked(name)?;
81            let source_info = catalog
82                .get_collection(DatabaseId::DEFAULT, &source_name)?
83                .ok_or_else(|| SqlError::UnknownTable {
84                    name: source_name.clone(),
85                })?;
86            let alias_str = alias.as_ref().map(|a| normalize_ident(&a.name));
87            let source_rules = engine_rules::resolve_engine_rules(source_info.engine);
88            source_rules.plan_scan(ScanParams {
89                collection: source_name,
90                alias: alias_str,
91                filters: Vec::new(),
92                projection: Vec::new(),
93                sort_keys: Vec::new(),
94                limit: None,
95                offset: 0,
96                distinct: false,
97                window_functions: Vec::new(),
98                indexes: Vec::new(),
99                temporal: TemporalScope::default(),
100                bitemporal: source_info.bitemporal,
101            })
102        }
103        ast::TableFactor::Derived {
104            lateral: _,
105            subquery,
106            alias,
107            sample: _,
108        } => {
109            use crate::functions::registry::FunctionRegistry;
110            let alias_name = alias
111                .as_ref()
112                .map(|a| normalize_ident(&a.name))
113                .unwrap_or_else(|| "source".to_string());
114            let functions = FunctionRegistry::new();
115            let plan = crate::planner::select::plan_query(
116                subquery,
117                catalog,
118                &functions,
119                TemporalScope::default(),
120            )?;
121            // Wrap in an alias scan-like node; for Merge we pass the sub-plan
122            // directly. The alias is tracked separately via `source_alias`.
123            let _ = alias_name;
124            Ok(plan)
125        }
126        other => Err(SqlError::Unsupported {
127            detail: format!(
128                "MERGE USING source type not supported: {other}; \
129                 use a table name or a subquery"
130            ),
131        }),
132    }
133}
134
135/// Determine the alias used to qualify source-column references in WHEN arms.
136fn merge_source_alias(factor: &ast::TableFactor, source_plan: &SqlPlan) -> String {
137    match factor {
138        ast::TableFactor::Table { name, alias, .. } => alias
139            .as_ref()
140            .map(|a| normalize_ident(&a.name))
141            .unwrap_or_else(|| {
142                normalize_object_name_checked(name).unwrap_or_else(|_| "source".to_string())
143            }),
144        ast::TableFactor::Derived { alias, .. } => alias
145            .as_ref()
146            .map(|a| normalize_ident(&a.name))
147            .unwrap_or_else(|| "source".to_string()),
148        _ => match source_plan {
149            SqlPlan::Scan {
150                collection: _,
151                alias: Some(a),
152                ..
153            } => a.clone(),
154            SqlPlan::Scan { collection, .. } => collection.clone(),
155            _ => "source".to_string(),
156        },
157    }
158}
159
160// ── ON clause parsing ──────────────────────────────────────────────────────
161
162/// Extract a single equi-join predicate of the form `target.col = source.col`
163/// from the MERGE ON expression.  Returns `(target_col, source_col)`.
164fn extract_merge_equijoin(
165    on: &ast::Expr,
166    target_ref: &str,
167    source_ref: &str,
168) -> Result<(String, String)> {
169    if let ast::Expr::BinaryOp {
170        left,
171        op: ast::BinaryOperator::Eq,
172        right,
173    } = on
174    {
175        let lhs = qualified_ident_pair(left);
176        let rhs = qualified_ident_pair(right);
177        match (lhs, rhs) {
178            (Some((lt, lc)), Some((rt, rc))) => {
179                if lt == target_ref && rt == source_ref {
180                    return Ok((lc, rc));
181                }
182                if lt == source_ref && rt == target_ref {
183                    return Ok((rc, lc));
184                }
185            }
186            // Unqualified bare-column references: assume target.col = source.col
187            // pattern when one side is unqualified.
188            (Some((t, c)), None) if t == source_ref => {
189                if let ast::Expr::Identifier(ident) = right.as_ref() {
190                    return Ok((normalize_ident(ident), c));
191                }
192            }
193            (None, Some((t, c))) if t == source_ref => {
194                if let ast::Expr::Identifier(ident) = left.as_ref() {
195                    return Ok((normalize_ident(ident), c));
196                }
197            }
198            _ => {}
199        }
200    }
201    Err(SqlError::Unsupported {
202        detail: format!(
203            "MERGE ON clause must be a single equi-join predicate of the form \
204             `{target_ref}.col = {source_ref}.col`; complex ON expressions are not \
205             yet supported"
206        ),
207    })
208}
209
210// ── WHEN clause conversion ─────────────────────────────────────────────────
211
212fn convert_merge_clauses(
213    clauses: &[ast::MergeClause],
214    target_ref: &str,
215    source_ref: &str,
216) -> Result<Vec<MergePlanClause>> {
217    clauses
218        .iter()
219        .map(|c| convert_one_clause(c, target_ref, source_ref))
220        .collect()
221}
222
223fn convert_one_clause(
224    clause: &ast::MergeClause,
225    target_ref: &str,
226    source_ref: &str,
227) -> Result<MergePlanClause> {
228    let kind = match clause.clause_kind {
229        AstMergeClauseKind::Matched => MergeClauseKind::Matched,
230        AstMergeClauseKind::NotMatched | AstMergeClauseKind::NotMatchedByTarget => {
231            MergeClauseKind::NotMatched
232        }
233        AstMergeClauseKind::NotMatchedBySource => MergeClauseKind::NotMatchedBySource,
234    };
235
236    let extra_predicate = match &clause.predicate {
237        Some(expr) => strip_and_convert_filters(vec![expr.clone()], target_ref)?,
238        None => Vec::new(),
239    };
240
241    let action = convert_merge_action(&clause.action, source_ref)?;
242
243    Ok(MergePlanClause {
244        kind,
245        extra_predicate,
246        action,
247    })
248}
249
250fn convert_merge_action(action: &MergeAction, source_ref: &str) -> Result<MergePlanAction> {
251    match action {
252        MergeAction::Update(update_expr) => {
253            let assignments = update_expr
254                .assignments
255                .iter()
256                .map(|a| {
257                    let col = match &a.target {
258                        ast::AssignmentTarget::ColumnName(name) => {
259                            normalize_object_name_checked(name)
260                        }
261                        ast::AssignmentTarget::Tuple(_) => Err(SqlError::Unsupported {
262                            detail: "tuple assignment target in MERGE UPDATE is not supported"
263                                .into(),
264                        }),
265                    }?;
266                    let val = convert_expr(&a.value)?;
267                    Ok((col, val))
268                })
269                .collect::<Result<Vec<_>>>()?;
270            Ok(MergePlanAction::Update { assignments })
271        }
272        MergeAction::Delete { .. } => Ok(MergePlanAction::Delete),
273        MergeAction::Insert(insert_expr) => {
274            let columns: Vec<String> = insert_expr
275                .columns
276                .iter()
277                .map(normalize_object_name_checked)
278                .collect::<Result<Vec<_>>>()?;
279
280            let values: Vec<crate::types_expr::SqlExpr> = match &insert_expr.kind {
281                MergeInsertKind::Values(vals) => {
282                    if vals.rows.len() != 1 {
283                        return Err(SqlError::Unsupported {
284                            detail: format!(
285                                "MERGE INSERT VALUES must have exactly one row; got {}",
286                                vals.rows.len()
287                            ),
288                        });
289                    }
290                    vals.rows[0]
291                        .iter()
292                        .map(convert_expr)
293                        .collect::<Result<Vec<_>>>()?
294                }
295                MergeInsertKind::Row => {
296                    return Err(SqlError::Unsupported {
297                        detail: "MERGE INSERT ROW is not supported; use explicit VALUES".into(),
298                    });
299                }
300            };
301
302            if !columns.is_empty() && columns.len() != values.len() {
303                return Err(SqlError::Parse {
304                    detail: format!(
305                        "MERGE INSERT column list ({}) and VALUES ({}) lengths do not match",
306                        columns.len(),
307                        values.len()
308                    ),
309                });
310            }
311
312            let _ = source_ref; // for future multi-row insert support
313            Ok(MergePlanAction::Insert { columns, values })
314        }
315    }
316}
317
318// ── Helpers ────────────────────────────────────────────────────────────────
319
320pub(super) fn extract_table_factor_name_alias(
321    factor: &ast::TableFactor,
322) -> Result<(String, Option<String>)> {
323    match factor {
324        ast::TableFactor::Table { name, alias, .. } => {
325            let table_name = normalize_object_name_checked(name)?;
326            let alias_str = alias.as_ref().map(|a| normalize_ident(&a.name));
327            Ok((table_name, alias_str))
328        }
329        other => Err(SqlError::Unsupported {
330            detail: format!("MERGE target must be a plain table name, not: {other}"),
331        }),
332    }
333}