Skip to main content

sparrowdb_execution/engine/
mutation.rs

1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5    // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7    /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8    /// return the list of matching `NodeId`s.  The caller is responsible for
9    /// applying the actual mutations inside a write transaction.
10    pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11        if mm.match_patterns.is_empty() {
12            return Ok(vec![]);
13        }
14
15        // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16        // are supported.  Silently ignoring extra patterns would mutate the wrong
17        // nodes; instead we surface a clear error.
18        if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19            return Err(sparrowdb_common::Error::InvalidArgument(
20                "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21                    .into(),
22            ));
23        }
24
25        let pat = &mm.match_patterns[0];
26        if pat.nodes.is_empty() {
27            return Ok(vec![]);
28        }
29        let node_pat = &pat.nodes[0];
30        let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32        let label_id = match self.snapshot.catalog.get_label(&label)? {
33            Some(id) => id as u32,
34            // SPA-266: unknown label → no nodes can match; return empty result.
35            None => return Ok(vec![]),
36        };
37
38        // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39        // even after the index narrows candidates by inline prop filter).
40        let mut where_col_ids: Vec<u32> = node_pat
41            .props
42            .iter()
43            .map(|pe| prop_name_to_col_id(&pe.key))
44            .collect();
45        if let Some(ref where_expr) = mm.where_clause {
46            collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47        }
48
49        let var_name = node_pat.var.as_str();
50
51        // Use the property index for O(1) equality lookups on inline prop
52        // filters, falling back to full scan for overflow strings / params.
53        let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55        let mut matching_ids = Vec::new();
56        for node_id in candidates {
57            // Re-read props needed for WHERE clause evaluation.
58            if mm.where_clause.is_some() {
59                let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60                if let Some(ref where_expr) = mm.where_clause {
61                    let mut row_vals =
62                        build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63                    row_vals.extend(self.dollar_params());
64                    if !self.eval_where_graph(where_expr, &row_vals) {
65                        continue;
66                    }
67                }
68            }
69            matching_ids.push(node_id);
70        }
71
72        Ok(matching_ids)
73    }
74
75    /// Return the mutation carried by a `MatchMutate` statement, exposing it
76    /// to the caller (GraphDb) so it can apply it inside a write transaction.
77    pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78        &mm.mutation
79    }
80
81    // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
82
83    /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
84    ///
85    /// `NotFound` is expected for new/sparse nodes where col_0 has not been
86    /// written yet and is treated as "not tombstoned".  All other errors are
87    /// logged as warnings and also treated as "not tombstoned" so that
88    /// transient storage issues do not suppress valid nodes during a scan.
89    pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
90        match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
91            Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
92            Err(sparrowdb_common::Error::NotFound) => false,
93            Err(e) => {
94                tracing::warn!(
95                    node_id = node_id.0,
96                    error = ?e,
97                    "tombstone check failed; treating node as not tombstoned"
98                );
99                false
100            }
101        }
102    }
103
104    /// Returns `true` if `node_id` satisfies every inline prop predicate in
105    /// `filter_col_ids` / `props`.
106    ///
107    /// `filter_col_ids` must be pre-computed from `props` with
108    /// `prop_name_to_col_id`.  Pass an empty slice when there are no filters
109    /// (the method returns `true` immediately).
110    pub(crate) fn node_matches_prop_filter(
111        &self,
112        node_id: NodeId,
113        filter_col_ids: &[u32],
114        props: &[sparrowdb_cypher::ast::PropEntry],
115    ) -> bool {
116        if props.is_empty() {
117            return true;
118        }
119        match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
120            Ok(raw_props) => matches_prop_filter_static(
121                &raw_props,
122                props,
123                &self.dollar_params(),
124                &self.snapshot.store,
125            ),
126            Err(_) => false,
127        }
128    }
129
130    // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
131
132    /// Return all live `NodeId`s for `label_id` whose inline prop predicates
133    /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
134    ///
135    /// ## Index path (O(log n) per unique value)
136    ///
137    /// When there is exactly one inline prop filter and the literal is directly
138    /// encodable (integers and strings ≤ 7 bytes), the method:
139    ///   1. Calls `build_for` lazily — reads the column file once and caches it.
140    ///   2. Does a single `BTreeMap::get` to obtain the matching slot list.
141    ///   3. Verifies tombstones on the (usually tiny) candidate set.
142    ///
143    /// ## Fallback (O(n) full scan)
144    ///
145    /// When the filter cannot use the index (overflow string, multiple props,
146    /// parameter expressions, or `build_for` I/O error) the method falls back
147    /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
148    ///
149    /// ## Integration
150    ///
151    /// This replaces the inline `for slot in 0..hwm` blocks in
152    /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
153    /// so that the index is used consistently across all write-side MATCH paths.
154    pub(crate) fn scan_nodes_for_label_with_index(
155        &self,
156        label_id: u32,
157        node_props: &[sparrowdb_cypher::ast::PropEntry],
158    ) -> Result<Vec<NodeId>> {
159        let hwm = self.snapshot.store.hwm_for_label(label_id)?;
160
161        // Collect filter col_ids up-front (needed for the fallback path too).
162        let filter_col_ids: Vec<u32> = node_props
163            .iter()
164            .map(|p| prop_name_to_col_id(&p.key))
165            .collect();
166
167        // ── Lazy index build ────────────────────────────────────────────────
168        // Ensure the property index is loaded for every column referenced by
169        // inline prop filters.  `build_for` is idempotent (cache-hit no-op
170        // after the first call) and suppresses I/O errors internally.
171        for &col_id in &filter_col_ids {
172            let _ = self
173                .prop_index
174                .borrow_mut()
175                .build_for(&self.snapshot.store, label_id, col_id);
176        }
177
178        // ── Index lookup (single-equality filter, literal value) ────────────
179        let index_slots: Option<Vec<u32>> = {
180            let prop_index_ref = self.prop_index.borrow();
181            try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
182        };
183
184        if let Some(candidate_slots) = index_slots {
185            // O(k) verification over a small candidate set (typically 1 slot).
186            let mut result = Vec::with_capacity(candidate_slots.len());
187            for slot in candidate_slots {
188                let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
189                if self.is_node_tombstoned(node_id) {
190                    continue;
191                }
192                // For multi-prop filters the index only narrowed on one column;
193                // verify the remaining filters here.
194                if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
195                    continue;
196                }
197                result.push(node_id);
198            }
199            return Ok(result);
200        }
201
202        // ── Fallback: full O(N) scan ────────────────────────────────────────
203        let mut result = Vec::new();
204        for slot in 0..hwm {
205            let node_id = NodeId(((label_id as u64) << 32) | slot);
206            if self.is_node_tombstoned(node_id) {
207                continue;
208            }
209            if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
210                continue;
211            }
212            result.push(node_id);
213        }
214        Ok(result)
215    }
216
217    /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
218    /// return a map of variable name → Vec<NodeId> for each named node pattern.
219    ///
220    /// The caller (GraphDb) uses this to resolve variable bindings before
221    /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
222    pub fn scan_match_create(
223        &self,
224        mc: &MatchCreateStatement,
225    ) -> Result<HashMap<String, Vec<NodeId>>> {
226        let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
227
228        for pat in &mc.match_patterns {
229            for node_pat in &pat.nodes {
230                if node_pat.var.is_empty() {
231                    continue;
232                }
233                // Skip if already resolved (same var can appear in multiple patterns).
234                if var_candidates.contains_key(&node_pat.var) {
235                    continue;
236                }
237
238                let label = node_pat.labels.first().cloned().unwrap_or_default();
239                let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
240                    Some(id) => id as u32,
241                    None => {
242                        // Label not found → no matching nodes for this variable.
243                        var_candidates.insert(node_pat.var.clone(), vec![]);
244                        continue;
245                    }
246                };
247
248                // Use the property index for O(1) equality lookups when possible,
249                // falling back to a full O(N) scan for overflow strings / params.
250                let matching_ids =
251                    self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
252
253                var_candidates.insert(node_pat.var.clone(), matching_ids);
254            }
255        }
256
257        Ok(var_candidates)
258    }
259
260    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
261    /// binding map per matched row.
262    ///
263    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
264    /// that represents one fully-correlated result row from the MATCH clause.
265    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
266    ///
267    /// # Algorithm
268    ///
269    /// For each `PathPattern` in `match_patterns`:
270    /// - **No relationships** (node-only pattern): scan the node store applying
271    ///   inline prop filters; collect one candidate set per named variable.
272    ///   Cross-join these sets with the rows accumulated so far.
273    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
274    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
275    ///   then filter each node against its inline prop predicates.  Only
276    ///   correlated pairs are yielded — this is the key difference from the old
277    ///   `scan_match_create` which treated every node as an independent
278    ///   candidate and then took a full Cartesian product.
279    ///
280    /// Patterns beyond a single hop are not yet supported and return an error.
281    pub fn scan_match_create_rows(
282        &self,
283        mc: &MatchCreateStatement,
284    ) -> Result<Vec<HashMap<String, NodeId>>> {
285        // Start with a single empty row (identity for cross-join).
286        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
287
288        for pat in &mc.match_patterns {
289            if pat.rels.is_empty() {
290                // ── Node-only pattern: collect candidates per variable, then
291                //    cross-join into accumulated rows. ──────────────────────
292                //
293                // Collect each named node variable's candidate list.
294                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
295
296                for node_pat in &pat.nodes {
297                    if node_pat.var.is_empty() {
298                        continue;
299                    }
300
301                    // SPA-211: when no label is specified, scan all registered
302                    // labels so that unlabeled MATCH patterns find nodes of
303                    // any type (instead of silently returning empty).
304                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
305                        self.snapshot
306                            .catalog
307                            .list_labels()?
308                            .into_iter()
309                            .map(|(id, _)| id as u32)
310                            .collect()
311                    } else {
312                        let label = node_pat.labels.first().cloned().unwrap_or_default();
313                        match self.snapshot.catalog.get_label(&label)? {
314                            Some(id) => vec![id as u32],
315                            None => {
316                                // No nodes can match → entire MATCH yields nothing.
317                                return Ok(vec![]);
318                            }
319                        }
320                    };
321
322                    // Use the property index for O(1) equality lookups when possible,
323                    // falling back to a full O(N) scan for overflow strings / params.
324                    let mut matching_ids: Vec<NodeId> = Vec::new();
325                    for label_id in scan_label_ids {
326                        let ids =
327                            self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
328                        matching_ids.extend(ids);
329                    }
330
331                    if matching_ids.is_empty() {
332                        // No matching nodes → entire MATCH is empty.
333                        return Ok(vec![]);
334                    }
335
336                    per_var.push((node_pat.var.clone(), matching_ids));
337                }
338
339                // Cross-join the per_var candidates into accumulated.
340                // `candidates` is guaranteed non-empty (checked above), so the result
341                // will be non-empty as long as `accumulated` is non-empty.
342                for (var, candidates) in per_var {
343                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
344                    for row in &accumulated {
345                        for &node_id in &candidates {
346                            let mut new_row = row.clone();
347                            new_row.insert(var.clone(), node_id);
348                            next.push(new_row);
349                        }
350                    }
351                    accumulated = next;
352                }
353            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
354                // ── Single-hop relationship pattern: traverse CSR + delta edges
355                //    to produce correlated (src, dst) pairs. ─────────────────
356                let src_node_pat = &pat.nodes[0];
357                let dst_node_pat = &pat.nodes[1];
358                let rel_pat = &pat.rels[0];
359
360                // Only outgoing direction is supported for MATCH…CREATE traversal.
361                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
362                    return Err(sparrowdb_common::Error::Unimplemented);
363                }
364
365                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
366                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
367
368                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
369                    Some(id) => id as u32,
370                    None => return Ok(vec![]),
371                };
372                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
373                    Some(id) => id as u32,
374                    None => return Ok(vec![]),
375                };
376
377                let src_filter_cols: Vec<u32> = src_node_pat
378                    .props
379                    .iter()
380                    .map(|p| prop_name_to_col_id(&p.key))
381                    .collect();
382                let dst_filter_cols: Vec<u32> = dst_node_pat
383                    .props
384                    .iter()
385                    .map(|p| prop_name_to_col_id(&p.key))
386                    .collect();
387
388                // SPA-185: resolve per-type rel table for delta and CSR reads.
389                let rel_lookup =
390                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
391                if matches!(rel_lookup, RelTableLookup::NotFound) {
392                    return Ok(vec![]);
393                }
394
395                // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
396                // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
397                let delta_adj: HashMap<u64, Vec<u64>> = {
398                    let records: Vec<DeltaRecord> = match rel_lookup {
399                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
400                        _ => self.read_delta_all(),
401                    };
402                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
403                    for r in records {
404                        let s = r.src.0;
405                        let s_label = (s >> 32) as u32;
406                        if s_label == src_label_id {
407                            let s_slot = s & 0xFFFF_FFFF;
408                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
409                        }
410                    }
411                    adj
412                };
413
414                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
415
416                // Pairs yielded by this pattern for cross-join below.
417                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
418
419                for src_slot in 0..hwm_src {
420                    // SPA-254: check per-query deadline at every slot boundary.
421                    self.check_deadline()?;
422
423                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
424
425                    if self.is_node_tombstoned(src_node) {
426                        continue;
427                    }
428                    if !self.node_matches_prop_filter(
429                        src_node,
430                        &src_filter_cols,
431                        &src_node_pat.props,
432                    ) {
433                        continue;
434                    }
435
436                    // Collect outgoing neighbours (CSR + delta adjacency map).
437                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
438                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
439                        _ => self.csr_neighbors_all(src_slot),
440                    };
441                    let empty: Vec<u64> = Vec::new();
442                    let delta_neighbors: &[u64] =
443                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
444
445                    let mut seen: HashSet<u64> = HashSet::new();
446                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
447                        if !seen.insert(dst_slot) {
448                            continue;
449                        }
450                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
451
452                        if self.is_node_tombstoned(dst_node) {
453                            continue;
454                        }
455                        if !self.node_matches_prop_filter(
456                            dst_node,
457                            &dst_filter_cols,
458                            &dst_node_pat.props,
459                        ) {
460                            continue;
461                        }
462
463                        let mut row: HashMap<String, NodeId> = HashMap::new();
464
465                        // When src and dst use the same variable (self-loop pattern),
466                        // the edge must actually be a self-loop (src == dst).
467                        if !src_node_pat.var.is_empty()
468                            && !dst_node_pat.var.is_empty()
469                            && src_node_pat.var == dst_node_pat.var
470                        {
471                            if src_node != dst_node {
472                                continue;
473                            }
474                            row.insert(src_node_pat.var.clone(), src_node);
475                        } else {
476                            if !src_node_pat.var.is_empty() {
477                                row.insert(src_node_pat.var.clone(), src_node);
478                            }
479                            if !dst_node_pat.var.is_empty() {
480                                row.insert(dst_node_pat.var.clone(), dst_node);
481                            }
482                        }
483                        pattern_rows.push(row);
484                    }
485                }
486
487                if pattern_rows.is_empty() {
488                    return Ok(vec![]);
489                }
490
491                // Cross-join pattern_rows into accumulated, enforcing shared-variable
492                // constraints: if a variable appears in both acc_row and pat_row, only
493                // keep combinations where they agree on the same NodeId.
494                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
495                for acc_row in &accumulated {
496                    'outer: for pat_row in &pattern_rows {
497                        // Reject combinations where shared variables disagree.
498                        for (k, v) in pat_row {
499                            if let Some(existing) = acc_row.get(k) {
500                                if existing != v {
501                                    continue 'outer;
502                                }
503                            }
504                        }
505                        let mut new_row = acc_row.clone();
506                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
507                        next.push(new_row);
508                    }
509                }
510                accumulated = next;
511            } else {
512                // Multi-hop patterns not yet supported for MATCH…CREATE.
513                return Err(sparrowdb_common::Error::Unimplemented);
514            }
515        }
516
517        Ok(accumulated)
518    }
519
520    /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
521    /// correlated `(variable → NodeId)` binding rows — identical semantics to
522    /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
523    pub fn scan_match_merge_rel_rows(
524        &self,
525        mm: &MatchMergeRelStatement,
526    ) -> Result<Vec<HashMap<String, NodeId>>> {
527        // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
528        // MatchCreateStatement with an empty (no-op) CREATE body.
529        let proxy = MatchCreateStatement {
530            match_patterns: mm.match_patterns.clone(),
531            match_props: vec![],
532            create: CreateStatement {
533                nodes: vec![],
534                edges: vec![],
535            },
536        };
537        self.scan_match_create_rows(&proxy)
538    }
539
540    // ── UNWIND ─────────────────────────────────────────────────────────────────
541
542    pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
543        use crate::operators::{Operator, UnwindOperator};
544
545        // Evaluate the list expression to a Vec<Value>.
546        let values = eval_list_expr(&u.expr, &self.params)?;
547
548        // Determine the output column name from the RETURN clause.
549        let column_names = extract_return_column_names(&u.return_clause.items);
550
551        if values.is_empty() {
552            return Ok(QueryResult::empty(column_names));
553        }
554
555        let mut op = UnwindOperator::new(u.alias.clone(), values);
556        let chunks = op.collect_all()?;
557
558        // Materialize: for each chunk/group/row, project the RETURN columns.
559        //
560        // Only fall back to the UNWIND alias value when the output column
561        // actually corresponds to the alias variable.  Returning a value for
562        // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
563        // silently produce wrong results instead of NULL.
564        let mut rows: Vec<Vec<Value>> = Vec::new();
565        for chunk in &chunks {
566            for group in &chunk.groups {
567                let n = group.len();
568                for row_idx in 0..n {
569                    let row = u
570                        .return_clause
571                        .items
572                        .iter()
573                        .map(|item| {
574                            // Determine whether this RETURN item refers to the
575                            // alias variable produced by UNWIND.
576                            let is_alias = match &item.expr {
577                                Expr::Var(name) => name == &u.alias,
578                                _ => false,
579                            };
580                            if is_alias {
581                                group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
582                            } else {
583                                // Variable is not in scope for this UNWIND —
584                                // return NULL rather than leaking the alias value.
585                                Value::Null
586                            }
587                        })
588                        .collect();
589                    rows.push(row);
590                }
591            }
592        }
593
594        Ok(QueryResult {
595            columns: column_names,
596            rows,
597        })
598    }
599
600    // ── CREATE node execution ─────────────────────────────────────────────────
601
602    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
603    ///
604    /// For each node in the CREATE clause:
605    /// 1. Look up (or create) its primary label in the catalog.
606    /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
607    ///    same FNV-1a hash used by `WriteTx::merge_node`.
608    /// 3. Write the node to the node store.
609    pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
610        for node in &create.nodes {
611            // Resolve the primary label, creating it if absent.
612            let label = node.labels.first().cloned().unwrap_or_default();
613
614            // SPA-208: reject reserved __SO_ label prefix.
615            if is_reserved_label(&label) {
616                return Err(sparrowdb_common::Error::InvalidArgument(format!(
617                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
618                )));
619            }
620
621            let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
622                Some(id) => id as u32,
623                None => self.snapshot.catalog.create_label(&label)? as u32,
624            };
625
626            // Convert AST props to (col_id, StoreValue) pairs.
627            // Property values are full expressions (e.g. `datetime()`),
628            // evaluated with an empty binding map.
629            let empty_bindings: HashMap<String, Value> = HashMap::new();
630            let props: Vec<(u32, StoreValue)> = node
631                .props
632                .iter()
633                .map(|entry| {
634                    let col_id = prop_name_to_col_id(&entry.key);
635                    let val = eval_expr(&entry.value, &empty_bindings);
636                    let store_val = value_to_store_value(val);
637                    (col_id, store_val)
638                })
639                .collect();
640
641            // SPA-234: enforce UNIQUE constraints declared via
642            // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
643            // For each constrained (label_id, col_id) pair, check whether the
644            // incoming value already exists in the property index.  If so,
645            // return a constraint-violation error before writing the node.
646            //
647            // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
648            // are checked via the prop_index fast path.  Float values and
649            // long strings require heap storage and cannot be encoded with
650            // to_u64(); for those types we return an explicit error rather
651            // than panicking (StoreValue::Float::to_u64 is documented to
652            // panic for heap-backed values).
653            for (col_id, store_val) in &props {
654                if self.unique_constraints.contains(&(label_id, *col_id)) {
655                    let raw = match store_val {
656                        StoreValue::Int64(_) => store_val.to_u64(),
657                        StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
658                        StoreValue::Bytes(_) => {
659                            return Err(sparrowdb_common::Error::InvalidArgument(
660                                "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
661                            ));
662                        }
663                        StoreValue::Float(_) => {
664                            return Err(sparrowdb_common::Error::InvalidArgument(
665                                "UNIQUE constraints on float values are not yet supported".into(),
666                            ));
667                        }
668                    };
669                    if !self
670                        .prop_index
671                        .borrow()
672                        .lookup(label_id, *col_id, raw)
673                        .is_empty()
674                    {
675                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
676                            "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
677                        )));
678                    }
679                }
680            }
681
682            let node_id = self.snapshot.store.create_node(label_id, &props)?;
683            // SPA-234: after writing, insert new values into the prop_index so
684            // that subsequent creates in the same session also respect the
685            // UNIQUE constraint (the index may be stale if built before this
686            // node was written).
687            {
688                let slot =
689                    sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
690                let mut idx = self.prop_index.borrow_mut();
691                for (col_id, store_val) in &props {
692                    if self.unique_constraints.contains(&(label_id, *col_id)) {
693                        // Only insert inline-encodable values; Float/long Bytes
694                        // were already rejected above before create_node was called.
695                        let raw = match store_val {
696                            StoreValue::Int64(_) => store_val.to_u64(),
697                            StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
698                            _ => continue,
699                        };
700                        idx.insert(label_id, *col_id, slot, raw);
701                    }
702                }
703            }
704            // Update cached row count for the planner (SPA-new).
705            *self
706                .snapshot
707                .label_row_counts
708                .entry(label_id as LabelId)
709                .or_insert(0) += 1;
710        }
711        Ok(QueryResult::empty(vec![]))
712    }
713
714    pub(crate) fn execute_create_index(
715        &mut self,
716        label: &str,
717        property: &str,
718    ) -> Result<QueryResult> {
719        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
720            Some(id) => id as u32,
721            None => return Ok(QueryResult::empty(vec![])),
722        };
723        let col_id = col_id_of(property);
724        self.prop_index
725            .borrow_mut()
726            .build_for(&self.snapshot.store, label_id, col_id)?;
727        Ok(QueryResult::empty(vec![]))
728    }
729
730    /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
731    ///
732    /// Records `(label_id, col_id)` in `self.unique_constraints` so that
733    /// subsequent `execute_create` calls reject duplicate values.  Also builds
734    /// the backing prop-index for that pair (needed to check existence cheaply).
735    /// If the label does not yet exist in the catalog it is auto-created so that
736    /// later `CREATE` statements can register against the constraint.
737    pub(crate) fn execute_create_constraint(
738        &mut self,
739        label: &str,
740        property: &str,
741    ) -> Result<QueryResult> {
742        let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
743            Some(id) => id as u32,
744            None => self.snapshot.catalog.create_label(label)? as u32,
745        };
746        let col_id = col_id_of(property);
747
748        // Build the property index for this (label_id, col_id) pair so that
749        // uniqueness checks in execute_create can use O(log n) lookups.
750        self.prop_index
751            .borrow_mut()
752            .build_for(&self.snapshot.store, label_id, col_id)?;
753
754        // Register the constraint.
755        self.unique_constraints.insert((label_id, col_id));
756
757        Ok(QueryResult::empty(vec![]))
758    }
759}