sparrowdb_execution/engine/mutation.rs
1//! Auto-generated submodule — see engine/mod.rs for context.
2use super::*;
3
4impl Engine {
5 // ── Mutation execution (called by GraphDb with a write transaction) ────────
6
7 /// Scan nodes matching the MATCH patterns in a `MatchMutate` statement and
8 /// return the list of matching `NodeId`s. The caller is responsible for
9 /// applying the actual mutations inside a write transaction.
10 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
11 if mm.match_patterns.is_empty() {
12 return Ok(vec![]);
13 }
14
15 // Guard: only single-node patterns (no multi-pattern, no relationship hops)
16 // are supported. Silently ignoring extra patterns would mutate the wrong
17 // nodes; instead we surface a clear error.
18 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
19 return Err(sparrowdb_common::Error::InvalidArgument(
20 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
21 .into(),
22 ));
23 }
24
25 let pat = &mm.match_patterns[0];
26 if pat.nodes.is_empty() {
27 return Ok(vec![]);
28 }
29 let node_pat = &pat.nodes[0];
30 let label = node_pat.labels.first().cloned().unwrap_or_default();
31
32 let label_id = match self.snapshot.catalog.get_label(&label)? {
33 Some(id) => id as u32,
34 // SPA-266: unknown label → no nodes can match; return empty result.
35 None => return Ok(vec![]),
36 };
37
38 // Col_ids referenced by the WHERE clause (needed for WHERE evaluation
39 // even after the index narrows candidates by inline prop filter).
40 let mut where_col_ids: Vec<u32> = node_pat
41 .props
42 .iter()
43 .map(|pe| prop_name_to_col_id(&pe.key))
44 .collect();
45 if let Some(ref where_expr) = mm.where_clause {
46 collect_col_ids_from_expr(where_expr, &mut where_col_ids);
47 }
48
49 let var_name = node_pat.var.as_str();
50
51 // Use the property index for O(1) equality lookups on inline prop
52 // filters, falling back to full scan for overflow strings / params.
53 let candidates = self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
54
55 let mut matching_ids = Vec::new();
56 for node_id in candidates {
57 // Re-read props needed for WHERE clause evaluation.
58 if mm.where_clause.is_some() {
59 let props = read_node_props(&self.snapshot.store, node_id, &where_col_ids)?;
60 if let Some(ref where_expr) = mm.where_clause {
61 let mut row_vals =
62 build_row_vals(&props, var_name, &where_col_ids, &self.snapshot.store);
63 row_vals.extend(self.dollar_params());
64 if !self.eval_where_graph(where_expr, &row_vals) {
65 continue;
66 }
67 }
68 }
69 matching_ids.push(node_id);
70 }
71
72 Ok(matching_ids)
73 }
74
75 /// Return the mutation carried by a `MatchMutate` statement, exposing it
76 /// to the caller (GraphDb) so it can apply it inside a write transaction.
77 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
78 &mm.mutation
79 }
80
81 /// Scan edges matching a MATCH pattern with exactly one hop and return
82 /// `(src, dst, rel_type)` tuples for edge deletion.
83 ///
84 /// Supports `MATCH (a:Label)-[r:REL]->(b:Label) DELETE r` with optional
85 /// inline property filters on source and destination node patterns.
86 ///
87 /// Includes both checkpointed (CSR) and uncheckpointed (delta) edges.
88 pub fn scan_match_mutate_edges(
89 &self,
90 mm: &MatchMutateStatement,
91 ) -> Result<Vec<(NodeId, NodeId, String)>> {
92 if mm.match_patterns.len() != 1 {
93 return Err(sparrowdb_common::Error::InvalidArgument(
94 "MATCH...DELETE edge: only single-path patterns are supported".into(),
95 ));
96 }
97 let pat = &mm.match_patterns[0];
98 if pat.rels.len() != 1 || pat.nodes.len() != 2 {
99 return Err(sparrowdb_common::Error::InvalidArgument(
100 "MATCH...DELETE edge: pattern must have exactly one relationship hop".into(),
101 ));
102 }
103
104 let src_node_pat = &pat.nodes[0];
105 let dst_node_pat = &pat.nodes[1];
106 let rel_pat = &pat.rels[0];
107
108 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
109 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
110
111 // Resolve optional label-id constraints.
112 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
113 None
114 } else {
115 match self.snapshot.catalog.get_label(&src_label)? {
116 Some(id) => Some(id as u32),
117 None => return Ok(vec![]), // unknown label → no matches
118 }
119 };
120 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
121 None
122 } else {
123 match self.snapshot.catalog.get_label(&dst_label)? {
124 Some(id) => Some(id as u32),
125 None => return Ok(vec![]), // unknown label → no matches
126 }
127 };
128
129 // Filter registered rel tables by rel type and src/dst label.
130 let rel_tables: Vec<(u64, u32, u32, String)> = self
131 .snapshot
132 .catalog
133 .list_rel_tables_with_ids()
134 .into_iter()
135 .filter(|(_, sid, did, rt)| {
136 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
137 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
138 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
139 type_ok && src_ok && dst_ok
140 })
141 .map(|(cid, sid, did, rt)| (cid, sid as u32, did as u32, rt))
142 .collect();
143
144 // Pre-compute col_ids for inline prop filters (avoid re-computing per slot).
145 let src_filter_col_ids: Vec<u32> = src_node_pat
146 .props
147 .iter()
148 .map(|p| prop_name_to_col_id(&p.key))
149 .collect();
150 let dst_filter_col_ids: Vec<u32> = dst_node_pat
151 .props
152 .iter()
153 .map(|p| prop_name_to_col_id(&p.key))
154 .collect();
155
156 let mut result: Vec<(NodeId, NodeId, String)> = Vec::new();
157
158 for (catalog_rel_id, effective_src_lid, effective_dst_lid, rel_type) in &rel_tables {
159 let catalog_rel_id_u32 =
160 u32::try_from(*catalog_rel_id).expect("catalog_rel_id overflowed u32");
161
162 // ── Checkpointed edges (CSR) ──────────────────────────────────────
163 let hwm_src = match self.snapshot.store.hwm_for_label(*effective_src_lid) {
164 Ok(hwm) => hwm,
165 Err(_) => continue,
166 };
167 for src_slot in 0..hwm_src {
168 let src_node = NodeId(((*effective_src_lid as u64) << 32) | src_slot);
169 if self.is_node_tombstoned(src_node) {
170 continue;
171 }
172 if !self.node_matches_prop_filter(
173 src_node,
174 &src_filter_col_ids,
175 &src_node_pat.props,
176 ) {
177 continue;
178 }
179 for dst_slot in self.csr_neighbors(catalog_rel_id_u32, src_slot) {
180 let dst_node = NodeId(((*effective_dst_lid as u64) << 32) | dst_slot);
181 if self.is_node_tombstoned(dst_node) {
182 continue;
183 }
184 if !self.node_matches_prop_filter(
185 dst_node,
186 &dst_filter_col_ids,
187 &dst_node_pat.props,
188 ) {
189 continue;
190 }
191 result.push((src_node, dst_node, rel_type.clone()));
192 }
193 }
194
195 // ── Uncheckpointed edges (delta log) ──────────────────────────────
196 for rec in self.read_delta_for(catalog_rel_id_u32) {
197 let r_src_label = (rec.src.0 >> 32) as u32;
198 let r_dst_label = (rec.dst.0 >> 32) as u32;
199 if src_label_id_opt
200 .map(|id| id != r_src_label)
201 .unwrap_or(false)
202 {
203 continue;
204 }
205 if dst_label_id_opt
206 .map(|id| id != r_dst_label)
207 .unwrap_or(false)
208 {
209 continue;
210 }
211 if self.is_node_tombstoned(rec.src) || self.is_node_tombstoned(rec.dst) {
212 continue;
213 }
214 if !self.node_matches_prop_filter(rec.src, &src_filter_col_ids, &src_node_pat.props)
215 {
216 continue;
217 }
218 if !self.node_matches_prop_filter(rec.dst, &dst_filter_col_ids, &dst_node_pat.props)
219 {
220 continue;
221 }
222 result.push((rec.src, rec.dst, rel_type.clone()));
223 }
224 }
225
226 Ok(result)
227 }
228
229 // ── Node-scan helpers (shared by scan_match_create and scan_match_create_rows) ──
230
231 /// Returns `true` if the given node has been tombstoned (col 0 == u64::MAX).
232 ///
233 /// `NotFound` is expected for new/sparse nodes where col_0 has not been
234 /// written yet and is treated as "not tombstoned". All other errors are
235 /// logged as warnings and also treated as "not tombstoned" so that
236 /// transient storage issues do not suppress valid nodes during a scan.
237 pub(crate) fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
238 match self.snapshot.store.get_node_raw(node_id, &[0u32]) {
239 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
240 Err(sparrowdb_common::Error::NotFound) => false,
241 Err(e) => {
242 tracing::warn!(
243 node_id = node_id.0,
244 error = ?e,
245 "tombstone check failed; treating node as not tombstoned"
246 );
247 false
248 }
249 }
250 }
251
252 /// Returns `true` if `node_id` satisfies every inline prop predicate in
253 /// `filter_col_ids` / `props`.
254 ///
255 /// `filter_col_ids` must be pre-computed from `props` with
256 /// `prop_name_to_col_id`. Pass an empty slice when there are no filters
257 /// (the method returns `true` immediately).
258 pub(crate) fn node_matches_prop_filter(
259 &self,
260 node_id: NodeId,
261 filter_col_ids: &[u32],
262 props: &[sparrowdb_cypher::ast::PropEntry],
263 ) -> bool {
264 if props.is_empty() {
265 return true;
266 }
267 match self.snapshot.store.get_node_raw(node_id, filter_col_ids) {
268 Ok(raw_props) => matches_prop_filter_static(
269 &raw_props,
270 props,
271 &self.dollar_params(),
272 &self.snapshot.store,
273 ),
274 Err(_) => false,
275 }
276 }
277
278 // ── Scan for MATCH…CREATE (called by GraphDb with a write transaction) ──────
279
280 /// Return all live `NodeId`s for `label_id` whose inline prop predicates
281 /// match, using the `PropertyIndex` for O(1) equality lookups when possible.
282 ///
283 /// ## Index path (O(log n) per unique value)
284 ///
285 /// When there is exactly one inline prop filter and the literal is directly
286 /// encodable (integers and strings ≤ 7 bytes), the method:
287 /// 1. Calls `build_for` lazily — reads the column file once and caches it.
288 /// 2. Does a single `BTreeMap::get` to obtain the matching slot list.
289 /// 3. Verifies tombstones on the (usually tiny) candidate set.
290 ///
291 /// ## Fallback (O(n) full scan)
292 ///
293 /// When the filter cannot use the index (overflow string, multiple props,
294 /// parameter expressions, or `build_for` I/O error) the method falls back
295 /// to iterating all `0..hwm` slots — the same behaviour as before this fix.
296 ///
297 /// ## Multi-label support (SPA-200)
298 ///
299 /// After the primary-label scan, nodes where `label_id` is a **secondary**
300 /// label are also added from the catalog reverse index. This ensures
301 /// `MATCH (n:A)` finds nodes where `:A` is secondary (e.g. `CREATE (n:B:A)`).
302 ///
303 /// ## Integration
304 ///
305 /// This replaces the inline `for slot in 0..hwm` blocks in
306 /// `scan_match_create`, `scan_match_create_rows`, and `scan_match_mutate`
307 /// so that the index is used consistently across all write-side MATCH paths.
308 pub(crate) fn scan_nodes_for_label_with_index(
309 &self,
310 label_id: u32,
311 node_props: &[sparrowdb_cypher::ast::PropEntry],
312 ) -> Result<Vec<NodeId>> {
313 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
314
315 // Collect filter col_ids up-front (needed for the fallback path too).
316 let filter_col_ids: Vec<u32> = node_props
317 .iter()
318 .map(|p| prop_name_to_col_id(&p.key))
319 .collect();
320
321 // ── Lazy index build ────────────────────────────────────────────────
322 // Ensure the property index is loaded for every column referenced by
323 // inline prop filters. `build_for` is idempotent (cache-hit no-op
324 // after the first call) and suppresses I/O errors internally.
325 for &col_id in &filter_col_ids {
326 let _ = self
327 .prop_index
328 .borrow_mut()
329 .build_for(&self.snapshot.store, label_id, col_id);
330 }
331
332 // ── Index lookup (single-equality filter, literal value) ────────────
333 let index_slots: Option<Vec<u32>> = {
334 let prop_index_ref = self.prop_index.borrow();
335 try_index_lookup_for_props(node_props, label_id, &prop_index_ref)
336 };
337
338 if let Some(candidate_slots) = index_slots {
339 // O(k) verification over a small candidate set (typically 1 slot).
340 let mut result = Vec::with_capacity(candidate_slots.len());
341 for slot in candidate_slots {
342 let node_id = NodeId(((label_id as u64) << 32) | slot as u64);
343 if self.is_node_tombstoned(node_id) {
344 continue;
345 }
346 // For multi-prop filters the index only narrowed on one column;
347 // verify the remaining filters here.
348 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
349 continue;
350 }
351 result.push(node_id);
352 }
353 // SPA-200: also check secondary-label hits (not in the property index,
354 // which is keyed on primary label only).
355 self.append_secondary_label_hits(label_id, &filter_col_ids, node_props, &mut result);
356 return Ok(result);
357 }
358
359 // ── Fallback: full O(N) scan ────────────────────────────────────────
360 let mut result = Vec::new();
361 for slot in 0..hwm {
362 let node_id = NodeId(((label_id as u64) << 32) | slot);
363 if self.is_node_tombstoned(node_id) {
364 continue;
365 }
366 if !self.node_matches_prop_filter(node_id, &filter_col_ids, node_props) {
367 continue;
368 }
369 result.push(node_id);
370 }
371
372 // SPA-200: union nodes where label_id is a *secondary* label.
373 self.append_secondary_label_hits(label_id, &filter_col_ids, node_props, &mut result);
374
375 Ok(result)
376 }
377
378 /// Append nodes that have `label_id` as a **secondary** label to `result`,
379 /// applying property filters. Used by `scan_nodes_for_label_with_index`
380 /// to implement SPA-200 multi-label MATCH semantics.
381 ///
382 /// Nodes already in `result` (primary-label hits) are not duplicated.
383 fn append_secondary_label_hits(
384 &self,
385 label_id: u32,
386 filter_col_ids: &[u32],
387 node_props: &[sparrowdb_cypher::ast::PropEntry],
388 result: &mut Vec<NodeId>,
389 ) {
390 // Build a quick-lookup set of already-found nodes to avoid duplicates.
391 let already_found: HashSet<NodeId> = result.iter().copied().collect();
392
393 let lid = label_id as sparrowdb_catalog::LabelId;
394 for node_id in self.snapshot.catalog.nodes_with_secondary_label(lid) {
395 if already_found.contains(&node_id) {
396 continue;
397 }
398 if self.is_node_tombstoned(node_id) {
399 continue;
400 }
401 // For secondary-label nodes, properties are stored under the
402 // primary-label directory (encoded in node_id). The col_ids are
403 // the same — we read the stored columns and apply the filter.
404 if !self.node_matches_prop_filter(node_id, filter_col_ids, node_props) {
405 continue;
406 }
407 result.push(node_id);
408 }
409 }
410
411 /// Scan nodes matching the MATCH patterns in a `MatchCreateStatement` and
412 /// return a map of variable name → Vec<NodeId> for each named node pattern.
413 ///
414 /// The caller (GraphDb) uses this to resolve variable bindings before
415 /// calling `WriteTx::create_edge` for each edge in the CREATE clause.
416 pub fn scan_match_create(
417 &self,
418 mc: &MatchCreateStatement,
419 ) -> Result<HashMap<String, Vec<NodeId>>> {
420 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
421
422 for pat in &mc.match_patterns {
423 for node_pat in &pat.nodes {
424 if node_pat.var.is_empty() {
425 continue;
426 }
427 // Skip if already resolved (same var can appear in multiple patterns).
428 if var_candidates.contains_key(&node_pat.var) {
429 continue;
430 }
431
432 let label = node_pat.labels.first().cloned().unwrap_or_default();
433 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
434 Some(id) => id as u32,
435 None => {
436 // Label not found → no matching nodes for this variable.
437 var_candidates.insert(node_pat.var.clone(), vec![]);
438 continue;
439 }
440 };
441
442 // Use the property index for O(1) equality lookups when possible,
443 // falling back to a full O(N) scan for overflow strings / params.
444 let matching_ids =
445 self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
446
447 var_candidates.insert(node_pat.var.clone(), matching_ids);
448 }
449 }
450
451 Ok(var_candidates)
452 }
453
454 /// Execute the MATCH portion of a `MatchCreateStatement` and return one
455 /// binding map per matched row.
456 ///
457 /// Each element of the returned `Vec` is a `HashMap<variable_name, NodeId>`
458 /// that represents one fully-correlated result row from the MATCH clause.
459 /// The caller uses these to drive `WriteTx::create_edge` — one call per row.
460 ///
461 /// # Algorithm
462 ///
463 /// For each `PathPattern` in `match_patterns`:
464 /// - **No relationships** (node-only pattern): scan the node store applying
465 /// inline prop filters; collect one candidate set per named variable.
466 /// Cross-join these sets with the rows accumulated so far.
467 /// - **One relationship hop** (`(a)-[:R]->(b)`): traverse the CSR + delta
468 /// log to enumerate actual (src, dst) pairs that are connected by an edge,
469 /// then filter each node against its inline prop predicates. Only
470 /// correlated pairs are yielded — this is the key difference from the old
471 /// `scan_match_create` which treated every node as an independent
472 /// candidate and then took a full Cartesian product.
473 ///
474 /// Patterns beyond a single hop are not yet supported and return an error.
475 pub fn scan_match_create_rows(
476 &self,
477 mc: &MatchCreateStatement,
478 ) -> Result<Vec<HashMap<String, NodeId>>> {
479 // Start with a single empty row (identity for cross-join).
480 let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
481
482 for pat in &mc.match_patterns {
483 if pat.rels.is_empty() {
484 // ── Node-only pattern: collect candidates per variable, then
485 // cross-join into accumulated rows. ──────────────────────
486 //
487 // Collect each named node variable's candidate list.
488 let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
489
490 for node_pat in &pat.nodes {
491 if node_pat.var.is_empty() {
492 continue;
493 }
494
495 // SPA-211: when no label is specified, scan all registered
496 // labels so that unlabeled MATCH patterns find nodes of
497 // any type (instead of silently returning empty).
498 let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
499 self.snapshot
500 .catalog
501 .list_labels()?
502 .into_iter()
503 .map(|(id, _)| id as u32)
504 .collect()
505 } else {
506 let label = node_pat.labels.first().cloned().unwrap_or_default();
507 match self.snapshot.catalog.get_label(&label)? {
508 Some(id) => vec![id as u32],
509 None => {
510 // No nodes can match → entire MATCH yields nothing.
511 return Ok(vec![]);
512 }
513 }
514 };
515
516 // Use the property index for O(1) equality lookups when possible,
517 // falling back to a full O(N) scan for overflow strings / params.
518 let mut matching_ids: Vec<NodeId> = Vec::new();
519 for label_id in scan_label_ids {
520 let ids =
521 self.scan_nodes_for_label_with_index(label_id, &node_pat.props)?;
522 matching_ids.extend(ids);
523 }
524
525 if matching_ids.is_empty() {
526 // No matching nodes → entire MATCH is empty.
527 return Ok(vec![]);
528 }
529
530 per_var.push((node_pat.var.clone(), matching_ids));
531 }
532
533 // Cross-join the per_var candidates into accumulated.
534 // `candidates` is guaranteed non-empty (checked above), so the result
535 // will be non-empty as long as `accumulated` is non-empty.
536 for (var, candidates) in per_var {
537 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
538 for row in &accumulated {
539 for &node_id in &candidates {
540 let mut new_row = row.clone();
541 new_row.insert(var.clone(), node_id);
542 next.push(new_row);
543 }
544 }
545 accumulated = next;
546 }
547 } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
548 // ── Single-hop relationship pattern: traverse CSR + delta edges
549 // to produce correlated (src, dst) pairs. ─────────────────
550 let src_node_pat = &pat.nodes[0];
551 let dst_node_pat = &pat.nodes[1];
552 let rel_pat = &pat.rels[0];
553
554 // Only outgoing direction is supported for MATCH…CREATE traversal.
555 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
556 return Err(sparrowdb_common::Error::Unimplemented);
557 }
558
559 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
560 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
561
562 let src_label_id: u32 = match self.snapshot.catalog.get_label(&src_label)? {
563 Some(id) => id as u32,
564 None => return Ok(vec![]),
565 };
566 let dst_label_id: u32 = match self.snapshot.catalog.get_label(&dst_label)? {
567 Some(id) => id as u32,
568 None => return Ok(vec![]),
569 };
570
571 let src_filter_cols: Vec<u32> = src_node_pat
572 .props
573 .iter()
574 .map(|p| prop_name_to_col_id(&p.key))
575 .collect();
576 let dst_filter_cols: Vec<u32> = dst_node_pat
577 .props
578 .iter()
579 .map(|p| prop_name_to_col_id(&p.key))
580 .collect();
581
582 // SPA-185: resolve per-type rel table for delta and CSR reads.
583 let rel_lookup =
584 self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
585 if matches!(rel_lookup, RelTableLookup::NotFound) {
586 return Ok(vec![]);
587 }
588
589 // Build a src_slot → Vec<dst_slot> adjacency map from the delta log once,
590 // filtering by src_label to avoid O(N*M) scanning inside the outer loop.
591 let delta_adj: HashMap<u64, Vec<u64>> = {
592 let records: Vec<DeltaRecord> = match rel_lookup {
593 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
594 _ => self.read_delta_all(),
595 };
596 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
597 for r in records {
598 let s = r.src.0;
599 let s_label = (s >> 32) as u32;
600 if s_label == src_label_id {
601 let s_slot = s & 0xFFFF_FFFF;
602 adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
603 }
604 }
605 adj
606 };
607
608 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id)?;
609
610 // Pairs yielded by this pattern for cross-join below.
611 let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
612
613 for src_slot in 0..hwm_src {
614 // SPA-254: check per-query deadline at every slot boundary.
615 self.check_deadline()?;
616
617 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
618
619 if self.is_node_tombstoned(src_node) {
620 continue;
621 }
622 if !self.node_matches_prop_filter(
623 src_node,
624 &src_filter_cols,
625 &src_node_pat.props,
626 ) {
627 continue;
628 }
629
630 // Collect outgoing neighbours (CSR + delta adjacency map).
631 let csr_neighbors_vec: Vec<u64> = match rel_lookup {
632 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
633 _ => self.csr_neighbors_all(src_slot),
634 };
635 let empty: Vec<u64> = Vec::new();
636 let delta_neighbors: &[u64] =
637 delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
638
639 let mut seen: HashSet<u64> = HashSet::new();
640 for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
641 if !seen.insert(dst_slot) {
642 continue;
643 }
644 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
645
646 if self.is_node_tombstoned(dst_node) {
647 continue;
648 }
649 if !self.node_matches_prop_filter(
650 dst_node,
651 &dst_filter_cols,
652 &dst_node_pat.props,
653 ) {
654 continue;
655 }
656
657 let mut row: HashMap<String, NodeId> = HashMap::new();
658
659 // When src and dst use the same variable (self-loop pattern),
660 // the edge must actually be a self-loop (src == dst).
661 if !src_node_pat.var.is_empty()
662 && !dst_node_pat.var.is_empty()
663 && src_node_pat.var == dst_node_pat.var
664 {
665 if src_node != dst_node {
666 continue;
667 }
668 row.insert(src_node_pat.var.clone(), src_node);
669 } else {
670 if !src_node_pat.var.is_empty() {
671 row.insert(src_node_pat.var.clone(), src_node);
672 }
673 if !dst_node_pat.var.is_empty() {
674 row.insert(dst_node_pat.var.clone(), dst_node);
675 }
676 }
677 pattern_rows.push(row);
678 }
679 }
680
681 if pattern_rows.is_empty() {
682 return Ok(vec![]);
683 }
684
685 // Cross-join pattern_rows into accumulated, enforcing shared-variable
686 // constraints: if a variable appears in both acc_row and pat_row, only
687 // keep combinations where they agree on the same NodeId.
688 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
689 for acc_row in &accumulated {
690 'outer: for pat_row in &pattern_rows {
691 // Reject combinations where shared variables disagree.
692 for (k, v) in pat_row {
693 if let Some(existing) = acc_row.get(k) {
694 if existing != v {
695 continue 'outer;
696 }
697 }
698 }
699 let mut new_row = acc_row.clone();
700 new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
701 next.push(new_row);
702 }
703 }
704 accumulated = next;
705 } else {
706 // Multi-hop patterns not yet supported for MATCH…CREATE.
707 return Err(sparrowdb_common::Error::Unimplemented);
708 }
709 }
710
711 Ok(accumulated)
712 }
713
714 /// Scan the MATCH patterns of a `MatchMergeRelStatement` and return
715 /// correlated `(variable → NodeId)` binding rows — identical semantics to
716 /// `scan_match_create_rows` but taking the MERGE form's match patterns (SPA-233).
717 pub fn scan_match_merge_rel_rows(
718 &self,
719 mm: &MatchMergeRelStatement,
720 ) -> Result<Vec<HashMap<String, NodeId>>> {
721 // Reuse scan_match_create_rows by wrapping the MERGE patterns in a
722 // MatchCreateStatement with an empty (no-op) CREATE body.
723 let proxy = MatchCreateStatement {
724 match_patterns: mm.match_patterns.clone(),
725 match_props: vec![],
726 create: CreateStatement {
727 nodes: vec![],
728 edges: vec![],
729 },
730 };
731 self.scan_match_create_rows(&proxy)
732 }
733
734 // ── UNWIND ─────────────────────────────────────────────────────────────────
735
736 pub(crate) fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
737 use crate::operators::{Operator, UnwindOperator};
738
739 // Evaluate the list expression to a Vec<Value>.
740 let values = eval_list_expr(&u.expr, &self.params)?;
741
742 // Determine the output column name from the RETURN clause.
743 let column_names = extract_return_column_names(&u.return_clause.items);
744
745 if values.is_empty() {
746 return Ok(QueryResult::empty(column_names));
747 }
748
749 let mut op = UnwindOperator::new(u.alias.clone(), values);
750 let chunks = op.collect_all()?;
751
752 // Materialize: for each chunk/group/row, project the RETURN columns.
753 //
754 // Only fall back to the UNWIND alias value when the output column
755 // actually corresponds to the alias variable. Returning a value for
756 // an unrelated variable (e.g. `RETURN y` when alias is `x`) would
757 // silently produce wrong results instead of NULL.
758 let mut rows: Vec<Vec<Value>> = Vec::new();
759 for chunk in &chunks {
760 for group in &chunk.groups {
761 let n = group.len();
762 for row_idx in 0..n {
763 let row = u
764 .return_clause
765 .items
766 .iter()
767 .map(|item| {
768 // Determine whether this RETURN item refers to the
769 // alias variable produced by UNWIND.
770 let is_alias = match &item.expr {
771 Expr::Var(name) => name == &u.alias,
772 _ => false,
773 };
774 if is_alias {
775 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
776 } else {
777 // Variable is not in scope for this UNWIND —
778 // return NULL rather than leaking the alias value.
779 Value::Null
780 }
781 })
782 .collect();
783 rows.push(row);
784 }
785 }
786 }
787
788 Ok(QueryResult {
789 columns: column_names,
790 rows,
791 })
792 }
793
794 // ── CREATE node execution ─────────────────────────────────────────────────
795
796 /// Execute a `CREATE` statement, auto-registering labels as needed (SPA-156).
797 ///
798 /// For each node in the CREATE clause:
799 /// 1. Look up (or create) its **primary** label (first label in the
800 /// pattern) in the catalog. The primary label determines the `NodeId`
801 /// encoding and storage directory.
802 /// 2. Resolve all secondary labels (labels[1..]) and record them in the
803 /// catalog side table after the node is created (SPA-200).
804 /// 3. Convert inline properties to `(col_id, StoreValue)` pairs using the
805 /// same FNV-1a hash used by `WriteTx::merge_node`.
806 /// 4. Write the node to the node store.
807 pub(crate) fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
808 for node in &create.nodes {
809 // Resolve the primary label, creating it if absent.
810 let label = node.labels.first().cloned().unwrap_or_default();
811
812 // SPA-208: reject reserved __SO_ label prefix.
813 if is_reserved_label(&label) {
814 return Err(sparrowdb_common::Error::InvalidArgument(format!(
815 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
816 )));
817 }
818
819 // SPA-200: reject reserved __SO_ prefix on secondary labels too.
820 for secondary_label_name in node.labels.iter().skip(1) {
821 if is_reserved_label(secondary_label_name) {
822 return Err(sparrowdb_common::Error::InvalidArgument(format!(
823 "invalid argument: label \"{secondary_label_name}\" is reserved — the __SO_ prefix is for internal use only"
824 )));
825 }
826 }
827
828 let label_id: u32 = match self.snapshot.catalog.get_label(&label)? {
829 Some(id) => id as u32,
830 None => self.snapshot.catalog.create_label(&label)? as u32,
831 };
832
833 // Convert AST props to (col_id, StoreValue) pairs.
834 // Property values are full expressions (e.g. `datetime()`),
835 // evaluated with an empty binding map.
836 let empty_bindings: HashMap<String, Value> = HashMap::new();
837 let props: Vec<(u32, StoreValue)> = node
838 .props
839 .iter()
840 .map(|entry| {
841 let col_id = prop_name_to_col_id(&entry.key);
842 let val = eval_expr(&entry.value, &empty_bindings);
843 let store_val = value_to_store_value(val);
844 (col_id, store_val)
845 })
846 .collect();
847
848 // SPA-234: enforce UNIQUE constraints declared via
849 // `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE`.
850 // For each constrained (label_id, col_id) pair, check whether the
851 // incoming value already exists in the property index. If so,
852 // return a constraint-violation error before writing the node.
853 //
854 // Only inline-encodable types (Int64 and short Bytes ≤ 7 bytes)
855 // are checked via the prop_index fast path. Float values and
856 // long strings require heap storage and cannot be encoded with
857 // to_u64(); for those types we return an explicit error rather
858 // than panicking (StoreValue::Float::to_u64 is documented to
859 // panic for heap-backed values).
860 for (col_id, store_val) in &props {
861 if self.unique_constraints.contains(&(label_id, *col_id)) {
862 let raw = match store_val {
863 StoreValue::Int64(_) => store_val.to_u64(),
864 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
865 StoreValue::Bytes(_) => {
866 return Err(sparrowdb_common::Error::InvalidArgument(
867 "UNIQUE constraints on string values longer than 7 bytes are not yet supported".into(),
868 ));
869 }
870 StoreValue::Float(_) => {
871 return Err(sparrowdb_common::Error::InvalidArgument(
872 "UNIQUE constraints on float values are not yet supported".into(),
873 ));
874 }
875 };
876 if !self
877 .prop_index
878 .borrow()
879 .lookup(label_id, *col_id, raw)
880 .is_empty()
881 {
882 return Err(sparrowdb_common::Error::InvalidArgument(format!(
883 "unique constraint violation: label \"{label}\" already has a node with the same value for this property"
884 )));
885 }
886 }
887 }
888
889 let node_id = self.snapshot.store.create_node(label_id, &props)?;
890 // SPA-234: after writing, insert new values into the prop_index so
891 // that subsequent creates in the same session also respect the
892 // UNIQUE constraint (the index may be stale if built before this
893 // node was written).
894 {
895 let slot =
896 sparrowdb_storage::property_index::PropertyIndex::node_id_to_slot(node_id);
897 let mut idx = self.prop_index.borrow_mut();
898 for (col_id, store_val) in &props {
899 if self.unique_constraints.contains(&(label_id, *col_id)) {
900 // Only insert inline-encodable values; Float/long Bytes
901 // were already rejected above before create_node was called.
902 let raw = match store_val {
903 StoreValue::Int64(_) => store_val.to_u64(),
904 StoreValue::Bytes(b) if b.len() <= 7 => store_val.to_u64(),
905 _ => continue,
906 };
907 idx.insert(label_id, *col_id, slot, raw);
908 }
909 }
910 }
911 // SPA-200: record secondary labels in the catalog side table.
912 // The primary label is already encoded in `node_id`; secondary
913 // labels are persisted here so that MATCH on secondary labels
914 // (and labels(n)) return the full label set.
915 if node.labels.len() > 1 {
916 let mut secondary_label_ids: Vec<sparrowdb_catalog::LabelId> = Vec::new();
917 for secondary_name in node.labels.iter().skip(1) {
918 let sid = match self.snapshot.catalog.get_label(secondary_name)? {
919 Some(id) => id,
920 None => self.snapshot.catalog.create_label(secondary_name)?,
921 };
922 secondary_label_ids.push(sid);
923 }
924 self.snapshot
925 .catalog
926 .record_secondary_labels(node_id, &secondary_label_ids)?;
927 }
928
929 // Update cached row count for the planner (SPA-new).
930 *self
931 .snapshot
932 .label_row_counts
933 .entry(label_id as LabelId)
934 .or_insert(0) += 1;
935 }
936 Ok(QueryResult::empty(vec![]))
937 }
938
939 pub(crate) fn execute_create_index(
940 &mut self,
941 label: &str,
942 property: &str,
943 ) -> Result<QueryResult> {
944 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
945 Some(id) => id as u32,
946 None => return Ok(QueryResult::empty(vec![])),
947 };
948 let col_id = col_id_of(property);
949 self.prop_index
950 .borrow_mut()
951 .build_for(&self.snapshot.store, label_id, col_id)?;
952 Ok(QueryResult::empty(vec![]))
953 }
954
955 /// Execute `CREATE CONSTRAINT ON (n:Label) ASSERT n.property IS UNIQUE` (SPA-234).
956 ///
957 /// Records `(label_id, col_id)` in `self.unique_constraints` so that
958 /// subsequent `execute_create` calls reject duplicate values. Also builds
959 /// the backing prop-index for that pair (needed to check existence cheaply).
960 /// If the label does not yet exist in the catalog it is auto-created so that
961 /// later `CREATE` statements can register against the constraint.
962 pub(crate) fn execute_create_constraint(
963 &mut self,
964 label: &str,
965 property: &str,
966 ) -> Result<QueryResult> {
967 let label_id: u32 = match self.snapshot.catalog.get_label(label)? {
968 Some(id) => id as u32,
969 None => self.snapshot.catalog.create_label(label)? as u32,
970 };
971 let col_id = col_id_of(property);
972
973 // Build the property index for this (label_id, col_id) pair so that
974 // uniqueness checks in execute_create can use O(log n) lookups.
975 self.prop_index
976 .borrow_mut()
977 .build_for(&self.snapshot.store, label_id, col_id)?;
978
979 // Register the constraint.
980 self.unique_constraints.insert((label_id, col_id));
981
982 Ok(QueryResult::empty(vec![]))
983 }
984}