sparrowdb_execution/engine/mutation.rs
1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5 // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7 /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8 /// return the list of matching `NodeId`s. The caller is responsible for
9 /// applying the actual mutations inside a write transaction.
10 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11 if mm.match_patterns.is_empty() {
12 return Ok(vec![]);
13 }
14
15 // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16 // are supported. Silently ignoring extra patterns would mutate the wrong
17 // nodes; instead we surface a clear error.
18 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19 return Err(sparrowdb_common::Error::InvalidArgument(
20 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21 .into(),
22 ));
23 }
24
25 let pat = &mm.match_patterns[0];
26 if pat.nodes.is_empty() {
27 return Ok(vec![]);
28 }
29 let node_pat = &pat.nodes[0];
30 let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32 let label_id = match self.snapshot.catalog.get_label(&label)? {
33 Some(id) => id as u32,
34 // SPA-266: unknown label → no nodes can match; return empty result.
35 None => return Ok(vec![]),
36 };
37
38 // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39 // even after the index narrows candidates by inline prop filter).
40 let mut where_col_ids: Vec<u32> = node_pat
41 .props
42 .iter()
43 .map(|pe| prop_name_to_col_id(&pe.key))
44 .collect();
45 if let Some(ref where_expr) = mm.where_clause {
46 collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47 }
48
49 let var_name = node_pat.var.as_str();
50
51 // Use the property index for O(1) equality lookups on inline prop
52 // filters, falling back to full scan for overflow strings / params.
53 let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55 let mut matching_ids = Vec::new();
56 for node_id in candidates {
57 // Re-read props needed for WHERE clause evaluation.
58 if mm.where_clause.is_some() {
59 let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60 if let Some(ref where_expr) = mm.where_clause {
61 let mut row_vals =
62 build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63 row_vals.extend(self.dollar_params());
64 if !self.eval_where_graph(where_expr, &row_vals) {
65 continue;
66 }
67 }
68 }
69 matching_ids.push(node_id);
70 }
71
72 Ok(matching_ids)
73 }
74
75 /// Return the mutation carried by a `MatchMutate` statement, exposing it
76 /// to the caller (GraphDb) so it can apply it inside a write transaction.
77 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78 &mm.mutation
79 }
80
    /// Scan edges matching a MATCH pattern with exactly one hop and return
    /// `(src, dst, rel_type)` tuples for edge deletion.
    ///
    /// Supports `MATCH (a:Label)-[r:REL]->(b:Label) DELETE r` with optional
    /// inline property filters on source and destination node patterns.
    /// Labels on either side are optional: an empty label places no
    /// constraint on that end, while an unknown label short-circuits to an
    /// empty result. An empty `rel_type` on the pattern matches every table.
    ///
    /// Includes both checkpointed (CSR) and uncheckpointed (delta) edges.
    ///
    /// NOTE(review): the relationship direction on the pattern is not
    /// inspected here — traversal is outgoing-only via the rel tables.
    /// Presumably the parser only produces outgoing patterns for edge
    /// DELETE; confirm against the cypher frontend.
    ///
    /// # Errors
    /// `InvalidArgument` when the statement has more than one path pattern or
    /// the single pattern is not exactly one relationship between two nodes.
    pub fn scan_match_mutate_edges(
        &self,
        mm: &MatchMutateStatement,
    ) -> Result<Vec<(NodeId, NodeId, String)>> {
        if mm.match_patterns.len() != 1 {
            return Err(sparrowdb_common::Error::InvalidArgument(
                "MATCH...DELETE edge: only single-path patterns are supported".into(),
            ));
        }
        let pat = &mm.match_patterns[0];
        if pat.rels.len() != 1 || pat.nodes.len() != 2 {
            return Err(sparrowdb_common::Error::InvalidArgument(
                "MATCH...DELETE edge: pattern must have exactly one relationship hop".into(),
            ));
        }

        let src_node_pat = &pat.nodes[0];
        let dst_node_pat = &pat.nodes[1];
        let rel_pat = &pat.rels[0];

        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();

        // Resolve optional label-id constraints. `None` means "any label";
        // a label that is absent from the catalog can never match.
        let src_label_id_opt: Option<u32> = if src_label.is_empty() {
            None
        } else {
            match self.snapshot.catalog.get_label(&src_label)? {
                Some(id) => Some(id as u32),
                None => return Ok(vec![]), // unknown label → no matches
            }
        };
        let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
            None
        } else {
            match self.snapshot.catalog.get_label(&dst_label)? {
                Some(id) => Some(id as u32),
                None => return Ok(vec![]), // unknown label → no matches
            }
        };

        // Filter registered rel tables by rel type and src/dst label.
        // Each entry is (catalog_rel_id, src_label_id, dst_label_id, type).
        let rel_tables: Vec<(u64, u32, u32, String)> = self
            .snapshot
            .catalog
            .list_rel_tables_with_ids()
            .into_iter()
            .filter(|(_, sid, did, rt)| {
                let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
                let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
                let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
                type_ok && src_ok && dst_ok
            })
            .map(|(cid, sid, did, rt)| (cid, sid as u32, did as u32, rt))
            .collect();

        // Pre-compute col_ids for inline prop filters (avoid re-computing per slot).
        let src_filter_col_ids: Vec<u32> = src_node_pat
            .props
            .iter()
            .map(|p| prop_name_to_col_id(&p.key))
            .collect();
        let dst_filter_col_ids: Vec<u32> = dst_node_pat
            .props
            .iter()
            .map(|p| prop_name_to_col_id(&p.key))
            .collect();

        let mut result: Vec<(NodeId, NodeId, String)> = Vec::new();

        for (catalog_rel_id, effective_src_lid, effective_dst_lid, rel_type) in &rel_tables {
            // Catalog ids are u64 but the CSR/delta APIs take u32; overflow
            // here would indicate a corrupted catalog, hence the expect.
            let catalog_rel_id_u32 =
                u32::try_from(*catalog_rel_id).expect("catalog_rel_id overflowed u32");

            // ── Checkpointed edges (CSR) ──────────────────────────────────────
            // A failed HWM read for this table's src label means nothing is
            // checkpointed under it; skip to the next table.
            let hwm_src = match self.snapshot.store.hwm_for_label(*effective_src_lid) {
                Ok(hwm) => hwm,
                Err(_) => continue,
            };
            for src_slot in 0..hwm_src {
                // NodeId packs (label_id << 32) | slot.
                let src_node = NodeId(((*effective_src_lid as u64) << 32) | src_slot);
                if self.is_node_tombstoned(src_node) {
                    continue;
                }
                if !self.node_matches_prop_filter(
                    src_node,
                    &src_filter_col_ids,
                    &src_node_pat.props,
                ) {
                    continue;
                }
                for dst_slot in self.csr_neighbors(catalog_rel_id_u32, src_slot) {
                    let dst_node = NodeId(((*effective_dst_lid as u64) << 32) | dst_slot);
                    if self.is_node_tombstoned(dst_node) {
                        continue;
                    }
                    if !self.node_matches_prop_filter(
                        dst_node,
                        &dst_filter_col_ids,
                        &dst_node_pat.props,
                    ) {
                        continue;
                    }
                    result.push((src_node, dst_node, rel_type.clone()));
                }
            }

            // ── Uncheckpointed edges (delta log) ──────────────────────────────
            for rec in self.read_delta_for(catalog_rel_id_u32) {
                // Delta records carry full NodeIds; recover each side's label
                // from the high 32 bits to re-check the label constraints.
                let r_src_label = (rec.src.0 >> 32) as u32;
                let r_dst_label = (rec.dst.0 >> 32) as u32;
                if src_label_id_opt
                    .map(|id| id != r_src_label)
                    .unwrap_or(false)
                {
                    continue;
                }
                if dst_label_id_opt
                    .map(|id| id != r_dst_label)
                    .unwrap_or(false)
                {
                    continue;
                }
                if self.is_node_tombstoned(rec.src) || self.is_node_tombstoned(rec.dst) {
                    continue;
                }
                if !self.node_matches_prop_filter(rec.src, &src_filter_col_ids, &src_node_pat.props)
                {
                    continue;
                }
                if !self.node_matches_prop_filter(rec.dst, &dst_filter_col_ids, &dst_node_pat.props)
                {
                    continue;
                }
                result.push((rec.src, rec.dst, rel_type.clone()));
            }
        }

        Ok(result)
    }
