Skip to main content

sparrowdb_execution/engine/
mutation.rs

1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5    // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7    /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8    /// return the list of matching `NodeId`s.  The caller is responsible for
9    /// applying the actual mutations inside a write transaction.
10    pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11        if mm.match_patterns.is_empty() {
12            return Ok(vec![]);
13        }
14
15        // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16        // are supported.  Silently ignoring extra patterns would mutate the wrong
17        // nodes; instead we surface a clear error.
18        if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19            return Err(sparrowdb_common::Error::InvalidArgument(
20                "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21                    .into(),
22            ));
23        }
24
25        let pat = &mm.match_patterns[0];
26        if pat.nodes.is_empty() {
27            return Ok(vec![]);
28        }
29        let node_pat = &pat.nodes[0];
30        let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32        let label_id = match self.snapshot.catalog.get_label(&label)? {
33            Some(id) => id as u32,
34            // SPA-266: unknown label → no nodes can match; return empty result.
35            None => return Ok(vec![]),
36        };
37
38        // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39        // even after the index narrows candidates by inline prop filter).
40        let mut where_col_ids: Vec<u32> = node_pat
41            .props
42            .iter()
43            .map(|pe| prop_name_to_col_id(&pe.key))
44            .collect();
45        if let Some(ref where_expr) = mm.where_clause {
46            collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47        }
48
49        let var_name = node_pat.var.as_str();
50
51        // Use the property index for O(1) equality lookups on inline prop
52        // filters, falling back to full scan for overflow strings / params.
53        let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55        let mut matching_ids = Vec::new();
56        for node_id in candidates {
57            // Re-read props needed for WHERE clause evaluation.
58            if mm.where_clause.is_some() {
59                let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60                if let Some(ref where_expr) = mm.where_clause {
61                    let mut row_vals =
62                        build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63                    row_vals.extend(self.dollar_params());
64                    if !self.eval_where_graph(where_expr, &row_vals) {
65                        continue;
66                    }
67                }
68            }
69            matching_ids.push(node_id);
70        }
71
72        Ok(matching_ids)
73    }
74
75    /// Return the mutation carried by a `MatchMutate` statement, exposing it
76    /// to the caller (GraphDb) so it can apply it inside a write transaction.
77    pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78        &mm.mutation
79    }
80
81    /// Scan edges matching a MATCH pattern with exactly one hop and return
82    /// `(src, dst, rel_type)` tuples for edge deletion.
83    ///
84    /// Supports `MATCH (a:Label)-[r:REL]->(b:Label) DELETE r` with optional
85    /// inline property filters on source and destination node patterns.
86    ///
87    /// Includes both checkpointed (CSR) and uncheckpointed (delta) edges.
88    pub fn scan_match_mutate_edges(
89        &self,
90        mm: &MatchMutateStatement,
91    ) -> Result<Vec<(NodeId, NodeId, String)>> {
92        if mm.match_patterns.len() != 1 {
93            return Err(sparrowdb_common::Error::InvalidArgument(
94                "MATCH...DELETE edge: only single-path patterns are supported".into(),
95            ));
96        }
97        let pat = &mm.match_patterns[0];
98        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
99            return Err(sparrowdb_common::Error::InvalidArgument(
100                "MATCH...DELETE edge: pattern must have exactly one relationship hop".into(),
101            ));
102        }
103
104        let src_node_pat = &pat.nodes[0];
105        let dst_node_pat = &pat.nodes[1];
106        let rel_pat = &pat.rels[0];
107
108        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
109        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
110
111        // Resolve optional label-id constraints.
112        let src_label_id_opt: Option<u32> = if src_label.is_empty() {
113            None
114        } else {
115            match self.snapshot.catalog.get_label(&src_label)? {
116                Some(id) => Some(id as u32),
117                None => return Ok(vec![]), // unknown label → no matches
118            }
119        };
120        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
121            None
122        } else {
123            match self.snapshot.catalog.get_label(&dst_label)? {
124                Some(id) => Some(id as u32),
125                None => return Ok(vec![]), // unknown label → no matches
126            }
127        };
128
129        // Filter registered rel tables by rel type and src/dst label.
130        let rel_tables: Vec<(u64, u32, u32, String)> = self
131            .snapshot
132            .catalog
133            .list_rel_tables_with_ids()
134            .into_iter()
135            .filter(|(_, sid, did, rt)| {
136                let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
137                let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
138                let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
139                type_ok && src_ok && dst_ok
140            })
141            .map(|(cid, sid, did, rt)| (cid, sid as u32, did as u32, rt))
142            .collect();
143
144        // Pre-compute col_ids for inline prop filters (avoid re-computing per slot).
145        let src_filter_col_ids: Vec<u32> = src_node_pat
146            .props
147            .iter()
148            .map(|p| prop_name_to_col_id(&p.key))
149            .collect();
150        let dst_filter_col_ids: Vec<u32> = dst_node_pat
151            .props
152            .iter()
153            .map(|p| prop_name_to_col_id(&p.key))
154            .collect();
155
156        let mut result: Vec<(NodeId, NodeId, String)> = Vec::new();
157
158        for (catalog_rel_id, effective_src_lid, effective_dst_lid, rel_type) in &rel_tables {
159            let catalog_rel_id_u32 =
160                u32::try_from(*catalog_rel_id).expect("catalog_rel_id overflowed u32");
161
162            // ── Checkpointed edges (CSR) ──────────────────────────────────────
163            let hwm_src = match self.snapshot.store.hwm_for_label(*effective_src_lid) {
164                Ok(hwm) => hwm,
165                Err(_) => continue,
166            };
167            for src_slot in 0..hwm_src {
168                let src_node = NodeId(((*effective_src_lid as u64) << 32) | src_slot);
169                if self.is_node_tombstoned(src_node) {
170                    continue;
171                }
172                if !self.node_matches_prop_filter(
173                    src_node,
174                    &src_filter_col_ids,
175                    &src_node_pat.props,
176                ) {
177                    continue;
178                }
179                for dst_slot in self.csr_neighbors(catalog_rel_id_u32, src_slot) {
180                    let dst_node = NodeId(((*effective_dst_lid as u64) << 32) | dst_slot);
181                    if self.is_node_tombstoned(dst_node) {
182                        continue;
183                    }
184                    if !self.node_matches_prop_filter(
185                        dst_node,
186                        &dst_filter_col_ids,
187                        &dst_node_pat.props,
188                    ) {
189                        continue;
190                    }
191                    result.push((src_node, dst_node, rel_type.clone()));
192                }
193            }
194
195            // ── Uncheckpointed edges (delta log) ──────────────────────────────
196            for rec in self.read_delta_for(catalog_rel_id_u32) {
197                let r_src_label = (rec.src.0 >> 32) as u32;
198                let r_dst_label = (rec.dst.0 >> 32) as u32;
199                if src_label_id_opt
200                    .map(|id| id != r_src_label)
201                    .unwrap_or(false)
202                {
203                    continue;
204                }
205                if dst_label_id_opt
206                    .map(|id| id != r_dst_label)
207                    .unwrap_or(false)
208                {
209                    continue;
210                }
211                if self.is_node_tombstoned(rec.src) || self.is_node_tombstoned(rec.dst) {
212                    continue;
213                }
214                if !self.node_matches_prop_filter(rec.src, &src_filter_col_ids, &src_node_pat.props)
215                {
216                    continue;
217                }
218                if !self.node_matches_prop_filter(rec.dst, &dst_filter_col_ids, &dst_node_pat.props)
219                {
220                    continue;
221                }
222                result.push((rec.src, rec.dst, rel_type.clone()));
223            }
224        }
225
226        Ok(result)
227    }
228
229    // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
230
231    /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
232    ///
233    /// `NotFound` is expected for new/sparse nodes where col_0 has not been
234    /// written yet and is treated as "not tombstoned".  All other errors are
235    /// logged as warnings and also treated as "not tombstoned" so that
236    /// transient storage issues do not suppress valid nodes during a scan.
237    pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
238        match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
239            Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
240            Err(sparrowdb_common::Error::NotFound) => false,
241            Err(e) => {
242                tracing::warn!(
243                    node_id = node_id.0,
244                    error = ?e,
245                    "tombstone check failed; treating node as not tombstoned"
246                );
247                false
248            }
249        }
250    }
251
252    /// Returns `true` if `node_id` satisfies every inline prop predicate in
253    /// `filter_col_ids` / `props`.
254    ///
255    /// `filter_col_ids` must be pre-computed from `props` with
256    /// `prop_name_to_col_id`.  Pass an empty slice when there are no filters
257    /// (the method returns `true` immediately).
258    pub(crate) fn node_matches_prop_filter(
259        &self,
260        node_id: NodeId,
261        filter_col_ids: &[u32],
262        props: &[sparrowdb_cypher::ast::PropEntry],
263    ) -> bool {
264        if props.is_empty() {
265            return true;
266        }
267        match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
268            Ok(raw_props) => matches_prop_filter_static(
269                &raw_props,
270                props,
271                &self.dollar_params(),
272                &self.snapshot.store,
273            ),
274            Err(_) => false,
275        }
276    }
277
278    // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
279
280    /// Return all live `NodeId`s for `label_id` whose inline prop predicates
281    /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
282    ///
283    /// ## Index path (O(log n) per unique value)
284    ///
285    /// When there is exactly one inline prop filter and the literal is directly
286    /// encodable (integers and strings ≤ 7 bytes), the method:
287    ///   1. Calls `build_for` lazily — reads the column file once and caches it.
288    ///   2. Does a single `BTreeMap::get` to obtain the matching slot list.
289    ///   3. Verifies tombstones on the (usually tiny) candidate set.
290    ///
291    /// ## Fallback (O(n) full scan)
292    ///
293    /// When the filter cannot use the index (overflow string, multiple props,
294    /// parameter expressions, or `build_for` I/O error) the method falls back
295    /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
296    ///
297    /// ## Integration
298    ///
299    /// This replaces the inline `for slot in 0..hwm` blocks in
300    /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
301    /// so that the index is used consistently across all write-side MATCH paths.
302    pub(crate) fn scan_nodes_for_label_with_index(
303        &self,
304        label_id: u32,
305        node_props: &[sparrowdb_cypher::ast::PropEntry],
306    ) -> Result<Vec<NodeId>> {
307        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
308
309        // Collect filter col_ids up-front (needed for the fallback path too).
310        let filter_col_ids: Vec<u32> = node_props
311            .iter()
312            .map(|p| prop_name_to_col_id(&p.key))
313            .collect();
314
315        // ── Lazy index build ────────────────────────────────────────────────
316        // Ensure the property index is loaded for every column referenced by
317        // inline prop filters.  `build_for` is idempotent (cache-hit no-op
318        // after the first call) and suppresses I/O errors internally.
319        for &col_id in &filter_col_ids {
320            let _ = self
321                .prop_index
322                .borrow_mut()
323                .build_for(&self.snapshot.store, label_id, col_id);
324        }
325
326        // ── Index lookup (single-equality filter, literal value) ────────────
327        let index_slots: Option<Vec<u32>> = {
328            let prop_index_ref = self.prop_index.borrow();
329            try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
330        };
331
332        if let Some(candidate_slots) = index_slots {
333            // O(k) verification over a small candidate set (typically 1 slot).
334            let mut result = Vec::with_capacity(candidate_slots.len());
335            for slot in candidate_slots {
336                let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
337                if self.is_node_tombstoned(node_id) {
338                    continue;
339                }
340                // For multi-prop filters the index only narrowed on one column;
341                // verify the remaining filters here.
342                if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
343                    continue;
344                }
345                result.push(node_id);
346            }
347            return Ok(result);
348        }
349
350        // ── Fallback: full O(N) scan ────────────────────────────────────────
351        let mut result = Vec::new();
352        for slot in 0..hwm {
353            let node_id = NodeId(((label_id as u64) << 32) | slot);
354            if self.is_node_tombstoned(node_id) {
355                continue;
356            }
357            if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
358                continue;
359            }
360            result.push(node_id);
361        }
362        Ok(result)
363    }
364
365    /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
366    /// return a map of variable name → Vec<NodeId> for each named node pattern.
367    ///
368    /// The caller (GraphDb) uses this to resolve variable bindings before
369    /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
370    pub fn scan_match_create(
371        &self,
372        mc: &MatchCreateStatement,
373    ) -> Result<HashMap<String, Vec<NodeId>>> {
374        let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
375
376        for pat in &mc.match_patterns {
377            for node_pat in &pat.nodes {
378                if node_pat.var.is_empty() {
379                    continue;
380                }
381                // Skip if already resolved (same var can appear in multiple patterns).
382                if var_candidates.contains_key(&node_pat.var) {
383                    continue;
384                }
385
386                let label = node_pat.labels.first().cloned().unwrap_or_default();
387                let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
388                    Some(id) => id as u32,
389                    None => {
390                        // Label not found → no matching nodes for this variable.
391                        var_candidates.insert(node_pat.var.clone(), vec![]);
392                        continue;
393                    }
394                };
395
396                // Use the property index for O(1) equality lookups when possible,
397                // falling back to a full O(N) scan for overflow strings / params.
398                let matching_ids =
399                    self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
400
401                var_candidates.insert(node_pat.var.clone(), matching_ids);
402            }
403        }
404
405        Ok(var_candidates)
406    }
407
408    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
409    /// binding map per matched row.
410    ///
411    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
412    /// that represents one fully-correlated result row from the MATCH clause.
413    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
414    ///
415    /// # Algorithm
416    ///
417    /// For each `PathPattern` in `match_patterns`:
418    /// - **No relationships** (node-only pattern): scan the node store applying
419    ///   inline prop filters; collect one candidate set per named variable.
420    ///   Cross-join these sets with the rows accumulated so far.
421    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
422    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
423    ///   then filter each node against its inline prop predicates.  Only
424    ///   correlated pairs are yielded — this is the key difference from the old
425    ///   `scan_match_create` which treated every node as an independent
426    ///   candidate and then took a full Cartesian product.
427    ///
428    /// Patterns beyond a single hop are not yet supported and return an error.
429    pub fn scan_match_create_rows(
430        &self,
431        mc: &MatchCreateStatement,
432    ) -> Result<Vec<HashMap<String, NodeId>>> {
433        // Start with a single empty row (identity for cross-join).
434        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
435
436        for pat in &mc.match_patterns {
437            if pat.rels.is_empty() {
438                // ── Node-only pattern: collect candidates per variable, then
439                //    cross-join into accumulated rows. ──────────────────────
440                //
441                // Collect each named node variable's candidate list.
442                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
443
444                for node_pat in &pat.nodes {
445                    if node_pat.var.is_empty() {
446                        continue;
447                    }
448
449                    // SPA-211: when no label is specified, scan all registered
450                    // labels so that unlabeled MATCH patterns find nodes of
451                    // any type (instead of silently returning empty).
452                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
453                        self.snapshot
454                            .catalog
455                            .list_labels()?
456                            .into_iter()
457                            .map(|(id, _)| id as u32)
458                            .collect()
459                    } else {
460                        let label = node_pat.labels.first().cloned().unwrap_or_default();
461                        match self.snapshot.catalog.get_label(&label)? {
462                            Some(id) => vec![id as u32],
463                            None => {
464                                // No nodes can match → entire MATCH yields nothing.
465                                return Ok(vec![]);
466                            }
467                        }
468                    };
469
470                    // Use the property index for O(1) equality lookups when possible,
471                    // falling back to a full O(N) scan for overflow strings / params.
472                    let mut matching_ids: Vec<NodeId> = Vec::new();
473                    for label_id in scan_label_ids {
474                        let ids =
475                            self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
476                        matching_ids.extend(ids);
477                    }
478
479                    if matching_ids.is_empty() {
480                        // No matching nodes → entire MATCH is empty.
481                        return Ok(vec![]);
482                    }
483
484                    per_var.push((node_pat.var.clone(), matching_ids));
485                }
486
487                // Cross-join the per_var candidates into accumulated.
488                // `candidates` is guaranteed non-empty (checked above), so the result
489                // will be non-empty as long as `accumulated` is non-empty.
490                for (var, candidates) in per_var {
491                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
492                    for row in &accumulated {
493                        for &node_id in &candidates {
494                            let mut new_row = row.clone();
495                            new_row.insert(var.clone(), node_id);
496                            next.push(new_row);
497                        }
498                    }
499                    accumulated = next;
500                }
501            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
502                // ── Single-hop relationship pattern: traverse CSR + delta edges
503                //    to produce correlated (src, dst) pairs. ─────────────────
504                let src_node_pat = &pat.nodes[0];
505                let dst_node_pat = &pat.nodes[1];
506                let rel_pat = &pat.rels[0];
507
508                // Only outgoing direction is supported for MATCH…CREATE traversal.
509                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
510                    return Err(sparrowdb_common::Error::Unimplemented);
511                }
512
513                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
514                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
515
516                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
517                    Some(id) => id as u32,
518                    None => return Ok(vec![]),
519                };
520                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
521                    Some(id) => id as u32,
522                    None => return Ok(vec![]),
523                };
524
525                let src_filter_cols: Vec<u32> = src_node_pat
526                    .props
527                    .iter()
528                    .map(|p| prop_name_to_col_id(&p.key))
529                    .collect();
530                let dst_filter_cols: Vec<u32> = dst_node_pat
531                    .props
532                    .iter()
533                    .map(|p| prop_name_to_col_id(&p.key))
534                    .collect();
535
536                // SPA-185: resolve per-type rel table for delta and CSR reads.
537                let rel_lookup =
538                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
539                if matches!(rel_lookup, RelTableLookup::NotFound) {
540                    return Ok(vec![]);
541                }
542
543                // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
544                // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
545                let delta_adj: HashMap<u64, Vec<u64>> = {
546                    let records: Vec<DeltaRecord> = match rel_lookup {
547                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
548                        _ => self.read_delta_all(),
549                    };
550                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
551                    for r in records {
552                        let s = r.src.0;
553                        let s_label = (s >> 32) as u32;
554                        if s_label == src_label_id {
555                            let s_slot = s & 0xFFFF_FFFF;
556                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
557                        }
558                    }
559                    adj
560                };
561
562                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
563
564                // Pairs yielded by this pattern for cross-join below.
565                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
566
567                for src_slot in 0..hwm_src {
568                    // SPA-254: check per-query deadline at every slot boundary.
569                    self.check_deadline()?;
570
571                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
572
573                    if self.is_node_tombstoned(src_node) {
574                        continue;
575                    }
576                    if !self.node_matches_prop_filter(
577                        src_node,
578                        &src_filter_cols,
579                        &src_node_pat.props,
580                    ) {
581                        continue;
582                    }
583
584                    // Collect outgoing neighbours (CSR + delta adjacency map).
585                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
586                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
587                        _ => self.csr_neighbors_all(src_slot),
588                    };
589                    let empty: Vec<u64> = Vec::new();
590                    let delta_neighbors: &[u64] =
591                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
592
593                    let mut seen: HashSet<u64> = HashSet::new();
594                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
595                        if !seen.insert(dst_slot) {
596                            continue;
597                        }
598                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
599
600                        if self.is_node_tombstoned(dst_node) {
601                            continue;
602                        }
603                        if !self.node_matches_prop_filter(
604                            dst_node,
605                            &dst_filter_cols,
606                            &dst_node_pat.props,
607                        ) {
608                            continue;
609                        }
610
611                        let mut row: HashMap<String, NodeId> = HashMap::new();
612
613                        // When src and dst use the same variable (self-loop pattern),
614                        // the edge must actually be a self-loop (src == dst).
615                        if !src_node_pat.var.is_empty()
616                            && !dst_node_pat.var.is_empty()
617                            && src_node_pat.var == dst_node_pat.var
618                        {
619                            if src_node != dst_node {
620                                continue;
621                            }
622                            row.insert(src_node_pat.var.clone(), src_node);
623                        } else {
624                            if !src_node_pat.var.is_empty() {
625                                row.insert(src_node_pat.var.clone(), src_node);
626                            }
627                            if !dst_node_pat.var.is_empty() {
628                                row.insert(dst_node_pat.var.clone(), dst_node);
629                            }
630                        }
631                        pattern_rows.push(row);
632                    }
633                }
634
635                if pattern_rows.is_empty() {
636                    return Ok(vec![]);
637                }
638
639                // Cross-join pattern_rows into accumulated, enforcing shared-variable
640                // constraints: if a variable appears in both acc_row and pat_row, only
641                // keep combinations where they agree on the same NodeId.
642                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
643                for acc_row in &accumulated {
644                    'outer: for pat_row in &pattern_rows {
645                        // Reject combinations where shared variables disagree.
646                        for (k, v) in pat_row {
647                            if let Some(existing) = acc_row.get(k) {
648                                if existing != v {
649                                    continue 'outer;
650                                }
651                            }
652                        }
653                        let mut new_row = acc_row.clone();
654                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
655                        next.push(new_row);
656                    }
657                }
658                accumulated = next;
659            } else {
660                // Multi-hop patterns not yet supported for MATCH…CREATE.
661                return Err(sparrowdb_common::Error::Unimplemented);
662            }
663        }
664
665        Ok(accumulated)
666    }
667
668    /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
669    /// correlated `(variable → NodeId)` binding rows — identical semantics to
670    /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
671    pub fn scan_match_merge_rel_rows(
672        &self,
673        mm: &MatchMergeRelStatement,
674    ) -> Result<Vec<HashMap<String, NodeId>>> {
675        // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
676        // MatchCreateStatement with an empty (no-op) CREATE body.
677        let proxy = MatchCreateStatement {
678            match_patterns: mm.match_patterns.clone(),
679            match_props: vec![],
680            create: CreateStatement {
681                nodes: vec![],
682                edges: vec![],
683            },
684        };
685        self.scan_match_create_rows(&proxy)
686    }
687
688    // ── UNWIND ─────────────────────────────────────────────────────────────────
689
690    pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
691        use crate::operators::{Operator, UnwindOperator};
692
693        // Evaluate the list expression to a Vec<Value>.
694        let values = eval_list_expr(&u.expr, &self.params)?;
695
696        // Determine the output column name from the RETURN clause.
697        let column_names = extract_return_column_names(&u.return_clause.items);
698
699        if values.is_empty() {
700            return Ok(QueryResult::empty(column_names));
701        }
702
703        let mut op = UnwindOperator::new(u.alias.clone(), values);
704        let chunks = op.collect_all()?;
705
706        // Materialize: for each chunk/group/row, project the RETURN columns.
707        //
708        // Only fall back to the UNWIND alias value when the output column
709        // actually corresponds to the alias variable.  Returning a value for
710        // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
711        // silently produce wrong results instead of NULL.
712        let mut rows: Vec<Vec<Value>> = Vec::new();
713        for chunk in &chunks {
714            for group in &chunk.groups {
715                let n = group.len();
716                for row_idx in 0..n {
717                    let row = u
718                        .return_clause
719                        .items
720                        .iter()
721                        .map(|item| {
722                            // Determine whether this RETURN item refers to the
723                            // alias variable produced by UNWIND.
724                            let is_alias = match &item.expr {
725                                Expr::Var(name) => name == &u.alias,
726                                _ => false,
727                            };
728                            if is_alias {
729                                group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
730                            } else {
731                                // Variable is not in scope for this UNWIND —
732                                // return NULL rather than leaking the alias value.
733                                Value::Null
734                            }
735                        })
736                        .collect();
737                    rows.push(row);
738                }
739            }
740        }
741
742        Ok(QueryResult {
743            columns: column_names,
744            rows,
745        })
746    }
747
748    // ── CREATE node execution ─────────────────────────────────────────────────
749
750    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
751    ///
752    /// For each node in the CREATE clause:
753    /// 1. Look up (or create) its primary label in the catalog.
754    /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
755    ///    same FNV-1a hash used by `WriteTx::merge_node`.
756    /// 3. Write the node to the node store.
757    pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
758        for node in &create.nodes {
759            // Resolve the primary label, creating it if absent.
760            let label = node.labels.first().cloned().unwrap_or_default();
761
762            // SPA-208: reject reserved __SO_ label prefix.
763            if is_reserved_label(&label) {
764                return Err(sparrowdb_common::Error::InvalidArgument(format!(
765                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
766                )));
767            }
768
769            let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
770                Some(id) => id as u32,
771                None => self.snapshot.catalog.create_label(&label)? as u32,
772            };
773
774            // Convert AST props to (col_id, StoreValue) pairs.
775            // Property values are full expressions (e.g. `datetime()`),
776            // evaluated with an empty binding map.
777            let empty_bindings: HashMap<String, Value> = HashMap::new();
778            let props: Vec<(u32, StoreValue)> = node
779                .props
780                .iter()
781                .map(|entry| {
782                    let col_id = prop_name_to_col_id(&entry.key);
783                    let val = eval_expr(&entry.value, &empty_bindings);
784                    let store_val = value_to_store_value(val);
785                    (col_id, store_val)
786                })
787                .collect();
788
789            // SPA-234: enforce UNIQUE constraints declared via
790            // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
791            // For each constrained (label_id, col_id) pair, check whether the
792            // incoming value already exists in the property index.  If so,
793            // return a constraint-violation error before writing the node.
794            //
795            // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
796            // are checked via the prop_index fast path.  Float values and
797            // long strings require heap storage and cannot be encoded with
798            // to_u64(); for those types we return an explicit error rather
799            // than panicking (StoreValue::Float::to_u64 is documented to
800            // panic for heap-backed values).
801            for (col_id, store_val) in &props {
802                if self.unique_constraints.contains(&(label_id, *col_id)) {
803                    let raw = match store_val {
804                        StoreValue::Int64(_) => store_val.to_u64(),
805                        StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
806                        StoreValue::Bytes(_) => {
807                            return Err(sparrowdb_common::Error::InvalidArgument(
808                                "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
809                            ));
810                        }
811                        StoreValue::Float(_) => {
812                            return Err(sparrowdb_common::Error::InvalidArgument(
813                                "UNIQUE constraints on float values are not yet supported".into(),
814                            ));
815                        }
816                    };
817                    if !self
818                        .prop_index
819                        .borrow()
820                        .lookup(label_id, *col_id, raw)
821                        .is_empty()
822                    {
823                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
824                            "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
825                        )));
826                    }
827                }
828            }
829
830            let node_id = self.snapshot.store.create_node(label_id, &props)?;
831            // SPA-234: after writing, insert new values into the prop_index so
832            // that subsequent creates in the same session also respect the
833            // UNIQUE constraint (the index may be stale if built before this
834            // node was written).
835            {
836                let slot =
837                    sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
838                let mut idx = self.prop_index.borrow_mut();
839                for (col_id, store_val) in &props {
840                    if self.unique_constraints.contains(&(label_id, *col_id)) {
841                        // Only insert inline-encodable values; Float/long Bytes
842                        // were already rejected above before create_node was called.
843                        let raw = match store_val {
844                            StoreValue::Int64(_) => store_val.to_u64(),
845                            StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
846                            _ => continue,
847                        };
848                        idx.insert(label_id, *col_id, slot, raw);
849                    }
850                }
851            }
852            // Update cached row count for the planner (SPA-new).
853            *self
854                .snapshot
855                .label_row_counts
856                .entry(label_id as LabelId)
857                .or_insert(0) += 1;
858        }
859        Ok(QueryResult::empty(vec![]))
860    }
861
862    pub(crate) fn execute_create_index(
863        &mut self,
864        label: &str,
865        property: &str,
866    ) -> Result<QueryResult> {
867        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
868            Some(id) => id as u32,
869            None => return Ok(QueryResult::empty(vec![])),
870        };
871        let col_id = col_id_of(property);
872        self.prop_index
873            .borrow_mut()
874            .build_for(&self.snapshot.store, label_id, col_id)?;
875        Ok(QueryResult::empty(vec![]))
876    }
877
878    /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
879    ///
880    /// Records `(label_id, col_id)` in `self.unique_constraints` so that
881    /// subsequent `execute_create` calls reject duplicate values.  Also builds
882    /// the backing prop-index for that pair (needed to check existence cheaply).
883    /// If the label does not yet exist in the catalog it is auto-created so that
884    /// later `CREATE` statements can register against the constraint.
885    pub(crate) fn execute_create_constraint(
886        &mut self,
887        label: &str,
888        property: &str,
889    ) -> Result<QueryResult> {
890        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
891            Some(id) => id as u32,
892            None => self.snapshot.catalog.create_label(label)? as u32,
893        };
894        let col_id = col_id_of(property);
895
896        // Build the property index for this (label_id, col_id) pair so that
897        // uniqueness checks in execute_create can use O(log n) lookups.
898        self.prop_index
899            .borrow_mut()
900            .build_for(&self.snapshot.store, label_id, col_id)?;
901
902        // Register the constraint.
903        self.unique_constraints.insert((label_id, col_id));
904
905        Ok(QueryResult::empty(vec![]))
906    }
907}