Skip to main content

sparrowdb_execution/engine/
mutation.rs

1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5    // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7    /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8    /// return the list of matching `NodeId`s.  The caller is responsible for
9    /// applying the actual mutations inside a write transaction.
10    pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11        if mm.match_patterns.is_empty() {
12            return Ok(vec![]);
13        }
14
15        // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16        // are supported.  Silently ignoring extra patterns would mutate the wrong
17        // nodes; instead we surface a clear error.
18        if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19            return Err(sparrowdb_common::Error::InvalidArgument(
20                "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21                    .into(),
22            ));
23        }
24
25        let pat = &mm.match_patterns[0];
26        if pat.nodes.is_empty() {
27            return Ok(vec![]);
28        }
29        let node_pat = &pat.nodes[0];
30        let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32        let label_id = match self.snapshot.catalog.get_label(&label)? {
33            Some(id) => id as u32,
34            // SPA-266: unknown label → no nodes can match; return empty result.
35            None => return Ok(vec![]),
36        };
37
38        // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39        // even after the index narrows candidates by inline prop filter).
40        let mut where_col_ids: Vec<u32> = node_pat
41            .props
42            .iter()
43            .map(|pe| prop_name_to_col_id(&pe.key))
44            .collect();
45        if let Some(ref where_expr) = mm.where_clause {
46            collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47        }
48
49        let var_name = node_pat.var.as_str();
50
51        // Use the property index for O(1) equality lookups on inline prop
52        // filters, falling back to full scan for overflow strings / params.
53        let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55        let mut matching_ids = Vec::new();
56        for node_id in candidates {
57            // Re-read props needed for WHERE clause evaluation.
58            if mm.where_clause.is_some() {
59                let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60                if let Some(ref where_expr) = mm.where_clause {
61                    let mut row_vals =
62                        build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63                    row_vals.extend(self.dollar_params());
64                    if !self.eval_where_graph(where_expr, &row_vals) {
65                        continue;
66                    }
67                }
68            }
69            matching_ids.push(node_id);
70        }
71
72        Ok(matching_ids)
73    }
74
75    /// Return the mutation carried by a `MatchMutate` statement, exposing it
76    /// to the caller (GraphDb) so it can apply it inside a write transaction.
77    pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78        &mm.mutation
79    }
80
81    /// Scan edges matching a MATCH pattern with exactly one hop and return
82    /// `(src, dst, rel_type)` tuples for edge deletion.
83    ///
84    /// Supports `MATCH (a:Label)-[r:REL]->(b:Label) DELETE r` with optional
85    /// inline property filters on source and destination node patterns.
86    ///
87    /// Includes both checkpointed (CSR) and uncheckpointed (delta) edges.
88    pub fn scan_match_mutate_edges(
89        &self,
90        mm: &MatchMutateStatement,
91    ) -> Result<Vec<(NodeId, NodeId, String)>> {
92        if mm.match_patterns.len() != 1 {
93            return Err(sparrowdb_common::Error::InvalidArgument(
94                "MATCH...DELETE edge: only single-path patterns are supported".into(),
95            ));
96        }
97        let pat = &mm.match_patterns[0];
98        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
99            return Err(sparrowdb_common::Error::InvalidArgument(
100                "MATCH...DELETE edge: pattern must have exactly one relationship hop".into(),
101            ));
102        }
103
104        let src_node_pat = &pat.nodes[0];
105        let dst_node_pat = &pat.nodes[1];
106        let rel_pat = &pat.rels[0];
107
108        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
109        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
110
111        // Resolve optional label-id constraints.
112        let src_label_id_opt: Option<u32> = if src_label.is_empty() {
113            None
114        } else {
115            match self.snapshot.catalog.get_label(&src_label)? {
116                Some(id) => Some(id as u32),
117                None => return Ok(vec![]), // unknown label → no matches
118            }
119        };
120        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
121            None
122        } else {
123            match self.snapshot.catalog.get_label(&dst_label)? {
124                Some(id) => Some(id as u32),
125                None => return Ok(vec![]), // unknown label → no matches
126            }
127        };
128
129        // Filter registered rel tables by rel type and src/dst label.
130        let rel_tables: Vec<(u64, u32, u32, String)> = self
131            .snapshot
132            .catalog
133            .list_rel_tables_with_ids()
134            .into_iter()
135            .filter(|(_, sid, did, rt)| {
136                let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
137                let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
138                let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
139                type_ok && src_ok && dst_ok
140            })
141            .map(|(cid, sid, did, rt)| (cid, sid as u32, did as u32, rt))
142            .collect();
143
144        // Pre-compute col_ids for inline prop filters (avoid re-computing per slot).
145        let src_filter_col_ids: Vec<u32> = src_node_pat
146            .props
147            .iter()
148            .map(|p| prop_name_to_col_id(&p.key))
149            .collect();
150        let dst_filter_col_ids: Vec<u32> = dst_node_pat
151            .props
152            .iter()
153            .map(|p| prop_name_to_col_id(&p.key))
154            .collect();
155
156        let mut result: Vec<(NodeId, NodeId, String)> = Vec::new();
157
158        for (catalog_rel_id, effective_src_lid, effective_dst_lid, rel_type) in &rel_tables {
159            let catalog_rel_id_u32 =
160                u32::try_from(*catalog_rel_id).expect("catalog_rel_id overflowed u32");
161
162            // ── Checkpointed edges (CSR) ──────────────────────────────────────
163            let hwm_src = match self.snapshot.store.hwm_for_label(*effective_src_lid) {
164                Ok(hwm) => hwm,
165                Err(_) => continue,
166            };
167            for src_slot in 0..hwm_src {
168                let src_node = NodeId(((*effective_src_lid as u64) << 32) | src_slot);
169                if self.is_node_tombstoned(src_node) {
170                    continue;
171                }
172                if !self.node_matches_prop_filter(
173                    src_node,
174                    &src_filter_col_ids,
175                    &src_node_pat.props,
176                ) {
177                    continue;
178                }
179                for dst_slot in self.csr_neighbors(catalog_rel_id_u32, src_slot) {
180                    let dst_node = NodeId(((*effective_dst_lid as u64) << 32) | dst_slot);
181                    if self.is_node_tombstoned(dst_node) {
182                        continue;
183                    }
184                    if !self.node_matches_prop_filter(
185                        dst_node,
186                        &dst_filter_col_ids,
187                        &dst_node_pat.props,
188                    ) {
189                        continue;
190                    }
191                    result.push((src_node, dst_node, rel_type.clone()));
192                }
193            }
194
195            // ── Uncheckpointed edges (delta log) ──────────────────────────────
196            for rec in self.read_delta_for(catalog_rel_id_u32) {
197                let r_src_label = (rec.src.0 >> 32) as u32;
198                let r_dst_label = (rec.dst.0 >> 32) as u32;
199                if src_label_id_opt
200                    .map(|id| id != r_src_label)
201                    .unwrap_or(false)
202                {
203                    continue;
204                }
205                if dst_label_id_opt
206                    .map(|id| id != r_dst_label)
207                    .unwrap_or(false)
208                {
209                    continue;
210                }
211                if self.is_node_tombstoned(rec.src) || self.is_node_tombstoned(rec.dst) {
212                    continue;
213                }
214                if !self.node_matches_prop_filter(rec.src, &src_filter_col_ids, &src_node_pat.props)
215                {
216                    continue;
217                }
218                if !self.node_matches_prop_filter(rec.dst, &dst_filter_col_ids, &dst_node_pat.props)
219                {
220                    continue;
221                }
222                result.push((rec.src, rec.dst, rel_type.clone()));
223            }
224        }
225
226        Ok(result)
227    }
228
    // ── Node-scan helpers (shared by the write-side MATCH scans: CREATE, SET/DELETE, edge DELETE) ──