228
229 // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
230
231 /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
232 ///
233 /// `NotFound` is expected for new/sparse nodes where col_0 has not been
234 /// written yet and is treated as "not tombstoned". All other errors are
235 /// logged as warnings and also treated as "not tombstoned" so that
236 /// transient storage issues do not suppress valid nodes during a scan.
237 pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
238 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
239 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
240 Err(sparrowdb_common::Error::NotFound) => false,
241 Err(e) => {
242 tracing::warn!(
243 node_id = node_id.0,
244 error = ?e,
245 "tombstone check failed; treating node as not tombstoned"
246 );
247 false
248 }
249 }
250 }
251
252 /// Returns `true` if `node_id` satisfies every inline prop predicate in
253 /// `filter_col_ids` / `props`.
254 ///
255 /// `filter_col_ids` must be pre-computed from `props` with
256 /// `prop_name_to_col_id`. Pass an empty slice when there are no filters
257 /// (the method returns `true` immediately).
258 pub(crate) fn node_matches_prop_filter(
259 &self,
260 node_id: NodeId,
261 filter_col_ids: &[u32],
262 props: &[sparrowdb_cypher::ast::PropEntry],
263 ) -> bool {
264 if props.is_empty() {
265 return true;
266 }
267 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
268 Ok(raw_props) => matches_prop_filter_static(
269 &raw_props,
270 props,
271 &self.dollar_params(),
272 &self.snapshot.store,
273 ),
274 Err(_) => false,
275 }
276 }
277
278 // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
279
280 /// Return all live `NodeId`s for `label_id` whose inline prop predicates
281 /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
282 ///
283 /// ## Index path (O(log n) per unique value)
284 ///
285 /// When there is exactly one inline prop filter and the literal is directly
286 /// encodable (integers and strings ≤ 7 bytes), the method:
287 /// 1. Calls `build_for` lazily — reads the column file once and caches it.
288 /// 2. Does a single `BTreeMap::get` to obtain the matching slot list.
289 /// 3. Verifies tombstones on the (usually tiny) candidate set.
290 ///
291 /// ## Fallback (O(n) full scan)
292 ///
293 /// When the filter cannot use the index (overflow string, multiple props,
294 /// parameter expressions, or `build_for` I/O error) the method falls back
295 /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
296 ///
297 /// ## Integration
298 ///
299 /// This replaces the inline `for slot in 0..hwm` blocks in
300 /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
301 /// so that the index is used consistently across all write-side MATCH paths.
302 pub(crate) fn scan_nodes_for_label_with_index(
303 &self,
304 label_id: u32,
305 node_props: &[sparrowdb_cypher::ast::PropEntry],
306 ) -> Result<Vec<NodeId>> {
307 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
308
309 // Collect filter col_ids up-front (needed for the fallback path too).
310 let filter_col_ids: Vec<u32> = node_props
311 .iter()
312 .map(|p| prop_name_to_col_id(&p.key))
313 .collect();
314
315 // ── Lazy index build ────────────────────────────────────────────────
316 // Ensure the property index is loaded for every column referenced by
317 // inline prop filters. `build_for` is idempotent (cache-hit no-op
318 // after the first call) and suppresses I/O errors internally.
319 for &col_id in &filter_col_ids {
320 let _ = self
321 .prop_index
322 .borrow_mut()
323 .build_for(&self.snapshot.store, label_id, col_id);
324 }
325
326 // ── Index lookup (single-equality filter, literal value) ────────────
327 let index_slots: Option<Vec<u32>> = {
328 let prop_index_ref = self.prop_index.borrow();
329 try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
330 };
331
332 if let Some(candidate_slots) = index_slots {
333 // O(k) verification over a small candidate set (typically 1 slot).
334 let mut result = Vec::with_capacity(candidate_slots.len());
335 for slot in candidate_slots {
336 let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
337 if self.is_node_tombstoned(node_id) {
338 continue;
339 }
340 // For multi-prop filters the index only narrowed on one column;
341 // verify the remaining filters here.
342 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
343 continue;
344 }
345 result.push(node_id);
346 }
347 return Ok(result);
348 }
349
350 // ── Fallback: full O(N) scan ────────────────────────────────────────
351 let mut result = Vec::new();
352 for slot in 0..hwm {
353 let node_id = NodeId(((label_id as u64) << 32) | slot);
354 if self.is_node_tombstoned(node_id) {
355 continue;
356 }
357 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
358 continue;
359 }
360 result.push(node_id);
361 }
362 Ok(result)
363 }
364
365 /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
366 /// return a map of variable name → Vec<NodeId> for each named node pattern.
367 ///
368 /// The caller (GraphDb) uses this to resolve variable bindings before
369 /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
370 pub fn scan_match_create(
371 &self,
372 mc: &MatchCreateStatement,
373 ) -> Result<HashMap<String, Vec<NodeId>>> {
374 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
375
376 for pat in &mc.match_patterns {
377 for node_pat in &pat.nodes {
378 if node_pat.var.is_empty() {
379 continue;
380 }
381 // Skip if already resolved (same var can appear in multiple patterns).
382 if var_candidates.contains_key(&node_pat.var) {
383 continue;
384 }
385
386 let label = node_pat.labels.first().cloned().unwrap_or_default();
387 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
388 Some(id) => id as u32,
389 None => {
390 // Label not found → no matching nodes for this variable.
391 var_candidates.insert(node_pat.var.clone(), vec![]);
392 continue;
393 }
394 };
395
396 // Use the property index for O(1) equality lookups when possible,
397 // falling back to a full O(N) scan for overflow strings / params.
398 let matching_ids =
399 self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
400
401 var_candidates.insert(node_pat.var.clone(), matching_ids);
402 }
403 }
404
405 Ok(var_candidates)
406 }
407
    /// Execute the MATCH portion of a `MatchCreateStatement` and return one
    /// binding map per matched row.
    ///
    /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
    /// that represents one fully-correlated result row from the MATCH clause.
    /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
    ///
    /// # Algorithm
    ///
    /// For each `PathPattern` in `match_patterns`:
    /// - **No relationships** (node-only pattern): scan the node store applying
    ///   inline prop filters; collect one candidate set per named variable.
    ///   Cross-join these sets with the rows accumulated so far.
    /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
    ///   log to enumerate actual (src, dst) pairs that are connected by an edge,
    ///   then filter each node against its inline prop predicates. Only
    ///   correlated pairs are yielded — this is the key difference from the old
    ///   `scan_match_create` which treated every node as an independent
    ///   candidate and then took a full Cartesian product.
    ///
    /// Patterns beyond a single hop (and non-outgoing directions) are not yet
    /// supported and return `Unimplemented`.
    pub fn scan_match_create_rows(
        &self,
        mc: &MatchCreateStatement,
    ) -> Result<Vec<HashMap<String, NodeId>>> {
        // Start with a single empty row (identity for cross-join): the first
        // pattern multiplies against it without special-casing.
        let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];

        for pat in &mc.match_patterns {
            if pat.rels.is_empty() {
                // ── Node-only pattern: collect candidates per variable, then
                //    cross-join into accumulated rows. ──────────────────────
                let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();

                for node_pat in &pat.nodes {
                    // Anonymous node patterns contribute no binding.
                    if node_pat.var.is_empty() {
                        continue;
                    }

                    // SPA-211: when no label is specified, scan all registered
                    // labels so that unlabeled MATCH patterns find nodes of
                    // any type (instead of silently returning empty).
                    let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
                        self.snapshot
                            .catalog
                            .list_labels()?
                            .into_iter()
                            .map(|(id, _)| id as u32)
                            .collect()
                    } else {
                        let label = node_pat.labels.first().cloned().unwrap_or_default();
                        match self.snapshot.catalog.get_label(&label)? {
                            Some(id) => vec![id as u32],
                            None => {
                                // No nodes can match → entire MATCH yields nothing.
                                return Ok(vec![]);
                            }
                        }
                    };

                    // Use the property index for equality lookups when possible,
                    // falling back to a full O(N) scan for overflow strings / params.
                    let mut matching_ids: Vec<NodeId> = Vec::new();
                    for label_id in scan_label_ids {
                        let ids =
                            self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
                        matching_ids.extend(ids);
                    }

                    if matching_ids.is_empty() {
                        // No matching nodes → entire MATCH is empty.
                        return Ok(vec![]);
                    }

                    per_var.push((node_pat.var.clone(), matching_ids));
                }

                // Cross-join the per_var candidates into accumulated.
                // `candidates` is guaranteed non-empty (checked above), so the
                // result stays non-empty as long as `accumulated` is non-empty.
                for (var, candidates) in per_var {
                    let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                    for row in &accumulated {
                        for &node_id in &candidates {
                            let mut new_row = row.clone();
                            new_row.insert(var.clone(), node_id);
                            next.push(new_row);
                        }
                    }
                    accumulated = next;
                }
            } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
                // ── Single-hop relationship pattern: traverse CSR + delta edges
                //    to produce correlated (src, dst) pairs. ─────────────────
                let src_node_pat = &pat.nodes[0];
                let dst_node_pat = &pat.nodes[1];
                let rel_pat = &pat.rels[0];

                // Only outgoing direction is supported for MATCH…CREATE traversal.
                if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
                    return Err(sparrowdb_common::Error::Unimplemented);
                }

                let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
                let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();

                // Unknown label on either end → no edge can match.
                let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };
                let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
                    Some(id) => id as u32,
                    None => return Ok(vec![]),
                };

                // Pre-compute inline-filter col_ids once per pattern.
                let src_filter_cols: Vec<u32> = src_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();
                let dst_filter_cols: Vec<u32> = dst_node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();

                // SPA-185: resolve per-type rel table for delta and CSR reads.
                let rel_lookup =
                    self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
                if matches!(rel_lookup, RelTableLookup::NotFound) {
                    return Ok(vec![]);
                }

                // Build a src_slot → Vec<dst_slot> adjacency map from the delta
                // log once, filtering by src_label to avoid O(N*M) scanning
                // inside the outer slot loop.
                let delta_adj: HashMap<u64, Vec<u64>> = {
                    let records: Vec<DeltaRecord> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
                        _ => self.read_delta_all(),
                    };
                    let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
                    for r in records {
                        let s = r.src.0;
                        // High 32 bits of a NodeId are its label id.
                        let s_label = (s >> 32) as u32;
                        if s_label == src_label_id {
                            let s_slot = s & 0xFFFF_FFFF;
                            adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
                        }
                    }
                    adj
                };

                let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;

                // Pairs yielded by this pattern for cross-join below.
                let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();

                for src_slot in 0..hwm_src {
                    // SPA-254: check per-query deadline at every slot boundary.
                    self.check_deadline()?;

                    let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);

                    if self.is_node_tombstoned(src_node) {
                        continue;
                    }
                    if !self.node_matches_prop_filter(
                        src_node,
                        &src_filter_cols,
                        &src_node_pat.props,
                    ) {
                        continue;
                    }

                    // Collect outgoing neighbours (CSR + delta adjacency map).
                    let csr_neighbors_vec: Vec<u64> = match rel_lookup {
                        RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
                        _ => self.csr_neighbors_all(src_slot),
                    };
                    let empty: Vec<u64> = Vec::new();
                    let delta_neighbors: &[u64] =
                        delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());

                    // `seen` deduplicates dst slots that appear in both CSR
                    // and delta (or multiple times in the delta log).
                    let mut seen: HashSet<u64> = HashSet::new();
                    for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
                        if !seen.insert(dst_slot) {
                            continue;
                        }
                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);

                        if self.is_node_tombstoned(dst_node) {
                            continue;
                        }
                        if !self.node_matches_prop_filter(
                            dst_node,
                            &dst_filter_cols,
                            &dst_node_pat.props,
                        ) {
                            continue;
                        }

                        let mut row: HashMap<String, NodeId> = HashMap::new();

                        // When src and dst use the same variable (self-loop pattern),
                        // the edge must actually be a self-loop (src == dst).
                        if !src_node_pat.var.is_empty()
                            && !dst_node_pat.var.is_empty()
                            && src_node_pat.var == dst_node_pat.var
                        {
                            if src_node != dst_node {
                                continue;
                            }
                            row.insert(src_node_pat.var.clone(), src_node);
                        } else {
                            if !src_node_pat.var.is_empty() {
                                row.insert(src_node_pat.var.clone(), src_node);
                            }
                            if !dst_node_pat.var.is_empty() {
                                row.insert(dst_node_pat.var.clone(), dst_node);
                            }
                        }
                        pattern_rows.push(row);
                    }
                }

                if pattern_rows.is_empty() {
                    return Ok(vec![]);
                }

                // Cross-join pattern_rows into accumulated, enforcing shared-variable
                // constraints: if a variable appears in both acc_row and pat_row, only
                // keep combinations where they agree on the same NodeId.
                // `'outer` labels the pat_row loop, so a mismatch skips just
                // that one combination, not the whole acc_row.
                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                for acc_row in &accumulated {
                    'outer: for pat_row in &pattern_rows {
                        // Reject combinations where shared variables disagree.
                        for (k, v) in pat_row {
                            if let Some(existing) = acc_row.get(k) {
                                if existing != v {
                                    continue 'outer;
                                }
                            }
                        }
                        let mut new_row = acc_row.clone();
                        new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
                        next.push(new_row);
                    }
                }
                accumulated = next;
            } else {
                // Multi-hop patterns not yet supported for MATCH…CREATE.
                return Err(sparrowdb_common::Error::Unimplemented);
            }
        }

        Ok(accumulated)
    }
