sparrowdb_execution/engine/mutation.rs
1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5 // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7 /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8 /// return the list of matching `NodeId`s. The caller is responsible for
9 /// applying the actual mutations inside a write transaction.
10 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11 if mm.match_patterns.is_empty() {
12 return Ok(vec![]);
13 }
14
15 // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16 // are supported. Silently ignoring extra patterns would mutate the wrong
17 // nodes; instead we surface a clear error.
18 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19 return Err(sparrowdb_common::Error::InvalidArgument(
20 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21 .into(),
22 ));
23 }
24
25 let pat = &mm.match_patterns[0];
26 if pat.nodes.is_empty() {
27 return Ok(vec![]);
28 }
29 let node_pat = &pat.nodes[0];
30 let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32 let label_id = match self.snapshot.catalog.get_label(&label)? {
33 Some(id) => id as u32,
34 // SPA-266: unknown label → no nodes can match; return empty result.
35 None => return Ok(vec![]),
36 };
37
38 // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39 // even after the index narrows candidates by inline prop filter).
40 let mut where_col_ids: Vec<u32> = node_pat
41 .props
42 .iter()
43 .map(|pe| prop_name_to_col_id(&pe.key))
44 .collect();
45 if let Some(ref where_expr) = mm.where_clause {
46 collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47 }
48
49 let var_name = node_pat.var.as_str();
50
51 // Use the property index for O(1) equality lookups on inline prop
52 // filters, falling back to full scan for overflow strings / params.
53 let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55 let mut matching_ids = Vec::new();
56 for node_id in candidates {
57 // Re-read props needed for WHERE clause evaluation.
58 if mm.where_clause.is_some() {
59 let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60 if let Some(ref where_expr) = mm.where_clause {
61 let mut row_vals =
62 build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63 row_vals.extend(self.dollar_params());
64 if !self.eval_where_graph(where_expr, &row_vals) {
65 continue;
66 }
67 }
68 }
69 matching_ids.push(node_id);
70 }
71
72 Ok(matching_ids)
73 }
74
75 /// Return the mutation carried by a `MatchMutate` statement, exposing it
76 /// to the caller (GraphDb) so it can apply it inside a write transaction.
77 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78 &mm.mutation
79 }
80
81 // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
82
83 /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
84 ///
85 /// `NotFound` is expected for new/sparse nodes where col_0 has not been
86 /// written yet and is treated as "not tombstoned". All other errors are
87 /// logged as warnings and also treated as "not tombstoned" so that
88 /// transient storage issues do not suppress valid nodes during a scan.
89 pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
90 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
91 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
92 Err(sparrowdb_common::Error::NotFound) => false,
93 Err(e) => {
94 tracing::warn!(
95 node_id = node_id.0,
96 error = ?e,
97 "tombstone check failed; treating node as not tombstoned"
98 );
99 false
100 }
101 }
102 }
103
104 /// Returns `true` if `node_id` satisfies every inline prop predicate in
105 /// `filter_col_ids` / `props`.
106 ///
107 /// `filter_col_ids` must be pre-computed from `props` with
108 /// `prop_name_to_col_id`. Pass an empty slice when there are no filters
109 /// (the method returns `true` immediately).
110 pub(crate) fn node_matches_prop_filter(
111 &self,
112 node_id: NodeId,
113 filter_col_ids: &[u32],
114 props: &[sparrowdb_cypher::ast::PropEntry],
115 ) -> bool {
116 if props.is_empty() {
117 return true;
118 }
119 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
120 Ok(raw_props) => matches_prop_filter_static(
121 &raw_props,
122 props,
123 &self.dollar_params(),
124 &self.snapshot.store,
125 ),
126 Err(_) => false,
127 }
128 }
129
130 // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
131
132 /// Return all live `NodeId`s for `label_id` whose inline prop predicates
133 /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
134 ///
135 /// ## Index path (O(log n) per unique value)
136 ///
137 /// When there is exactly one inline prop filter and the literal is directly
138 /// encodable (integers and strings ≤ 7 bytes), the method:
139 /// 1. Calls `build_for` lazily — reads the column file once and caches it.
140 /// 2. Does a single `BTreeMap::get` to obtain the matching slot list.
141 /// 3. Verifies tombstones on the (usually tiny) candidate set.
142 ///
143 /// ## Fallback (O(n) full scan)
144 ///
145 /// When the filter cannot use the index (overflow string, multiple props,
146 /// parameter expressions, or `build_for` I/O error) the method falls back
147 /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
148 ///
149 /// ## Integration
150 ///
151 /// This replaces the inline `for slot in 0..hwm` blocks in
152 /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
153 /// so that the index is used consistently across all write-side MATCH paths.
154 pub(crate) fn scan_nodes_for_label_with_index(
155 &self,
156 label_id: u32,
157 node_props: &[sparrowdb_cypher::ast::PropEntry],
158 ) -> Result<Vec<NodeId>> {
159 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
160
161 // Collect filter col_ids up-front (needed for the fallback path too).
162 let filter_col_ids: Vec<u32> = node_props
163 .iter()
164 .map(|p| prop_name_to_col_id(&p.key))
165 .collect();
166
167 // ── Lazy index build ────────────────────────────────────────────────
168 // Ensure the property index is loaded for every column referenced by
169 // inline prop filters. `build_for` is idempotent (cache-hit no-op
170 // after the first call) and suppresses I/O errors internally.
171 for &col_id in &filter_col_ids {
172 let _ = self
173 .prop_index
174 .borrow_mut()
175 .build_for(&self.snapshot.store, label_id, col_id);
176 }
177
178 // ── Index lookup (single-equality filter, literal value) ────────────
179 let index_slots: Option<Vec<u32>> = {
180 let prop_index_ref = self.prop_index.borrow();
181 try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
182 };
183
184 if let Some(candidate_slots) = index_slots {
185 // O(k) verification over a small candidate set (typically 1 slot).
186 let mut result = Vec::with_capacity(candidate_slots.len());
187 for slot in candidate_slots {
188 let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
189 if self.is_node_tombstoned(node_id) {
190 continue;
191 }
192 // For multi-prop filters the index only narrowed on one column;
193 // verify the remaining filters here.
194 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
195 continue;
196 }
197 result.push(node_id);
198 }
199 return Ok(result);
200 }
201
202 // ── Fallback: full O(N) scan ────────────────────────────────────────
203 let mut result = Vec::new();
204 for slot in 0..hwm {
205 let node_id = NodeId(((label_id as u64) << 32) | slot);
206 if self.is_node_tombstoned(node_id) {
207 continue;
208 }
209 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
210 continue;
211 }
212 result.push(node_id);
213 }
214 Ok(result)
215 }
216
217 /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
218 /// return a map of variable name → Vec<NodeId> for each named node pattern.
219 ///
220 /// The caller (GraphDb) uses this to resolve variable bindings before
221 /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
222 pub fn scan_match_create(
223 &self,
224 mc: &MatchCreateStatement,
225 ) -> Result<HashMap<String, Vec<NodeId>>> {
226 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
227
228 for pat in &mc.match_patterns {
229 for node_pat in &pat.nodes {
230 if node_pat.var.is_empty() {
231 continue;
232 }
233 // Skip if already resolved (same var can appear in multiple patterns).
234 if var_candidates.contains_key(&node_pat.var) {
235 continue;
236 }
237
238 let label = node_pat.labels.first().cloned().unwrap_or_default();
239 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
240 Some(id) => id as u32,
241 None => {
242 // Label not found → no matching nodes for this variable.
243 var_candidates.insert(node_pat.var.clone(), vec![]);
244 continue;
245 }
246 };
247
248 // Use the property index for O(1) equality lookups when possible,
249 // falling back to a full O(N) scan for overflow strings / params.
250 let matching_ids =
251 self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
252
253 var_candidates.insert(node_pat.var.clone(), matching_ids);
254 }
255 }
256
257 Ok(var_candidates)
258 }
259
260 /// Execute the MATCH portion of a `MatchCreateStatement` and return one
261 /// binding map per matched row.
262 ///
263 /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
264 /// that represents one fully-correlated result row from the MATCH clause.
265 /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
266 ///
267 /// # Algorithm
268 ///
269 /// For each `PathPattern` in `match_patterns`:
270 /// - **No relationships** (node-only pattern): scan the node store applying
271 /// inline prop filters; collect one candidate set per named variable.
272 /// Cross-join these sets with the rows accumulated so far.
273 /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
274 /// log to enumerate actual (src, dst) pairs that are connected by an edge,
275 /// then filter each node against its inline prop predicates. Only
276 /// correlated pairs are yielded — this is the key difference from the old
277 /// `scan_match_create` which treated every node as an independent
278 /// candidate and then took a full Cartesian product.
279 ///
280 /// Patterns beyond a single hop are not yet supported and return an error.
281 pub fn scan_match_create_rows(
282 &self,
283 mc: &MatchCreateStatement,
284 ) -> Result<Vec<HashMap<String, NodeId>>> {
285 // Start with a single empty row (identity for cross-join).
286 let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
287
288 for pat in &mc.match_patterns {
289 if pat.rels.is_empty() {
290 // ── Node-only pattern: collect candidates per variable, then
291 // cross-join into accumulated rows. ──────────────────────
292 //
293 // Collect each named node variable's candidate list.
294 let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
295
296 for node_pat in &pat.nodes {
297 if node_pat.var.is_empty() {
298 continue;
299 }
300
301 // SPA-211: when no label is specified, scan all registered
302 // labels so that unlabeled MATCH patterns find nodes of
303 // any type (instead of silently returning empty).
304 let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
305 self.snapshot
306 .catalog
307 .list_labels()?
308 .into_iter()
309 .map(|(id, _)| id as u32)
310 .collect()
311 } else {
312 let label = node_pat.labels.first().cloned().unwrap_or_default();
313 match self.snapshot.catalog.get_label(&label)? {
314 Some(id) => vec![id as u32],
315 None => {
316 // No nodes can match → entire MATCH yields nothing.
317 return Ok(vec![]);
318 }
319 }
320 };
321
322 // Use the property index for O(1) equality lookups when possible,
323 // falling back to a full O(N) scan for overflow strings / params.
324 let mut matching_ids: Vec<NodeId> = Vec::new();
325 for label_id in scan_label_ids {
326 let ids =
327 self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
328 matching_ids.extend(ids);
329 }
330
331 if matching_ids.is_empty() {
332 // No matching nodes → entire MATCH is empty.
333 return Ok(vec![]);
334 }
335
336 per_var.push((node_pat.var.clone(), matching_ids));
337 }
338
339 // Cross-join the per_var candidates into accumulated.
340 // `candidates` is guaranteed non-empty (checked above), so the result
341 // will be non-empty as long as `accumulated` is non-empty.
342 for (var, candidates) in per_var {
343 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
344 for row in &accumulated {
345 for &node_id in &candidates {
346 let mut new_row = row.clone();
347 new_row.insert(var.clone(), node_id);
348 next.push(new_row);
349 }
350 }
351 accumulated = next;
352 }
353 } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
354 // ── Single-hop relationship pattern: traverse CSR + delta edges
355 // to produce correlated (src, dst) pairs. ─────────────────
356 let src_node_pat = &pat.nodes[0];
357 let dst_node_pat = &pat.nodes[1];
358 let rel_pat = &pat.rels[0];
359
360 // Only outgoing direction is supported for MATCH…CREATE traversal.
361 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
362 return Err(sparrowdb_common::Error::Unimplemented);
363 }
364
365 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
366 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
367
368 let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
369 Some(id) => id as u32,
370 None => return Ok(vec![]),
371 };
372 let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
373 Some(id) => id as u32,
374 None => return Ok(vec![]),
375 };
376
377 let src_filter_cols: Vec<u32> = src_node_pat
378 .props
379 .iter()
380 .map(|p| prop_name_to_col_id(&p.key))
381 .collect();
382 let dst_filter_cols: Vec<u32> = dst_node_pat
383 .props
384 .iter()
385 .map(|p| prop_name_to_col_id(&p.key))
386 .collect();
387
388 // SPA-185: resolve per-type rel table for delta and CSR reads.
389 let rel_lookup =
390 self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
391 if matches!(rel_lookup, RelTableLookup::NotFound) {
392 return Ok(vec![]);
393 }
394
395 // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
396 // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
397 let delta_adj: HashMap<u64, Vec<u64>> = {
398 let records: Vec<DeltaRecord> = match rel_lookup {
399 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
400 _ => self.read_delta_all(),
401 };
402 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
403 for r in records {
404 let s = r.src.0;
405 let s_label = (s >> 32) as u32;
406 if s_label == src_label_id {
407 let s_slot = s & 0xFFFF_FFFF;
408 adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
409 }
410 }
411 adj
412 };
413
414 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
415
416 // Pairs yielded by this pattern for cross-join below.
417 let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
418
419 for src_slot in 0..hwm_src {
420 // SPA-254: check per-query deadline at every slot boundary.
421 self.check_deadline()?;
422
423 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
424
425 if self.is_node_tombstoned(src_node) {
426 continue;
427 }
428 if !self.node_matches_prop_filter(
429 src_node,
430 &src_filter_cols,
431 &src_node_pat.props,
432 ) {
433 continue;
434 }
435
436 // Collect outgoing neighbours (CSR + delta adjacency map).
437 let csr_neighbors_vec: Vec<u64> = match rel_lookup {
438 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
439 _ => self.csr_neighbors_all(src_slot),
440 };
441 let empty: Vec<u64> = Vec::new();
442 let delta_neighbors: &[u64] =
443 delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
444
445 let mut seen: HashSet<u64> = HashSet::new();
446 for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
447 if !seen.insert(dst_slot) {
448 continue;
449 }
450 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
451
452 if self.is_node_tombstoned(dst_node) {
453 continue;
454 }
455 if !self.node_matches_prop_filter(
456 dst_node,
457 &dst_filter_cols,
458 &dst_node_pat.props,
459 ) {
460 continue;
461 }
462
463 let mut row: HashMap<String, NodeId> = HashMap::new();
464
465 // When src and dst use the same variable (self-loop pattern),
466 // the edge must actually be a self-loop (src == dst).
467 if !src_node_pat.var.is_empty()
468 && !dst_node_pat.var.is_empty()
469 && src_node_pat.var == dst_node_pat.var
470 {
471 if src_node != dst_node {
472 continue;
473 }
474 row.insert(src_node_pat.var.clone(), src_node);
475 } else {
476 if !src_node_pat.var.is_empty() {
477 row.insert(src_node_pat.var.clone(), src_node);
478 }
479 if !dst_node_pat.var.is_empty() {
480 row.insert(dst_node_pat.var.clone(), dst_node);
481 }
482 }
483 pattern_rows.push(row);
484 }
485 }
486
487 if pattern_rows.is_empty() {
488 return Ok(vec![]);
489 }
490
491 // Cross-join pattern_rows into accumulated, enforcing shared-variable
492 // constraints: if a variable appears in both acc_row and pat_row, only
493 // keep combinations where they agree on the same NodeId.
494 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
495 for acc_row in &accumulated {
496 'outer: for pat_row in &pattern_rows {
497 // Reject combinations where shared variables disagree.
498 for (k, v) in pat_row {
499 if let Some(existing) = acc_row.get(k) {
500 if existing != v {
501 continue 'outer;
502 }
503 }
504 }
505 let mut new_row = acc_row.clone();
506 new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
507 next.push(new_row);
508 }
509 }
510 accumulated = next;
511 } else {
512 // Multi-hop patterns not yet supported for MATCH…CREATE.
513 return Err(sparrowdb_common::Error::Unimplemented);
514 }
515 }
516
517 Ok(accumulated)
518 }
519
520 /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
521 /// correlated `(variable → NodeId)` binding rows — identical semantics to
522 /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
523 pub fn scan_match_merge_rel_rows(
524 &self,
525 mm: &MatchMergeRelStatement,
526 ) -> Result<Vec<HashMap<String, NodeId>>> {
527 // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
528 // MatchCreateStatement with an empty (no-op) CREATE body.
529 let proxy = MatchCreateStatement {
530 match_patterns: mm.match_patterns.clone(),
531 match_props: vec![],
532 create: CreateStatement {
533 nodes: vec![],
534 edges: vec![],
535 },
536 };
537 self.scan_match_create_rows(&proxy)
538 }
539
540 // ── UNWIND ─────────────────────────────────────────────────────────────────
541
542 pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
543 use crate::operators::{Operator, UnwindOperator};
544
545 // Evaluate the list expression to a Vec<Value>.
546 let values = eval_list_expr(&u.expr, &self.params)?;
547
548 // Determine the output column name from the RETURN clause.
549 let column_names = extract_return_column_names(&u.return_clause.items);
550
551 if values.is_empty() {
552 return Ok(QueryResult::empty(column_names));
553 }
554
555 let mut op = UnwindOperator::new(u.alias.clone(), values);
556 let chunks = op.collect_all()?;
557
558 // Materialize: for each chunk/group/row, project the RETURN columns.
559 //
560 // Only fall back to the UNWIND alias value when the output column
561 // actually corresponds to the alias variable. Returning a value for
562 // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
563 // silently produce wrong results instead of NULL.
564 let mut rows: Vec<Vec<Value>> = Vec::new();
565 for chunk in &chunks {
566 for group in &chunk.groups {
567 let n = group.len();
568 for row_idx in 0..n {
569 let row = u
570 .return_clause
571 .items
572 .iter()
573 .map(|item| {
574 // Determine whether this RETURN item refers to the
575 // alias variable produced by UNWIND.
576 let is_alias = match &item.expr {
577 Expr::Var(name) => name == &u.alias,
578 _ => false,
579 };
580 if is_alias {
581 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
582 } else {
583 // Variable is not in scope for this UNWIND —
584 // return NULL rather than leaking the alias value.
585 Value::Null
586 }
587 })
588 .collect();
589 rows.push(row);
590 }
591 }
592 }
593
594 Ok(QueryResult {
595 columns: column_names,
596 rows,
597 })
598 }
599
600 // ── CREATE node execution ─────────────────────────────────────────────────
601
602 /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
603 ///
604 /// For each node in the CREATE clause:
605 /// 1. Look up (or create) its primary label in the catalog.
606 /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
607 /// same FNV-1a hash used by `WriteTx::merge_node`.
608 /// 3. Write the node to the node store.
609 pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
610 for node in &create.nodes {
611 // Resolve the primary label, creating it if absent.
612 let label = node.labels.first().cloned().unwrap_or_default();
613
614 // SPA-208: reject reserved __SO_ label prefix.
615 if is_reserved_label(&label) {
616 return Err(sparrowdb_common::Error::InvalidArgument(format!(
617 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
618 )));
619 }
620
621 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
622 Some(id) => id as u32,
623 None => self.snapshot.catalog.create_label(&label)? as u32,
624 };
625
626 // Convert AST props to (col_id, StoreValue) pairs.
627 // Property values are full expressions (e.g. `datetime()`),
628 // evaluated with an empty binding map.
629 let empty_bindings: HashMap<String, Value> = HashMap::new();
630 let props: Vec<(u32, StoreValue)> = node
631 .props
632 .iter()
633 .map(|entry| {
634 let col_id = prop_name_to_col_id(&entry.key);
635 let val = eval_expr(&entry.value, &empty_bindings);
636 let store_val = value_to_store_value(val);
637 (col_id, store_val)
638 })
639 .collect();
640
641 // SPA-234: enforce UNIQUE constraints declared via
642 // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
643 // For each constrained (label_id, col_id) pair, check whether the
644 // incoming value already exists in the property index. If so,
645 // return a constraint-violation error before writing the node.
646 //
647 // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
648 // are checked via the prop_index fast path. Float values and
649 // long strings require heap storage and cannot be encoded with
650 // to_u64(); for those types we return an explicit error rather
651 // than panicking (StoreValue::Float::to_u64 is documented to
652 // panic for heap-backed values).
653 for (col_id, store_val) in &props {
654 if self.unique_constraints.contains(&(label_id, *col_id)) {
655 let raw = match store_val {
656 StoreValue::Int64(_) => store_val.to_u64(),
657 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
658 StoreValue::Bytes(_) => {
659 return Err(sparrowdb_common::Error::InvalidArgument(
660 "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
661 ));
662 }
663 StoreValue::Float(_) => {
664 return Err(sparrowdb_common::Error::InvalidArgument(
665 "UNIQUE constraints on float values are not yet supported".into(),
666 ));
667 }
668 };
669 if !self
670 .prop_index
671 .borrow()
672 .lookup(label_id, *col_id, raw)
673 .is_empty()
674 {
675 return Err(sparrowdb_common::Error::InvalidArgument(format!(
676 "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
677 )));
678 }
679 }
680 }
681
682 let node_id = self.snapshot.store.create_node(label_id, &props)?;
683 // SPA-234: after writing, insert new values into the prop_index so
684 // that subsequent creates in the same session also respect the
685 // UNIQUE constraint (the index may be stale if built before this
686 // node was written).
687 {
688 let slot =
689 sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
690 let mut idx = self.prop_index.borrow_mut();
691 for (col_id, store_val) in &props {
692 if self.unique_constraints.contains(&(label_id, *col_id)) {
693 // Only insert inline-encodable values; Float/long Bytes
694 // were already rejected above before create_node was called.
695 let raw = match store_val {
696 StoreValue::Int64(_) => store_val.to_u64(),
697 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
698 _ => continue,
699 };
700 idx.insert(label_id, *col_id, slot, raw);
701 }
702 }
703 }
704 // Update cached row count for the planner (SPA-new).
705 *self
706 .snapshot
707 .label_row_counts
708 .entry(label_id as LabelId)
709 .or_insert(0) += 1;
710 }
711 Ok(QueryResult::empty(vec![]))
712 }
713
714 pub(crate) fn execute_create_index(
715 &mut self,
716 label: &str,
717 property: &str,
718 ) -> Result<QueryResult> {
719 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
720 Some(id) => id as u32,
721 None => return Ok(QueryResult::empty(vec![])),
722 };
723 let col_id = col_id_of(property);
724 self.prop_index
725 .borrow_mut()
726 .build_for(&self.snapshot.store, label_id, col_id)?;
727 Ok(QueryResult::empty(vec![]))
728 }
729
730 /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
731 ///
732 /// Records `(label_id, col_id)` in `self.unique_constraints` so that
733 /// subsequent `execute_create` calls reject duplicate values. Also builds
734 /// the backing prop-index for that pair (needed to check existence cheaply).
735 /// If the label does not yet exist in the catalog it is auto-created so that
736 /// later `CREATE` statements can register against the constraint.
737 pub(crate) fn execute_create_constraint(
738 &mut self,
739 label: &str,
740 property: &str,
741 ) -> Result<QueryResult> {
742 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
743 Some(id) => id as u32,
744 None => self.snapshot.catalog.create_label(label)? as u32,
745 };
746 let col_id = col_id_of(property);
747
748 // Build the property index for this (label_id, col_id) pair so that
749 // uniqueness checks in execute_create can use O(log n) lookups.
750 self.prop_index
751 .borrow_mut()
752 .build_for(&self.snapshot.store, label_id, col_id)?;
753
754 // Register the constraint.
755 self.unique_constraints.insert((label_id, col_id));
756
757 Ok(QueryResult::empty(vec![]))
758 }
759}