230
231    /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
232    ///
233    /// `NotFound` is expected for new/sparse nodes where col_0 has not been
234    /// written yet and is treated as "not tombstoned".  All other errors are
235    /// logged as warnings and also treated as "not tombstoned" so that
236    /// transient storage issues do not suppress valid nodes during a scan.
237    pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
238        match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
239            Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
240            Err(sparrowdb_common::Error::NotFound) => false,
241            Err(e) => {
242                tracing::warn!(
243                    node_id = node_id.0,
244                    error = ?e,
245                    "tombstone check failed; treating node as not tombstoned"
246                );
247                false
248            }
249        }
250    }
251
252    /// Returns `true` if `node_id` satisfies every inline prop predicate in
253    /// `filter_col_ids` / `props`.
254    ///
255    /// `filter_col_ids` must be pre-computed from `props` with
256    /// `prop_name_to_col_id`.  Pass an empty slice when there are no filters
257    /// (the method returns `true` immediately).
258    pub(crate) fn node_matches_prop_filter(
259        &self,
260        node_id: NodeId,
261        filter_col_ids: &[u32],
262        props: &[sparrowdb_cypher::ast::PropEntry],
263    ) -> bool {
264        if props.is_empty() {
265            return true;
266        }
267        match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
268            Ok(raw_props) => matches_prop_filter_static(
269                &raw_props,
270                props,
271                &self.dollar_params(),
272                &self.snapshot.store,
273            ),
274            Err(_) => false,
275        }
276    }
277
278    // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
279
280    /// Return all live `NodeId`s for `label_id` whose inline prop predicates
281    /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
282    ///
283    /// ## Index path (O(log n) per unique value)
284    ///
285    /// When there is exactly one inline prop filter and the literal is directly
286    /// encodable (integers and strings ≤ 7 bytes), the method:
287    ///   1. Calls `build_for` lazily — reads the column file once and caches it.
288    ///   2. Does a single `BTreeMap::get` to obtain the matching slot list.
289    ///   3. Verifies tombstones on the (usually tiny) candidate set.
290    ///
291    /// ## Fallback (O(n) full scan)
292    ///
293    /// When the filter cannot use the index (overflow string, multiple props,
294    /// parameter expressions, or `build_for` I/O error) the method falls back
295    /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
296    ///
297    /// ## Multi-label support (SPA-200)
298    ///
299    /// After the primary-label scan, nodes where `label_id` is a **secondary**
300    /// label are also added from the catalog reverse index.  This ensures
301    /// `MATCH (n:A)` finds nodes where `:A` is secondary (e.g. `CREATE (n:B:A)`).
302    ///
303    /// ## Integration
304    ///
305    /// This replaces the inline `for slot in 0..hwm` blocks in
306    /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
307    /// so that the index is used consistently across all write-side MATCH paths.
308    pub(crate) fn scan_nodes_for_label_with_index(
309        &self,
310        label_id: u32,
311        node_props: &[sparrowdb_cypher::ast::PropEntry],
312    ) -> Result<Vec<NodeId>> {
313        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
314
315        // Collect filter col_ids up-front (needed for the fallback path too).
316        let filter_col_ids: Vec<u32> = node_props
317            .iter()
318            .map(|p| prop_name_to_col_id(&p.key))
319            .collect();
320
321        // ── Lazy index build ────────────────────────────────────────────────
322        // Ensure the property index is loaded for every column referenced by
323        // inline prop filters.  `build_for` is idempotent (cache-hit no-op
324        // after the first call) and suppresses I/O errors internally.
325        for &col_id in &filter_col_ids {
326            let _ = self
327                .prop_index
328                .borrow_mut()
329                .build_for(&self.snapshot.store, label_id, col_id);
330        }
331
332        // ── Index lookup (single-equality filter, literal value) ────────────
333        let index_slots: Option<Vec<u32>> = {
334            let prop_index_ref = self.prop_index.borrow();
335            try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
336        };
337
338        if let Some(candidate_slots) = index_slots {
339            // O(k) verification over a small candidate set (typically 1 slot).
340            let mut result = Vec::with_capacity(candidate_slots.len());
341            for slot in candidate_slots {
342                let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
343                if self.is_node_tombstoned(node_id) {
344                    continue;
345                }
346                // For multi-prop filters the index only narrowed on one column;
347                // verify the remaining filters here.
348                if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
349                    continue;
350                }
351                result.push(node_id);
352            }
353            // SPA-200: also check secondary-label hits (not in the property index,
354            // which is keyed on primary label only).
355            self.append_secondary_label_hits(label_id, &filter_col_ids, node_props, &mut result);
356            return Ok(result);
357        }
358
359        // ── Fallback: full O(N) scan ────────────────────────────────────────
360        let mut result = Vec::new();
361        for slot in 0..hwm {
362            let node_id = NodeId(((label_id as u64) << 32) | slot);
363            if self.is_node_tombstoned(node_id) {
364                continue;
365            }
366            if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
367                continue;
368            }
369            result.push(node_id);
370        }
371
372        // SPA-200: union nodes where label_id is a *secondary* label.
373        self.append_secondary_label_hits(label_id, &filter_col_ids, node_props, &mut result);
374
375        Ok(result)
376    }
377
378    /// Append nodes that have `label_id` as a **secondary** label to `result`,
379    /// applying property filters.  Used by `scan_nodes_for_label_with_index`
380    /// to implement SPA-200 multi-label MATCH semantics.
381    ///
382    /// Nodes already in `result` (primary-label hits) are not duplicated.
383    fn append_secondary_label_hits(
384        &self,
385        label_id: u32,
386        filter_col_ids: &[u32],
387        node_props: &[sparrowdb_cypher::ast::PropEntry],
388        result: &mut Vec<NodeId>,
389    ) {
390        // Build a quick-lookup set of already-found nodes to avoid duplicates.
391        let already_found: HashSet<NodeId> = result.iter().copied().collect();
392
393        let lid = label_id as sparrowdb_catalog::LabelId;
394        for node_id in self.snapshot.catalog.nodes_with_secondary_label(lid) {
395            if already_found.contains(&node_id) {
396                continue;
397            }
398            if self.is_node_tombstoned(node_id) {
399                continue;
400            }
401            // For secondary-label nodes, properties are stored under the
402            // primary-label directory (encoded in node_id).  The col_ids are
403            // the same — we read the stored columns and apply the filter.
404            if !self.node_matches_prop_filter(node_id, filter_col_ids, node_props) {
405                continue;
406            }
407            result.push(node_id);
408        }
409    }
410
411    /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
412    /// return a map of variable name → Vec<NodeId> for each named node pattern.
413    ///
414    /// The caller (GraphDb) uses this to resolve variable bindings before
415    /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
416    pub fn scan_match_create(
417        &self,
418        mc: &MatchCreateStatement,
419    ) -> Result<HashMap<String, Vec<NodeId>>> {
420        let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
421
422        for pat in &mc.match_patterns {
423            for node_pat in &pat.nodes {
424                if node_pat.var.is_empty() {
425                    continue;
426                }
427                // Skip if already resolved (same var can appear in multiple patterns).
428                if var_candidates.contains_key(&node_pat.var) {
429                    continue;
430                }
431
432                let label = node_pat.labels.first().cloned().unwrap_or_default();
433                let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
434                    Some(id) => id as u32,
435                    None => {
436                        // Label not found → no matching nodes for this variable.
437                        var_candidates.insert(node_pat.var.clone(), vec![]);
438                        continue;
439                    }
440                };
441
442                // Use the property index for O(1) equality lookups when possible,
443                // falling back to a full O(N) scan for overflow strings / params.
444                let matching_ids =
445                    self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
446
447                var_candidates.insert(node_pat.var.clone(), matching_ids);
448            }
449        }
450
451        Ok(var_candidates)
452    }
453
454    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
455    /// binding map per matched row.
456    ///
457    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
458    /// that represents one fully-correlated result row from the MATCH clause.
459    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
460    ///
461    /// # Algorithm
462    ///
463    /// For each `PathPattern` in `match_patterns`:
464    /// - **No relationships** (node-only pattern): scan the node store applying
465    ///   inline prop filters; collect one candidate set per named variable.
466    ///   Cross-join these sets with the rows accumulated so far.
467    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
468    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
469    ///   then filter each node against its inline prop predicates.  Only
470    ///   correlated pairs are yielded — this is the key difference from the old
471    ///   `scan_match_create` which treated every node as an independent
472    ///   candidate and then took a full Cartesian product.
473    ///
474    /// Patterns beyond a single hop are not yet supported and return an error.
475    pub fn scan_match_create_rows(
476        &self,
477        mc: &MatchCreateStatement,
478    ) -> Result<Vec<HashMap<String, NodeId>>> {
479        // Start with a single empty row (identity for cross-join).
480        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
481
482        for pat in &mc.match_patterns {
483            if pat.rels.is_empty() {
484                // ── Node-only pattern: collect candidates per variable, then
485                //    cross-join into accumulated rows. ──────────────────────
486                //
487                // Collect each named node variable's candidate list.
488                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
489
490                for node_pat in &pat.nodes {
491                    if node_pat.var.is_empty() {
492                        continue;
493                    }
494
495                    // SPA-211: when no label is specified, scan all registered
496                    // labels so that unlabeled MATCH patterns find nodes of
497                    // any type (instead of silently returning empty).
498                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
499                        self.snapshot
500                            .catalog
501                            .list_labels()?
502                            .into_iter()
503                            .map(|(id, _)| id as u32)
504                            .collect()
505                    } else {
506                        let label = node_pat.labels.first().cloned().unwrap_or_default();
507                        match self.snapshot.catalog.get_label(&label)? {
508                            Some(id) => vec![id as u32],
509                            None => {
510                                // No nodes can match → entire MATCH yields nothing.
511                                return Ok(vec![]);
512                            }
513                        }
514                    };
515
516                    // Use the property index for O(1) equality lookups when possible,
517                    // falling back to a full O(N) scan for overflow strings / params.
518                    let mut matching_ids: Vec<NodeId> = Vec::new();
519                    for label_id in scan_label_ids {
520                        let ids =
521                            self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
522                        matching_ids.extend(ids);
523                    }
524
525                    if matching_ids.is_empty() {
526                        // No matching nodes → entire MATCH is empty.
527                        return Ok(vec![]);
528                    }
529
530                    per_var.push((node_pat.var.clone(), matching_ids));
531                }
532
533                // Cross-join the per_var candidates into accumulated.
534                // `candidates` is guaranteed non-empty (checked above), so the result
535                // will be non-empty as long as `accumulated` is non-empty.
536                for (var, candidates) in per_var {
537                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
538                    for row in &accumulated {
539                        for &node_id in &candidates {
540                            let mut new_row = row.clone();
541                            new_row.insert(var.clone(), node_id);
542                            next.push(new_row);
543                        }
544                    }
545                    accumulated = next;
546                }
547            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
548                // ── Single-hop relationship pattern: traverse CSR + delta edges
549                //    to produce correlated (src, dst) pairs. ─────────────────
550                let src_node_pat = &pat.nodes[0];
551                let dst_node_pat = &pat.nodes[1];
552                let rel_pat = &pat.rels[0];
553
554                // Only outgoing direction is supported for MATCH…CREATE traversal.
555                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
556                    return Err(sparrowdb_common::Error::Unimplemented);
557                }
558
559                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
560                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
561
562                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
563                    Some(id) => id as u32,
564                    None => return Ok(vec![]),
565                };
566                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
567                    Some(id) => id as u32,
568                    None => return Ok(vec![]),
569                };
570
571                let src_filter_cols: Vec<u32> = src_node_pat
572                    .props
573                    .iter()
574                    .map(|p| prop_name_to_col_id(&p.key))
575                    .collect();
576                let dst_filter_cols: Vec<u32> = dst_node_pat
577                    .props
578                    .iter()
579                    .map(|p| prop_name_to_col_id(&p.key))
580                    .collect();
581
582                // SPA-185: resolve per-type rel table for delta and CSR reads.
583                let rel_lookup =
584                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
585                if matches!(rel_lookup, RelTableLookup::NotFound) {
586                    return Ok(vec![]);
587                }
588
589                // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
590                // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
591                let delta_adj: HashMap<u64, Vec<u64>> = {
592                    let records: Vec<DeltaRecord> = match rel_lookup {
593                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
594                        _ => self.read_delta_all(),
595                    };
596                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
597                    for r in records {
598                        let s = r.src.0;
599                        let s_label = (s >> 32) as u32;
600                        if s_label == src_label_id {
601                            let s_slot = s & 0xFFFF_FFFF;
602                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
603                        }
604                    }
605                    adj
606                };
607
608                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
609
610                // Pairs yielded by this pattern for cross-join below.
611                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
612
613                for src_slot in 0..hwm_src {
614                    // SPA-254: check per-query deadline at every slot boundary.
615                    self.check_deadline()?;
616
617                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
618
619                    if self.is_node_tombstoned(src_node) {
620                        continue;
621                    }
622                    if !self.node_matches_prop_filter(
623                        src_node,
624                        &src_filter_cols,
625                        &src_node_pat.props,
626                    ) {
627                        continue;
628                    }
629
630                    // Collect outgoing neighbours (CSR + delta adjacency map).
631                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
632                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
633                        _ => self.csr_neighbors_all(src_slot),
634                    };
635                    let empty: Vec<u64> = Vec::new();
636                    let delta_neighbors: &[u64] =
637                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
638
639                    let mut seen: HashSet<u64> = HashSet::new();
640                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
641                        if !seen.insert(dst_slot) {
642                            continue;
643                        }
644                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
645
646                        if self.is_node_tombstoned(dst_node) {
647                            continue;
648                        }
649                        if !self.node_matches_prop_filter(
650                            dst_node,
651                            &dst_filter_cols,
652                            &dst_node_pat.props,
653                        ) {
654                            continue;
655                        }
656
657                        let mut row: HashMap<String, NodeId> = HashMap::new();
658
659                        // When src and dst use the same variable (self-loop pattern),
660                        // the edge must actually be a self-loop (src == dst).
661                        if !src_node_pat.var.is_empty()
662                            && !dst_node_pat.var.is_empty()
663                            && src_node_pat.var == dst_node_pat.var
664                        {
665                            if src_node != dst_node {
666                                continue;
667                            }
668                            row.insert(src_node_pat.var.clone(), src_node);
669                        } else {
670                            if !src_node_pat.var.is_empty() {
671                                row.insert(src_node_pat.var.clone(), src_node);
672                            }
673                            if !dst_node_pat.var.is_empty() {
674                                row.insert(dst_node_pat.var.clone(), dst_node);
675                            }
676                        }
677                        pattern_rows.push(row);
678                    }
679                }
680
681                if pattern_rows.is_empty() {
682                    return Ok(vec![]);
683                }
684
685                // Cross-join pattern_rows into accumulated, enforcing shared-variable
686                // constraints: if a variable appears in both acc_row and pat_row, only
687                // keep combinations where they agree on the same NodeId.
688                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
689                for acc_row in &accumulated {
690                    'outer: for pat_row in &pattern_rows {
691                        // Reject combinations where shared variables disagree.
692                        for (k, v) in pat_row {
693                            if let Some(existing) = acc_row.get(k) {
694                                if existing != v {
695                                    continue 'outer;
696                                }
697                            }
698                        }
699                        let mut new_row = acc_row.clone();
700                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
701                        next.push(new_row);
702                    }
703                }
704                accumulated = next;
705            } else {
706                // Multi-hop patterns not yet supported for MATCH…CREATE.
707                return Err(sparrowdb_common::Error::Unimplemented);
708            }
709        }
710
711        Ok(accumulated)
712    }
713
714    /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
715    /// correlated `(variable → NodeId)` binding rows — identical semantics to
716    /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
717    pub fn scan_match_merge_rel_rows(
718        &self,
719        mm: &MatchMergeRelStatement,
720    ) -> Result<Vec<HashMap<String, NodeId>>> {
721        // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
722        // MatchCreateStatement with an empty (no-op) CREATE body.
723        let proxy = MatchCreateStatement {
724            match_patterns: mm.match_patterns.clone(),
725            match_props: vec![],
726            create: CreateStatement {
727                nodes: vec![],
728                edges: vec![],
729            },
730        };
731        self.scan_match_create_rows(&proxy)
732    }
733
734    // ── UNWIND ─────────────────────────────────────────────────────────────────
735
736    pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
737        use crate::operators::{Operator, UnwindOperator};
738
739        // Evaluate the list expression to a Vec<Value>.
740        let values = eval_list_expr(&u.expr, &self.params)?;
741
742        // Determine the output column name from the RETURN clause.
743        let column_names = extract_return_column_names(&u.return_clause.items);
744
745        if values.is_empty() {
746            return Ok(QueryResult::empty(column_names));
747        }
748
749        let mut op = UnwindOperator::new(u.alias.clone(), values);
750        let chunks = op.collect_all()?;
751
752        // Materialize: for each chunk/group/row, project the RETURN columns.
753        //
754        // Only fall back to the UNWIND alias value when the output column
755        // actually corresponds to the alias variable.  Returning a value for
756        // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
757        // silently produce wrong results instead of NULL.
758        let mut rows: Vec<Vec<Value>> = Vec::new();
759        for chunk in &chunks {
760            for group in &chunk.groups {
761                let n = group.len();
762                for row_idx in 0..n {
763                    let row = u
764                        .return_clause
765                        .items
766                        .iter()
767                        .map(|item| {
768                            // Determine whether this RETURN item refers to the
769                            // alias variable produced by UNWIND.
770                            let is_alias = match &item.expr {
771                                Expr::Var(name) => name == &u.alias,
772                                _ => false,
773                            };
774                            if is_alias {
775                                group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
776                            } else {
777                                // Variable is not in scope for this UNWIND —
778                                // return NULL rather than leaking the alias value.
779                                Value::Null
780                            }
781                        })
782                        .collect();
783                    rows.push(row);
784                }
785            }
786        }
787
788        Ok(QueryResult {
789            columns: column_names,
790            rows,
791        })
792    }
793
794    // ── CREATE node execution ─────────────────────────────────────────────────
795
796    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
797    ///
798    /// For each node in the CREATE clause:
799    /// 1. Look up (or create) its **primary** label (first label in the
800    ///    pattern) in the catalog.  The primary label determines the `NodeId`
801    ///    encoding and storage directory.
802    /// 2. Resolve all secondary labels (labels[1..]) and record them in the
803    ///    catalog side table after the node is created (SPA-200).
804    /// 3. Convert inline properties to `(col_id, StoreValue)` pairs using the
805    ///    same FNV-1a hash used by `WriteTx::merge_node`.
806    /// 4. Write the node to the node store.
807    pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
808        for node in &create.nodes {
809            // Resolve the primary label, creating it if absent.
810            let label = node.labels.first().cloned().unwrap_or_default();
811
812            // SPA-208: reject reserved __SO_ label prefix.
813            if is_reserved_label(&label) {
814                return Err(sparrowdb_common::Error::InvalidArgument(format!(
815                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
816                )));
817            }
818
819            // SPA-200: reject reserved __SO_ prefix on secondary labels too.
820            for secondary_label_name in node.labels.iter().skip(1) {
821                if is_reserved_label(secondary_label_name) {
822                    return Err(sparrowdb_common::Error::InvalidArgument(format!(
823                        "invalid argument: label \"{secondary_label_name}\" is reserved — the __SO_ prefix is for internal use only"
824                    )));
825                }
826            }
827
828            let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
829                Some(id) => id as u32,
830                None => self.snapshot.catalog.create_label(&label)? as u32,
831            };
832
833            // Convert AST props to (col_id, StoreValue) pairs.
834            // Property values are full expressions (e.g. `datetime()`),
835            // evaluated with an empty binding map.
836            let empty_bindings: HashMap<String, Value> = HashMap::new();
837            let props: Vec<(u32, StoreValue)> = node
838                .props
839                .iter()
840                .map(|entry| {
841                    let col_id = prop_name_to_col_id(&entry.key);
842                    let val = eval_expr(&entry.value, &empty_bindings);
843                    let store_val = value_to_store_value(val);
844                    (col_id, store_val)
845                })
846                .collect();
847
848            // SPA-234: enforce UNIQUE constraints declared via
849            // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
850            // For each constrained (label_id, col_id) pair, check whether the
851            // incoming value already exists in the property index.  If so,
852            // return a constraint-violation error before writing the node.
853            //
854            // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
855            // are checked via the prop_index fast path.  Float values and
856            // long strings require heap storage and cannot be encoded with
857            // to_u64(); for those types we return an explicit error rather
858            // than panicking (StoreValue::Float::to_u64 is documented to
859            // panic for heap-backed values).
860            for (col_id, store_val) in &props {
861                if self.unique_constraints.contains(&(label_id, *col_id)) {
862                    let raw = match store_val {
863                        StoreValue::Int64(_) => store_val.to_u64(),
864                        StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
865                        StoreValue::Bytes(_) => {
866                            return Err(sparrowdb_common::Error::InvalidArgument(
867                                "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
868                            ));
869                        }
870                        StoreValue::Float(_) => {
871                            return Err(sparrowdb_common::Error::InvalidArgument(
872                                "UNIQUE constraints on float values are not yet supported".into(),
873                            ));
874                        }
875                    };
876                    if !self
877                        .prop_index
878                        .borrow()
879                        .lookup(label_id, *col_id, raw)
880                        .is_empty()
881                    {
882                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
883                            "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
884                        )));
885                    }
886                }
887            }
888
889            let node_id = self.snapshot.store.create_node(label_id, &props)?;
890            // SPA-234: after writing, insert new values into the prop_index so
891            // that subsequent creates in the same session also respect the
892            // UNIQUE constraint (the index may be stale if built before this
893            // node was written).
894            {
895                let slot =
896                    sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
897                let mut idx = self.prop_index.borrow_mut();
898                for (col_id, store_val) in &props {
899                    if self.unique_constraints.contains(&(label_id, *col_id)) {
900                        // Only insert inline-encodable values; Float/long Bytes
901                        // were already rejected above before create_node was called.
902                        let raw = match store_val {
903                            StoreValue::Int64(_) => store_val.to_u64(),
904                            StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
905                            _ => continue,
906                        };
907                        idx.insert(label_id, *col_id, slot, raw);
908                    }
909                }
910            }
911            // SPA-200: record secondary labels in the catalog side table.
912            // The primary label is already encoded in `node_id`; secondary
913            // labels are persisted here so that MATCH on secondary labels
914            // (and labels(n)) return the full label set.
915            if node.labels.len() > 1 {
916                let mut secondary_label_ids: Vec<sparrowdb_catalog::LabelId> = Vec::new();
917                for secondary_name in node.labels.iter().skip(1) {
918                    let sid = match self.snapshot.catalog.get_label(secondary_name)? {
919                        Some(id) => id,
920                        None => self.snapshot.catalog.create_label(secondary_name)?,
921                    };
922                    secondary_label_ids.push(sid);
923                }
924                self.snapshot
925                    .catalog
926                    .record_secondary_labels(node_id, &secondary_label_ids)?;
927            }
928
929            // Update cached row count for the planner (SPA-new).
930            *self
931                .snapshot
932                .label_row_counts
933                .entry(label_id as LabelId)
934                .or_insert(0) += 1;
935        }
936        Ok(QueryResult::empty(vec![]))
937    }
938
939    pub(crate) fn execute_create_index(
940        &mut self,
941        label: &str,
942        property: &str,
943    ) -> Result<QueryResult> {
944        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
945            Some(id) => id as u32,
946            None => return Ok(QueryResult::empty(vec![])),
947        };
948        let col_id = col_id_of(property);
949        self.prop_index
950            .borrow_mut()
951            .build_for(&self.snapshot.store, label_id, col_id)?;
952        Ok(QueryResult::empty(vec![]))
953    }
954
955    /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
956    ///
957    /// Records `(label_id, col_id)` in `self.unique_constraints` so that
958    /// subsequent `execute_create` calls reject duplicate values.  Also builds
959    /// the backing prop-index for that pair (needed to check existence cheaply).
960    /// If the label does not yet exist in the catalog it is auto-created so that
961    /// later `CREATE` statements can register against the constraint.
962    pub(crate) fn execute_create_constraint(
963        &mut self,
964        label: &str,
965        property: &str,
966    ) -> Result<QueryResult> {
967        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
968            Some(id) => id as u32,
969            None => self.snapshot.catalog.create_label(label)? as u32,
970        };
971        let col_id = col_id_of(property);
972
973        // Build the property index for this (label_id, col_id) pair so that
974        // uniqueness checks in execute_create can use O(log n) lookups.
975        self.prop_index
976            .borrow_mut()
977            .build_for(&self.snapshot.store, label_id, col_id)?;
978
979        // Register the constraint.
980        self.unique_constraints.insert((label_id, col_id));
981
982        Ok(QueryResult::empty(vec![]))
983    }
984}