667
668 /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
669 /// correlated `(variable → NodeId)` binding rows — identical semantics to
670 /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
671 pub fn scan_match_merge_rel_rows(
672 &self,
673 mm: &MatchMergeRelStatement,
674 ) -> Result<Vec<HashMap<String, NodeId>>> {
675 // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
676 // MatchCreateStatement with an empty (no-op) CREATE body.
677 let proxy = MatchCreateStatement {
678 match_patterns: mm.match_patterns.clone(),
679 match_props: vec![],
680 create: CreateStatement {
681 nodes: vec![],
682 edges: vec![],
683 },
684 };
685 self.scan_match_create_rows(&proxy)
686 }
687
688 // ── UNWIND ─────────────────────────────────────────────────────────────────
689
690 pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
691 use crate::operators::{Operator, UnwindOperator};
692
693 // Evaluate the list expression to a Vec<Value>.
694 let values = eval_list_expr(&u.expr, &self.params)?;
695
696 // Determine the output column name from the RETURN clause.
697 let column_names = extract_return_column_names(&u.return_clause.items);
698
699 if values.is_empty() {
700 return Ok(QueryResult::empty(column_names));
701 }
702
703 let mut op = UnwindOperator::new(u.alias.clone(), values);
704 let chunks = op.collect_all()?;
705
706 // Materialize: for each chunk/group/row, project the RETURN columns.
707 //
708 // Only fall back to the UNWIND alias value when the output column
709 // actually corresponds to the alias variable. Returning a value for
710 // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
711 // silently produce wrong results instead of NULL.
712 let mut rows: Vec<Vec<Value>> = Vec::new();
713 for chunk in &chunks {
714 for group in &chunk.groups {
715 let n = group.len();
716 for row_idx in 0..n {
717 let row = u
718 .return_clause
719 .items
720 .iter()
721 .map(|item| {
722 // Determine whether this RETURN item refers to the
723 // alias variable produced by UNWIND.
724 let is_alias = match &item.expr {
725 Expr::Var(name) => name == &u.alias,
726 _ => false,
727 };
728 if is_alias {
729 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
730 } else {
731 // Variable is not in scope for this UNWIND —
732 // return NULL rather than leaking the alias value.
733 Value::Null
734 }
735 })
736 .collect();
737 rows.push(row);
738 }
739 }
740 }
741
742 Ok(QueryResult {
743 columns: column_names,
744 rows,
745 })
746 }
747
    // ── CREATE node execution ─────────────────────────────────────────────────

    /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
    ///
    /// For each node in the CREATE clause:
    /// 1. Look up (or create) its primary label in the catalog.
    /// 2. Convert inline properties to `(col_id, StoreValue)` pairs using the
    ///    same FNV-1a hash used by `WriteTx::merge_node`.
    /// 3. Enforce UNIQUE constraints (SPA-234) against the property index.
    /// 4. Write the node to the node store, sync the index, bump row counts.
    ///
    /// # Errors
    /// - `InvalidArgument` for reserved `__SO_` labels, constraint values of
    ///   unsupported type (floats, strings > 7 bytes), or a uniqueness
    ///   violation.
    /// - Any catalog/storage error from label creation or `create_node`.
    pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
        for node in &create.nodes {
            // Resolve the primary label, creating it if absent.
            // NOTE(review): only the first label is used; any additional
            // labels on the pattern are silently ignored — confirm intended.
            let label = node.labels.first().cloned().unwrap_or_default();

            // SPA-208: reject reserved __SO_ label prefix.
            if is_reserved_label(&label) {
                return Err(sparrowdb_common::Error::InvalidArgument(format!(
                    "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
                )));
            }

            let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
                Some(id) => id as u32,
                None => self.snapshot.catalog.create_label(&label)? as u32,
            };

            // Convert AST props to (col_id, StoreValue) pairs.
            // Property values are full expressions (e.g. `datetime()`),
            // evaluated with an empty binding map — CREATE has no MATCH
            // variables in scope.
            let empty_bindings: HashMap<String, Value> = HashMap::new();
            let props: Vec<(u32, StoreValue)> = node
                .props
                .iter()
                .map(|entry| {
                    let col_id = prop_name_to_col_id(&entry.key);
                    let val = eval_expr(&entry.value, &empty_bindings);
                    let store_val = value_to_store_value(val);
                    (col_id, store_val)
                })
                .collect();

            // SPA-234: enforce UNIQUE constraints declared via
            // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
            // For each constrained (label_id, col_id) pair, check whether the
            // incoming value already exists in the property index. If so,
            // return a constraint-violation error before writing the node.
            //
            // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
            // are checked via the prop_index fast path. Float values and
            // long strings require heap storage and cannot be encoded with
            // to_u64(); for those types we return an explicit error rather
            // than panicking (StoreValue::Float::to_u64 is documented to
            // panic for heap-backed values).
            for (col_id, store_val) in &props {
                if self.unique_constraints.contains(&(label_id, *col_id)) {
                    let raw = match store_val {
                        StoreValue::Int64(_) => store_val.to_u64(),
                        StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
                        StoreValue::Bytes(_) => {
                            return Err(sparrowdb_common::Error::InvalidArgument(
                                "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
                            ));
                        }
                        StoreValue::Float(_) => {
                            return Err(sparrowdb_common::Error::InvalidArgument(
                                "UNIQUE constraints on float values are not yet supported".into(),
                            ));
                        }
                    };
                    // Any existing slot with this raw value is a violation.
                    if !self
                        .prop_index
                        .borrow()
                        .lookup(label_id, *col_id, raw)
                        .is_empty()
                    {
                        return Err(sparrowdb_common::Error::InvalidArgument(format!(
                            "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
                        )));
                    }
                }
            }

            let node_id = self.snapshot.store.create_node(label_id, &props)?;
            // SPA-234: after writing, insert new values into the prop_index so
            // that subsequent creates in the same session also respect the
            // UNIQUE constraint (the index may be stale if built before this
            // node was written).
            {
                let slot =
                    sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
                let mut idx = self.prop_index.borrow_mut();
                for (col_id, store_val) in &props {
                    if self.unique_constraints.contains(&(label_id, *col_id)) {
                        // Only insert inline-encodable values; Float/long Bytes
                        // were already rejected above before create_node was called.
                        let raw = match store_val {
                            StoreValue::Int64(_) => store_val.to_u64(),
                            StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
                            _ => continue,
                        };
                        idx.insert(label_id, *col_id, slot, raw);
                    }
                }
            }
            // Update cached row count for the planner (SPA-new).
            *self
                .snapshot
                .label_row_counts
                .entry(label_id as LabelId)
                .or_insert(0) += 1;
        }
        Ok(QueryResult::empty(vec![]))
    }
861
862 pub(crate) fn execute_create_index(
863 &mut self,
864 label: &str,
865 property: &str,
866 ) -> Result<QueryResult> {
867 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
868 Some(id) => id as u32,
869 None => return Ok(QueryResult::empty(vec![])),
870 };
871 let col_id = col_id_of(property);
872 self.prop_index
873 .borrow_mut()
874 .build_for(&self.snapshot.store, label_id, col_id)?;
875 Ok(QueryResult::empty(vec![]))
876 }
877
878 /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
879 ///
880 /// Records `(label_id, col_id)` in `self.unique_constraints` so that
881 /// subsequent `execute_create` calls reject duplicate values. Also builds
882 /// the backing prop-index for that pair (needed to check existence cheaply).
883 /// If the label does not yet exist in the catalog it is auto-created so that
884 /// later `CREATE` statements can register against the constraint.
885 pub(crate) fn execute_create_constraint(
886 &mut self,
887 label: &str,
888 property: &str,
889 ) -> Result<QueryResult> {
890 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
891 Some(id) => id as u32,
892 None => self.snapshot.catalog.create_label(label)? as u32,
893 };
894 let col_id = col_id_of(property);
895
896 // Build the property index for this (label_id, col_id) pair so that
897 // uniqueness checks in execute_create can use O(log n) lookups.
898 self.prop_index
899 .borrow_mut()
900 .build_for(&self.snapshot.store, label_id, col_id)?;
901
902 // Register the constraint.
903 self.unique_constraints.insert((label_id, col_id));
904
905 Ok(QueryResult::empty(vec![]))
906 }
907}