// sparrowdb_execution/engine/pipeline_exec.rs
//! Opt-in chunked pipeline execution entry points (Phase 1 + Phase 2 + Phase 3, #299).
//!
//! This module wires the Phase 1, Phase 2, and Phase 3 pipeline data structures
//! into the existing engine without modifying any row-at-a-time code paths.
//!
//! When `Engine::use_chunked_pipeline` is `true` AND the query shape qualifies,
//! these methods are called instead of the row-at-a-time equivalents.
//!
//! # Phase 1 supported shape
//!
//! Single-label scan with no hops and no aggregation:
//! `MATCH (n:Label) [WHERE n.prop op val] RETURN n.prop1, n.prop2`
//!
//! # Phase 2 supported shape
//!
//! Single-label, single-hop, directed (outgoing or incoming):
//! `MATCH (a:SrcLabel)-[:R]->(b:DstLabel) [WHERE ...] RETURN a.p, b.q [LIMIT n]`
//!
//! # Phase 3 supported shape
//!
//! Single-label, two-hop same-rel, both hops outgoing:
//! `MATCH (a:L)-[:R]->(b:L)-[:R]->(c:L) [WHERE ...] RETURN ... [LIMIT n]`
//!
//! All other shapes fall back to the row-at-a-time engine.

26use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34 BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35 SlotIntersect,
36};
37
// ── ChunkedPlan ───────────────────────────────────────────────────────────────

/// Shape selector for the chunked vectorized pipeline (Phase 4, spec §2.3).
///
/// Replaces the cascade of `can_use_*` boolean guards with a typed plan enum.
/// `Engine::try_plan_chunked_match` returns one of these variants (or `None`
/// to indicate the row engine should be used), and dispatch is a `match` with
/// no further `if can_use_*` calls.
///
/// Each variant may carry shape-specific parameters in future phases. For now
/// all resolution happens in the `execute_*_chunked` methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkedPlan {
    /// Single-label scan only — no relationship hops.
    Scan,
    /// Single-hop directed traversal.
    OneHop,
    /// Two-hop same-rel-type directed traversal.
    TwoHop,
    /// Mutual-neighbors: `(a)-[:R]->(x)<-[:R]-(b)` with both a and b bound.
    MutualNeighbors,
}

impl std::fmt::Display for ChunkedPlan {
    /// Render the plan name exactly as it appears in trace/log output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Scan => "Scan",
            Self::OneHop => "OneHop",
            Self::TwoHop => "TwoHop",
            Self::MutualNeighbors => "MutualNeighbors",
        };
        f.write_str(name)
    }
}
71
72impl Engine {
73 /// Return `true` when `m` qualifies for Phase 1 chunked execution.
74 ///
75 /// Eligibility:
76 /// - `use_chunked_pipeline` flag is set.
77 /// - Single node pattern with no relationship hops.
78 /// - No aggregation in RETURN (aggregation is Phase 4).
79 /// - No ORDER BY / SKIP / LIMIT (trivially added in Phase 2).
80 /// - At least one label is specified (unlabeled scans fall back).
81 pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82 if !self.use_chunked_pipeline {
83 return false;
84 }
85 if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86 return false;
87 }
88 if has_aggregate_in_return(&m.return_clause.items) {
89 return false;
90 }
91 if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92 return false;
93 }
94 // DISTINCT deduplication is not implemented in the chunked scan path —
95 // fall back to the row engine which applies deduplicate_rows.
96 if m.distinct {
97 return false;
98 }
99 // Inline prop filters on the node pattern are not evaluated by the
100 // chunked scan path — fall back to the row engine so they are applied.
101 // (Tracked as #362 for native support in the chunked path.)
102 if !m.pattern[0].nodes[0].props.is_empty() {
103 return false;
104 }
105 // Bare variable projection (RETURN n) requires the row engine eval path
106 // to build a full property map (SPA-213). project_row returns Null for
107 // bare vars. Fall back until the chunked path implements SPA-213.
108 if m.return_clause
109 .items
110 .iter()
111 .any(|item| matches!(&item.expr, Expr::Var(_)))
112 {
113 return false;
114 }
115 !m.pattern[0].nodes[0].labels.is_empty()
116 }
117
118 /// Return `true` when `m` qualifies for Phase 2 one-hop chunked execution.
119 ///
120 /// Eligibility (spec §3.6):
121 /// - `use_chunked_pipeline` flag is set.
122 /// - Exactly 2 nodes, 1 relationship (single hop).
123 /// - Both nodes have exactly one label.
124 /// - Directed (Outgoing or Incoming); undirected deferred to Phase 3.
125 /// - No `OPTIONAL MATCH`, no `UNION`, no subquery in `WHERE`.
126 /// - No aggregate, no `ORDER BY`.
127 /// - `LIMIT` allowed when no `DISTINCT`.
128 /// - Planner resolves exactly one relationship table.
129 /// - No edge-property references in RETURN or WHERE.
130 pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
131 use sparrowdb_cypher::ast::EdgeDir;
132
133 if !self.use_chunked_pipeline {
134 return false;
135 }
136 // Exactly 1 path pattern, 2 nodes, 1 rel.
137 if m.pattern.len() != 1 {
138 return false;
139 }
140 let pat = &m.pattern[0];
141 if pat.rels.len() != 1 || pat.nodes.len() != 2 {
142 return false;
143 }
144 // Both nodes must have exactly one label.
145 if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
146 return false;
147 }
148 // Only directed (Outgoing or Incoming) supported in Phase 2.
149 let dir = &pat.rels[0].dir;
150 if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
151 return false;
152 }
153 // No aggregation.
154 if has_aggregate_in_return(&m.return_clause.items) {
155 return false;
156 }
157 // No DISTINCT — chunked materializer has no dedup.
158 if m.distinct {
159 return false;
160 }
161 // No ORDER BY.
162 if !m.order_by.is_empty() {
163 return false;
164 }
165 // No variable-length hops.
166 if pat.rels[0].min_hops.is_some() {
167 return false;
168 }
169 // No edge-property references (Phase 2 spec §3.7 — no edge prop reads).
170 // Guard both RETURN items and WHERE clause: a `WHERE r.weight > 5` with
171 // no `r.*` in RETURN would silently return 0 rows because the chunked
172 // materializer does not populate edge-property row_vals.
173 let rel_var = &pat.rels[0].var;
174 if !rel_var.is_empty() {
175 let ref_in_return = m.return_clause.items.iter().any(|item| {
176 column_name_for_item(item)
177 .split_once('.')
178 .is_some_and(|(v, _)| v == rel_var.as_str())
179 });
180 if ref_in_return {
181 return false;
182 }
183 // Also reject if the WHERE clause accesses rel-variable properties.
184 if let Some(ref wexpr) = m.where_clause {
185 if expr_references_var(wexpr, rel_var.as_str()) {
186 return false;
187 }
188 }
189 }
190 // Only simple WHERE predicates supported (no CONTAINS, no subquery).
191 if let Some(ref wexpr) = m.where_clause {
192 if !is_simple_where_for_chunked(wexpr) {
193 return false;
194 }
195 }
196 // Inline prop filters on node patterns are not evaluated by the chunked
197 // one-hop path — fall back to the row engine. (See #362.)
198 if pat.nodes.iter().any(|n| !n.props.is_empty()) {
199 return false;
200 }
201 // Resolve to exactly one rel table.
202 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
203 let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
204 let rel_type = pat.rels[0].rel_type.clone();
205 let n_tables = self
206 .snapshot
207 .catalog
208 .list_rel_tables_with_ids()
209 .into_iter()
210 .filter(|(_, sid, did, rt)| {
211 let type_ok = rel_type.is_empty() || rt == &rel_type;
212 let src_ok = self
213 .snapshot
214 .catalog
215 .get_label(&src_label)
216 .ok()
217 .flatten()
218 .map(|id| id as u32 == *sid as u32)
219 .unwrap_or(false);
220 let dst_ok = self
221 .snapshot
222 .catalog
223 .get_label(&dst_label)
224 .ok()
225 .flatten()
226 .map(|id| id as u32 == *did as u32)
227 .unwrap_or(false);
228 type_ok && src_ok && dst_ok
229 })
230 .count();
231 n_tables == 1
232 }
233
    /// Execute a 1-hop query using the Phase 2 chunked pipeline.
    ///
    /// Pipeline shape (spec §3.6):
    /// ```text
    /// MaterializeRows(limit?)
    ///   <- optional Filter(ChunkPredicate, dst)
    ///   <- ReadNodeProps(dst) [only if dst props referenced]
    ///   <- GetNeighbors(rel_type_id, src_label_id)
    ///   <- optional Filter(ChunkPredicate, src)
    ///   <- ReadNodeProps(src) [only if src props referenced]
    ///   <- ScanByLabel(hwm)
    /// ```
    ///
    /// Terminal projection uses existing `project_hop_row` helpers at the
    /// materializer sink so we never duplicate projection semantics.
    ///
    /// # Parameters
    /// - `m`: the MATCH statement; callers gate on `can_use_one_hop_chunked`
    ///   before dispatching here.
    /// - `column_names`: output column names in RETURN order; also drives
    ///   which property col_ids are fetched for each endpoint.
    ///
    /// # Errors
    /// Propagates catalog/storage errors, and returns `InvalidArgument` when
    /// no relationship table matches the resolved (src, dst, rel_type) triple.
    pub(crate) fn execute_one_hop_chunked(
        &self,
        m: &MatchStatement,
        column_names: &[String],
    ) -> Result<QueryResult> {
        use sparrowdb_cypher::ast::EdgeDir;

        let pat = &m.pattern[0];
        let rel_pat = &pat.rels[0];
        let dir = &rel_pat.dir;

        // For Incoming, swap the logical src/dst so the pipeline always runs
        // in the outgoing direction and we swap back at projection time.
        let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
            (&pat.nodes[1], &pat.nodes[0], true)
        } else {
            (&pat.nodes[0], &pat.nodes[1], false)
        };

        let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
        let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
        let rel_type = rel_pat.rel_type.clone();

        // Resolve label IDs — both must exist for this to reach us.
        // An unknown label yields an empty result set rather than an error.
        let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
            Some(id) => id as u32,
            None => {
                return Ok(QueryResult {
                    columns: column_names.to_vec(),
                    rows: vec![],
                });
            }
        };
        let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
            Some(id) => id as u32,
            None => {
                return Ok(QueryResult {
                    columns: column_names.to_vec(),
                    rows: vec![],
                });
            }
        };

        // Resolve rel table ID.
        let (catalog_rel_id, _) = self
            .snapshot
            .catalog
            .list_rel_tables_with_ids()
            .into_iter()
            .find(|(_, sid, did, rt)| {
                let type_ok = rel_type.is_empty() || rt == &rel_type;
                let src_ok = *sid as u32 == src_label_id;
                let dst_ok = *did as u32 == dst_label_id;
                type_ok && src_ok && dst_ok
            })
            .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
            .ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(
                    "no matching relationship table found".into(),
                )
            })?;

        // High-water mark bounds the ScanByLabel source.
        let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
        tracing::debug!(
            engine = "chunked",
            src_label = %src_label,
            dst_label = %dst_label,
            rel_type = %rel_type,
            hwm_src,
            "executing via chunked pipeline (1-hop)"
        );

        // Determine which property col_ids are needed for src and dst,
        // taking into account both RETURN and WHERE references.
        let src_var = src_node_pat.var.as_str();
        let dst_var = dst_node_pat.var.as_str();

        // For column name collection: when swapped, actual query vars are
        // swapped vs. the src/dst in the pipeline. Use the original query vars.
        let (query_src_var, query_dst_var) = if swapped {
            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
        } else {
            (src_var, dst_var)
        };

        let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
        let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);

        // Ensure WHERE-referenced columns are fetched.
        if let Some(ref wexpr) = m.where_clause {
            collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
            collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
        }
        // Ensure inline prop filter columns are fetched.
        // NOTE(review): can_use_one_hop_chunked currently rejects inline props,
        // so these two loops appear unreachable — presumably kept for the #362
        // follow-up; confirm before removing.
        for p in &src_node_pat.props {
            let cid = col_id_of(&p.key);
            if !col_ids_src.contains(&cid) {
                col_ids_src.push(cid);
            }
        }
        for p in &dst_node_pat.props {
            let cid = col_id_of(&p.key);
            if !col_ids_dst.contains(&cid) {
                col_ids_dst.push(cid);
            }
        }

        // Build delta index for this rel table. Open/read failures degrade to
        // an empty delta (CSR-only traversal) rather than erroring.
        let delta_records = {
            let edge_store = EdgeStore::open(
                &self.snapshot.db_root,
                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
            );
            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
        };

        // Get CSR for this rel table (empty CSR when none cached).
        let csr = self
            .snapshot
            .csrs
            .get(&catalog_rel_id)
            .cloned()
            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));

        // Degree hint from stats.
        let avg_degree_hint = self
            .snapshot
            .rel_degree_stats()
            .get(&catalog_rel_id)
            .map(|s| s.mean().ceil() as usize)
            .unwrap_or(8);

        // Build WHERE predicates for src and dst (or use closure fallback).
        // When compilation fails the full WHERE is re-checked per row below.
        let src_pred_opt = m
            .where_clause
            .as_ref()
            .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
        let dst_pred_opt = m
            .where_clause
            .as_ref()
            .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));

        let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);

        // ── Build the pipeline ────────────────────────────────────────────────
        //
        // We box each layer into a type-erased enum so we can build the pipeline
        // dynamically depending on which operators are needed.

        let limit = m.limit.map(|l| l as usize);
        let mut rows: Vec<Vec<Value>> = Vec::new();

        // Use a macro-free trait-object approach: build as a flat loop with
        // explicit operator invocations to avoid complex generic nesting.
        // This keeps Phase 2 simple and Phase 4 can refactor to a proper
        // operator tree if needed.

        let mut scan = ScanByLabel::new(hwm_src);

        'outer: while let Some(scan_chunk) = scan.next_chunk()? {
            // Tombstone check happens in the slot loop below.

            // ── ReadNodeProps(src) ────────────────────────────────────────────
            let src_chunk = if !col_ids_src.is_empty() {
                let mut rnp = ReadNodeProps::new(
                    SingleChunkSource::new(scan_chunk),
                    Arc::clone(&store_arc),
                    src_label_id,
                    crate::chunk::COL_ID_SLOT,
                    col_ids_src.clone(),
                );
                match rnp.next_chunk()? {
                    Some(c) => c,
                    None => continue,
                }
            } else {
                scan_chunk
            };

            // ── Filter(src) ───────────────────────────────────────────────────
            let src_chunk = if let Some(ref pred) = src_pred_opt {
                let pred = pred.clone();
                // Evaluate over every physical row; filter_sel then applies
                // the keep mask to the chunk's selection.
                let keep: Vec<bool> = {
                    (0..src_chunk.len())
                        .map(|i| pred.eval(&src_chunk, i))
                        .collect()
                };
                let mut c = src_chunk;
                c.filter_sel(|i| keep[i]);
                if c.live_len() == 0 {
                    continue;
                }
                c
            } else {
                src_chunk
            };

            // ── GetNeighbors ──────────────────────────────────────────────────
            let mut gn = GetNeighbors::new(
                SingleChunkSource::new(src_chunk.clone()),
                csr.clone(),
                &delta_records,
                src_label_id,
                avg_degree_hint,
            );

            while let Some(hop_chunk) = gn.next_chunk()? {
                // hop_chunk has COL_ID_SRC_SLOT and COL_ID_DST_SLOT columns.

                // ── ReadNodeProps(dst) ────────────────────────────────────────
                let dst_chunk = if !col_ids_dst.is_empty() {
                    let mut rnp = ReadNodeProps::new(
                        SingleChunkSource::new(hop_chunk),
                        Arc::clone(&store_arc),
                        dst_label_id,
                        COL_ID_DST_SLOT,
                        col_ids_dst.clone(),
                    );
                    match rnp.next_chunk()? {
                        Some(c) => c,
                        None => continue,
                    }
                } else {
                    hop_chunk
                };

                // ── Filter(dst) ───────────────────────────────────────────────
                let dst_chunk = if let Some(ref pred) = dst_pred_opt {
                    let pred = pred.clone();
                    let keep: Vec<bool> = (0..dst_chunk.len())
                        .map(|i| pred.eval(&dst_chunk, i))
                        .collect();
                    let mut c = dst_chunk;
                    c.filter_sel(|i| keep[i]);
                    if c.live_len() == 0 {
                        continue;
                    }
                    c
                } else {
                    dst_chunk
                };

                // ── MaterializeRows ───────────────────────────────────────────
                let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
                let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
                let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);

                for row_idx in dst_chunk.live_rows() {
                    let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
                    let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);

                    // Tombstone checks.
                    let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
                    let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
                    if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
                        continue;
                    }

                    // Build src_props from the src_chunk (find the src slot row).
                    // The src_chunk row index = the physical index in the scan
                    // chunk that produced this hop. We locate it by matching
                    // hop_src_slot with the slot column — a linear scan per
                    // output row, bounded by the chunk size.
                    let src_props = if let Some(sc) = src_slot_col {
                        // Find the src row by slot value.
                        let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
                        if let Some(src_ri) = src_row {
                            build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
                        } else {
                            // Fallback: read from store.
                            let nullable = self
                                .snapshot
                                .store
                                .get_node_raw_nullable(src_node, &col_ids_src)?;
                            nullable
                                .into_iter()
                                .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
                                .collect()
                        }
                    } else {
                        vec![]
                    };

                    let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);

                    // Apply WHERE clause if present (covers complex predicates
                    // that couldn't be compiled into ChunkPredicate).
                    if let Some(ref where_expr) = m.where_clause {
                        // Determine actual src/dst variable names for row_vals.
                        let (actual_src_var, actual_dst_var) = if swapped {
                            (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
                        } else {
                            (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
                        };
                        let (actual_src_props, actual_dst_props) = if swapped {
                            (&dst_props, &src_props)
                        } else {
                            (&src_props, &dst_props)
                        };
                        let mut row_vals = build_row_vals(
                            actual_src_props,
                            actual_src_var,
                            &col_ids_src,
                            &self.snapshot.store,
                        );
                        row_vals.extend(build_row_vals(
                            actual_dst_props,
                            actual_dst_var,
                            &col_ids_dst,
                            &self.snapshot.store,
                        ));
                        row_vals.extend(self.dollar_params());
                        if !self.eval_where_graph(where_expr, &row_vals) {
                            continue;
                        }
                    }

                    // Project output row using existing hop-row helper.
                    // When swapped, present props/vars/labels in query order.
                    let (proj_src_props, proj_dst_props) = if swapped {
                        (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
                    } else {
                        (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
                    };
                    let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
                        (
                            dst_node_pat.var.as_str(),
                            src_node_pat.var.as_str(),
                            dst_label.as_str(),
                            src_label.as_str(),
                        )
                    } else {
                        (
                            src_node_pat.var.as_str(),
                            dst_node_pat.var.as_str(),
                            src_label.as_str(),
                            dst_label.as_str(),
                        )
                    };

                    let row = project_hop_row(
                        proj_src_props,
                        proj_dst_props,
                        column_names,
                        proj_src_var,
                        proj_dst_var,
                        None, // no rel_var_type for Phase 2
                        Some((proj_src_var, proj_src_label)),
                        Some((proj_dst_var, proj_dst_label)),
                        &self.snapshot.store,
                        None, // no edge_props
                    );
                    rows.push(row);

                    // LIMIT short-circuit.
                    if let Some(lim) = limit {
                        if rows.len() >= lim {
                            break 'outer;
                        }
                    }
                }
            }
        }

        Ok(QueryResult {
            columns: column_names.to_vec(),
            rows,
        })
    }
616
617 /// Select a `ChunkedPlan` for the given `MatchStatement` (Phase 4, spec §2.3).
618 ///
619 /// Returns `Some(plan)` when the query shape maps to a known chunked fast-path,
620 /// or `None` when the row engine should handle it. The caller dispatches via
621 /// `match` — no further `can_use_*` calls are made after this returns.
622 ///
623 /// # Precedence
624 ///
625 /// MutualNeighbors is checked before TwoHop because it is a more specific
626 /// pattern (both endpoints bound) that would otherwise fall into TwoHop.
627 pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
628 // MutualNeighbors is a specialised 2-hop shape — check first.
629 if self.can_use_mutual_neighbors_chunked(m) {
630 return Some(ChunkedPlan::MutualNeighbors);
631 }
632 if self.can_use_two_hop_chunked(m) {
633 return Some(ChunkedPlan::TwoHop);
634 }
635 if self.can_use_one_hop_chunked(m) {
636 return Some(ChunkedPlan::OneHop);
637 }
638 if self.can_use_chunked_pipeline(m) {
639 return Some(ChunkedPlan::Scan);
640 }
641 None
642 }
643
644 /// Return `true` when `m` qualifies for Phase 4 mutual-neighbors chunked execution.
645 ///
646 /// The mutual-neighbors pattern is:
647 /// ```cypher
648 /// MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L)
649 /// WHERE id(a) = $x AND id(b) = $y
650 /// RETURN x
651 /// ```
652 ///
653 /// # Guard (spec §5.2 hard gate)
654 ///
655 /// Must be strict:
656 /// - Exactly 2 nodes in each of 2 path patterns, OR exactly 3 nodes + 2 rels
657 /// with the middle node shared and direction fork (first hop Outgoing, second
658 /// hop Incoming or vice versa).
659 /// - Actually: we look for exactly 1 path pattern with 3 nodes + 2 rels where
660 /// hops are directed but in *opposite* directions (fork pattern).
661 /// - Both endpoint nodes must have exactly one bound-param `id()` filter.
662 /// - Same rel-type for both hops.
663 /// - Same label on all three nodes.
664 /// - No edge-property references.
665 /// - No aggregation, no ORDER BY, no DISTINCT.
666 pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
667 use sparrowdb_cypher::ast::EdgeDir;
668
669 if !self.use_chunked_pipeline {
670 return false;
671 }
672 // Exactly 1 path pattern, 3 nodes, 2 rels.
673 if m.pattern.len() != 1 {
674 return false;
675 }
676 let pat = &m.pattern[0];
677 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
678 return false;
679 }
680 // Fork pattern: first hop Outgoing, second hop Incoming (a→x←b).
681 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
682 return false;
683 }
684 // No variable-length hops.
685 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
686 return false;
687 }
688 // Same rel-type for both hops (including both empty).
689 if pat.rels[0].rel_type != pat.rels[1].rel_type {
690 return false;
691 }
692 // All three nodes must have the same single label.
693 if pat.nodes[0].labels.len() != 1
694 || pat.nodes[1].labels.len() != 1
695 || pat.nodes[2].labels.len() != 1
696 {
697 return false;
698 }
699 if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
700 || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
701 {
702 return false;
703 }
704 // No aggregation.
705 if has_aggregate_in_return(&m.return_clause.items) {
706 return false;
707 }
708 // No DISTINCT.
709 if m.distinct {
710 return false;
711 }
712 // No ORDER BY.
713 if !m.order_by.is_empty() {
714 return false;
715 }
716 // No edge-property references.
717 for rel in &pat.rels {
718 if !rel.var.is_empty() {
719 let ref_in_return = m.return_clause.items.iter().any(|item| {
720 column_name_for_item(item)
721 .split_once('.')
722 .is_some_and(|(v, _)| v == rel.var.as_str())
723 });
724 if ref_in_return {
725 return false;
726 }
727 if let Some(ref wexpr) = m.where_clause {
728 if expr_references_var(wexpr, rel.var.as_str()) {
729 return false;
730 }
731 }
732 }
733 }
734 // Endpoint binding: either WHERE id(a)=$x AND id(b)=$y, or both
735 // endpoint nodes carry exactly one inline prop filter (e.g. {uid: 0}).
736 // The inline-prop form is the shape used by the Facebook benchmark Q8:
737 // MATCH (a:User {uid: X})-[:R]->(m)<-[:R]-(b:User {uid: Y}) RETURN m.uid
738 let a_var = pat.nodes[0].var.as_str();
739 let b_var = pat.nodes[2].var.as_str();
740 match m.where_clause.as_ref() {
741 None => {
742 // Accept inline-prop binding: each endpoint must carry exactly
743 // one prop filter so execute can scan for the matching slot.
744 let a_bound = pat.nodes[0].props.len() == 1;
745 let b_bound = pat.nodes[2].props.len() == 1;
746 if !a_bound || !b_bound {
747 return false;
748 }
749 }
750 Some(wexpr) => {
751 if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
752 return false;
753 }
754 }
755 }
756 // Rel table must exist.
757 let label = pat.nodes[0].labels[0].clone();
758 let rel_type = &pat.rels[0].rel_type;
759 let catalog = &self.snapshot.catalog;
760 let tables = catalog.list_rel_tables_with_ids();
761 let label_id_opt = catalog.get_label(&label).ok().flatten();
762 let label_id = match label_id_opt {
763 Some(id) => id as u32,
764 None => return false,
765 };
766 let has_table = tables.iter().any(|(_, sid, did, rt)| {
767 let type_ok = rel_type.is_empty() || rt == rel_type;
768 let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
769 type_ok && endpoint_ok
770 });
771 has_table
772 }
773
    /// Execute the mutual-neighbors fast-path for the chunked pipeline (Phase 4).
    ///
    /// Pattern: `MATCH (a:L)-[:R]->(x:L)<-[:R]-(b:L) WHERE id(a)=$x AND id(b)=$y RETURN x`
    ///
    /// Algorithm:
    /// 1. Resolve bound slot for `a` from `id(a) = $x` param.
    /// 2. Resolve bound slot for `b` from `id(b) = $y` param.
    /// 3. Expand outgoing neighbors of `a` into set A.
    /// 4. Expand outgoing neighbors of `b` into set B.
    /// 5. Intersect A ∩ B via `SlotIntersect` — produces sorted common neighbors.
    /// 6. Materialise output rows from common neighbor slots.
    ///
    /// # Parameters
    /// - `m`: the MATCH statement; callers gate on
    ///   `can_use_mutual_neighbors_chunked` before dispatching here.
    /// - `column_names`: output column names in RETURN order.
    ///
    /// # Errors
    /// Propagates catalog/storage errors, and returns `InvalidArgument` when
    /// no relationship table matches the (label, label, rel_type) triple.
    pub(crate) fn execute_mutual_neighbors_chunked(
        &self,
        m: &MatchStatement,
        column_names: &[String],
    ) -> Result<QueryResult> {
        let pat = &m.pattern[0];
        let a_node_pat = &pat.nodes[0];
        let x_node_pat = &pat.nodes[1];
        let b_node_pat = &pat.nodes[2];

        let label = a_node_pat.labels[0].clone();
        let rel_type = pat.rels[0].rel_type.clone();

        // Unknown label yields an empty result set rather than an error.
        let label_id = match self.snapshot.catalog.get_label(&label)? {
            Some(id) => id as u32,
            None => {
                return Ok(QueryResult {
                    columns: column_names.to_vec(),
                    rows: vec![],
                });
            }
        };

        // Resolve rel table ID.
        let catalog_rel_id = self
            .snapshot
            .catalog
            .list_rel_tables_with_ids()
            .into_iter()
            .find(|(_, sid, did, rt)| {
                let type_ok = rel_type.is_empty() || rt == &rel_type;
                let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
                type_ok && endpoint_ok
            })
            .map(|(cid, _, _, _)| cid as u32)
            .ok_or_else(|| {
                sparrowdb_common::Error::InvalidArgument(
                    "no matching relationship table for mutual-neighbors".into(),
                )
            })?;

        // Extract bound slots for a and b.
        // Two supported forms:
        //   1. WHERE id(a) = $x AND id(b) = $y — param-bound NodeId
        //   2. Inline props on endpoint nodes — scan label for matching slot
        let a_var = a_node_pat.var.as_str();
        let b_var = b_node_pat.var.as_str();
        let (a_slot_opt, b_slot_opt) = if m.where_clause.is_some() {
            // Form 1: id() params.
            (
                extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id),
                extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id),
            )
        } else {
            // Form 2: inline props — scan the label to find matching slots.
            let hwm = self.snapshot.store.hwm_for_label(label_id).unwrap_or(0);
            let dollar_params = self.dollar_params();
            let prop_idx = self.prop_index.borrow();
            (
                find_slot_by_props(
                    &self.snapshot.store,
                    label_id,
                    hwm,
                    &a_node_pat.props,
                    &dollar_params,
                    &prop_idx,
                ),
                find_slot_by_props(
                    &self.snapshot.store,
                    label_id,
                    hwm,
                    &b_node_pat.props,
                    &dollar_params,
                    &prop_idx,
                ),
            )
        };

        let (a_slot, b_slot) = match (a_slot_opt, b_slot_opt) {
            (Some(a), Some(b)) => (a, b),
            _ => {
                // Endpoint not resolved — return empty.
                return Ok(QueryResult {
                    columns: column_names.to_vec(),
                    rows: vec![],
                });
            }
        };

        // Cypher requires distinct node bindings; a node cannot be its own mutual
        // neighbor. When both id() params resolve to the same slot the intersection
        // would include `a` itself, which is semantically wrong — return empty.
        if a_slot == b_slot {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }

        tracing::debug!(
            engine = "chunked",
            plan = %ChunkedPlan::MutualNeighbors,
            label = %label,
            rel_type = %rel_type,
            a_slot,
            b_slot,
            "executing via chunked pipeline"
        );

        // Empty CSR fallback when none cached for this rel table.
        let csr = self
            .snapshot
            .csrs
            .get(&catalog_rel_id)
            .cloned()
            .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));

        // Delta read failures degrade to an empty delta (CSR-only traversal).
        let delta_records = {
            let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
                &self.snapshot.db_root,
                sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
            );
            edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
        };

        // Build neighbor sets via GetNeighbors on single-slot sources.
        let a_scan = ScanByLabel::from_slots(vec![a_slot]);
        let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);

        let b_scan = ScanByLabel::from_slots(vec![b_slot]);
        let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);

        // GetNeighbors emits (src_slot, dst_slot) pairs. We need dst_slot column.
        // Wrap in an adaptor that projects COL_ID_DST_SLOT → COL_ID_SLOT.
        let a_proj = DstSlotProjector::new(a_neighbors);
        let b_proj = DstSlotProjector::new(b_neighbors);

        // Intersect.
        let spill_threshold = 64 * 1024; // 64 K entries before spill warning
        let mut intersect =
            SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);

        // Collect common neighbor slots.
        let mut common_slots: Vec<u64> = Vec::new();
        while let Some(chunk) = intersect.next_chunk()? {
            if let Some(col) = chunk.find_column(COL_ID_SLOT) {
                for row_idx in chunk.live_rows() {
                    common_slots.push(col.data[row_idx]);
                }
            }
        }

        // Materialise output rows. Gather the col_ids that RETURN, WHERE, and
        // inline prop filters need for the middle node `x`.
        let x_var = x_node_pat.var.as_str();
        let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
        if let Some(ref wexpr) = m.where_clause {
            collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
        }
        for p in &x_node_pat.props {
            let cid = col_id_of(&p.key);
            if !col_ids_x.contains(&cid) {
                col_ids_x.push(cid);
            }
        }

        let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
            self.snapshot.store.root_path(),
        )?);

        let limit = m.limit.map(|l| l as usize);
        let mut rows: Vec<Vec<Value>> = Vec::new();

        'outer: for x_slot in common_slots {
            let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);

            // Skip tombstoned common neighbors.
            if self.is_node_tombstoned(x_node_id) {
                continue;
            }

            // Read x properties.
            let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
                let nullable = store_arc.batch_read_node_props_nullable(
                    label_id,
                    &[x_slot as u32],
                    &col_ids_x,
                )?;
                if nullable.is_empty() {
                    vec![]
                } else {
                    // nullable[0] is the single requested slot; keep only the
                    // present values, paired with their col_id.
                    col_ids_x
                        .iter()
                        .enumerate()
                        .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
                        .collect()
                }
            } else {
                vec![]
            };

            // Apply remaining WHERE predicates (e.g. x.prop filters).
            if let Some(ref where_expr) = m.where_clause {
                let mut row_vals =
                    build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
                // Also inject a and b NodeRef for id() evaluation.
                if !a_var.is_empty() {
                    let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
                    row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
                }
                if !b_var.is_empty() {
                    let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
                    row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
                }
                row_vals.extend(self.dollar_params());
                if !self.eval_where_graph(where_expr, &row_vals) {
                    continue;
                }
            }

            // Project output row.
            let row = project_row(
                &x_props,
                column_names,
                &col_ids_x,
                x_var,
                &label,
                &self.snapshot.store,
                Some(x_node_id),
            );
            rows.push(row);

            // LIMIT short-circuit.
            if let Some(lim) = limit {
                if rows.len() >= lim {
                    break 'outer;
                }
            }
        }

        Ok(QueryResult {
            columns: column_names.to_vec(),
            rows,
        })
    }
1027
1028 /// Return `true` when `m` qualifies for Phase 3 two-hop chunked execution.
1029 ///
1030 /// Eligibility (spec §4.3):
1031 /// - `use_chunked_pipeline` flag is set.
1032 /// - Exactly 3 nodes, 2 relationships (two hops).
1033 /// - Both hops resolve to the **same relationship table**.
1034 /// - Both hops same direction (both Outgoing).
1035 /// - No `OPTIONAL MATCH`, no subquery in `WHERE`.
1036 /// - No aggregate, no `ORDER BY`, no `DISTINCT`.
1037 /// - No edge-property references in RETURN or WHERE.
1038 /// - No variable-length hops.
1039 pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
1040 use sparrowdb_cypher::ast::EdgeDir;
1041
1042 if !self.use_chunked_pipeline {
1043 return false;
1044 }
1045 // Exactly 1 path pattern with 3 nodes and 2 rels.
1046 if m.pattern.len() != 1 {
1047 return false;
1048 }
1049 let pat = &m.pattern[0];
1050 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
1051 return false;
1052 }
1053 // Both hops must be directed Outgoing (Phase 3 constraint).
1054 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
1055 return false;
1056 }
1057 // No variable-length hops.
1058 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
1059 return false;
1060 }
1061 // No aggregation.
1062 if has_aggregate_in_return(&m.return_clause.items) {
1063 return false;
1064 }
1065 // No DISTINCT.
1066 if m.distinct {
1067 return false;
1068 }
1069 // No ORDER BY.
1070 if !m.order_by.is_empty() {
1071 return false;
1072 }
1073 // No edge-property references.
1074 for rel in &pat.rels {
1075 if !rel.var.is_empty() {
1076 let ref_in_return = m.return_clause.items.iter().any(|item| {
1077 column_name_for_item(item)
1078 .split_once('.')
1079 .is_some_and(|(v, _)| v == rel.var.as_str())
1080 });
1081 if ref_in_return {
1082 return false;
1083 }
1084 if let Some(ref wexpr) = m.where_clause {
1085 if expr_references_var(wexpr, rel.var.as_str()) {
1086 return false;
1087 }
1088 }
1089 }
1090 }
1091 // Only simple WHERE predicates.
1092 if let Some(ref wexpr) = m.where_clause {
1093 if !is_simple_where_for_chunked(wexpr) {
1094 return false;
1095 }
1096 }
1097 // Inline prop filters on node patterns are not evaluated by the chunked
1098 // two-hop path — fall back to the row engine. (See #362.)
1099 if pat.nodes.iter().any(|n| !n.props.is_empty()) {
1100 return false;
1101 }
1102 // Both hops must resolve to the same relationship table.
1103 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1104 let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1105 let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1106 let rel_type1 = &pat.rels[0].rel_type;
1107 let rel_type2 = &pat.rels[1].rel_type;
1108
1109 // Both rel types must be identical (including both-empty).
1110 // Allowing one empty + one non-empty would silently ignore the typed hop
1111 // in execute_two_hop_chunked which only uses rels[0].rel_type.
1112 if rel_type1 != rel_type2 {
1113 return false;
1114 }
1115
1116 // Resolve the shared rel table: src→mid and mid→dst must map to same table.
1117 let catalog = &self.snapshot.catalog;
1118 let tables = catalog.list_rel_tables_with_ids();
1119
1120 let hop1_matches: Vec<_> = tables
1121 .iter()
1122 .filter(|(_, sid, did, rt)| {
1123 let type_ok = rel_type1.is_empty() || rt == rel_type1;
1124 let src_ok = catalog
1125 .get_label(&src_label)
1126 .ok()
1127 .flatten()
1128 .map(|id| id as u32 == *sid as u32)
1129 .unwrap_or(false);
1130 let mid_ok = catalog
1131 .get_label(&mid_label)
1132 .ok()
1133 .flatten()
1134 .map(|id| id as u32 == *did as u32)
1135 .unwrap_or(false);
1136 type_ok && src_ok && mid_ok
1137 })
1138 .collect();
1139
1140 // Only enter chunked path if there is exactly one matching rel table.
1141 let n_tables = hop1_matches.len();
1142 if n_tables != 1 {
1143 return false;
1144 }
1145
1146 let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1147 let type_ok = rel_type2.is_empty() || rt == rel_type2;
1148 let mid_ok = catalog
1149 .get_label(&mid_label)
1150 .ok()
1151 .flatten()
1152 .map(|id| id as u32 == *sid as u32)
1153 .unwrap_or(false);
1154 let dst_ok = catalog
1155 .get_label(&dst_label)
1156 .ok()
1157 .flatten()
1158 .map(|id| id as u32 == *did as u32)
1159 .unwrap_or(false);
1160 type_ok && mid_ok && dst_ok
1161 });
1162
1163 // Both hops must resolve, and to the same table.
1164 match (hop1_matches.first(), hop2_id) {
1165 (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1166 _ => false,
1167 }
1168 }
1169
1170 /// Execute a 2-hop query using the Phase 3 chunked pipeline.
1171 ///
1172 /// Pipeline shape (spec §4.3, same-rel 2-hop):
1173 /// ```text
1174 /// MaterializeRows(limit?)
1175 /// <- optional Filter(ChunkPredicate, dst)
1176 /// <- ReadNodeProps(dst) [only if dst props referenced]
1177 /// <- GetNeighbors(hop2, mid_label) [second hop]
1178 /// <- optional Filter(ChunkPredicate, mid) [intermediate predicates]
1179 /// <- ReadNodeProps(mid) [only if mid props referenced in WHERE]
1180 /// <- GetNeighbors(hop1, src_label) [first hop]
1181 /// <- optional Filter(ChunkPredicate, src)
1182 /// <- ReadNodeProps(src) [only if src props referenced]
1183 /// <- ScanByLabel(hwm)
1184 /// ```
1185 ///
1186 /// Memory-limit enforcement: if the accumulated output row count in bytes
1187 /// exceeds `self.memory_limit_bytes`, returns `Error::QueryMemoryExceeded`.
1188 ///
1189 /// Path multiplicity: duplicate destination slots from distinct source paths
1190 /// are emitted as distinct output rows (no implicit dedup — spec §4.1).
1191 pub(crate) fn execute_two_hop_chunked(
1192 &self,
1193 m: &MatchStatement,
1194 column_names: &[String],
1195 ) -> Result<QueryResult> {
1196 use sparrowdb_common::Error as DbError;
1197
1198 let pat = &m.pattern[0];
1199 let src_node_pat = &pat.nodes[0];
1200 let mid_node_pat = &pat.nodes[1];
1201 let dst_node_pat = &pat.nodes[2];
1202
1203 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1204 let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1205 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1206 let rel_type = pat.rels[0].rel_type.clone();
1207
1208 // Resolve label IDs.
1209 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1210 Some(id) => id as u32,
1211 None => {
1212 return Ok(QueryResult {
1213 columns: column_names.to_vec(),
1214 rows: vec![],
1215 });
1216 }
1217 };
1218 let mid_label_id = if mid_label.is_empty() {
1219 src_label_id
1220 } else {
1221 match self.snapshot.catalog.get_label(&mid_label)? {
1222 Some(id) => id as u32,
1223 None => {
1224 return Ok(QueryResult {
1225 columns: column_names.to_vec(),
1226 rows: vec![],
1227 });
1228 }
1229 }
1230 };
1231 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1232 Some(id) => id as u32,
1233 None => {
1234 return Ok(QueryResult {
1235 columns: column_names.to_vec(),
1236 rows: vec![],
1237 });
1238 }
1239 };
1240
1241 // Resolve the shared rel table ID.
1242 let catalog_rel_id = self
1243 .snapshot
1244 .catalog
1245 .list_rel_tables_with_ids()
1246 .into_iter()
1247 .find(|(_, sid, did, rt)| {
1248 let type_ok = rel_type.is_empty() || rt == &rel_type;
1249 let src_ok = *sid as u32 == src_label_id;
1250 let mid_ok = *did as u32 == mid_label_id;
1251 type_ok && src_ok && mid_ok
1252 })
1253 .map(|(cid, _, _, _)| cid as u32)
1254 .ok_or_else(|| {
1255 sparrowdb_common::Error::InvalidArgument(
1256 "no matching relationship table found for 2-hop".into(),
1257 )
1258 })?;
1259
1260 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1261 let hwm_dst = self.snapshot.store.hwm_for_label(dst_label_id).unwrap_or(0);
1262 tracing::debug!(
1263 engine = "chunked",
1264 src_label = %src_label,
1265 mid_label = %mid_label,
1266 dst_label = %dst_label,
1267 rel_type = %rel_type,
1268 hwm_src,
1269 hwm_dst,
1270 "executing via chunked pipeline (2-hop)"
1271 );
1272
1273 // Variable names from the query.
1274 let src_var = src_node_pat.var.as_str();
1275 let mid_var = mid_node_pat.var.as_str();
1276 let dst_var = dst_node_pat.var.as_str();
1277
1278 // Collect property col_ids needed for each node.
1279 // Late materialization: only read what WHERE or RETURN references.
1280 let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1281 let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1282
1283 // Mid node properties: only needed if WHERE references them.
1284 let mut col_ids_mid: Vec<u32> = vec![];
1285
1286 if let Some(ref wexpr) = m.where_clause {
1287 collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1288 collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1289 collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1290 }
1291 // Inline prop filters.
1292 for p in &src_node_pat.props {
1293 let cid = sparrowdb_common::col_id_of(&p.key);
1294 if !col_ids_src.contains(&cid) {
1295 col_ids_src.push(cid);
1296 }
1297 }
1298 for p in &mid_node_pat.props {
1299 let cid = sparrowdb_common::col_id_of(&p.key);
1300 if !col_ids_mid.contains(&cid) {
1301 col_ids_mid.push(cid);
1302 }
1303 }
1304 for p in &dst_node_pat.props {
1305 let cid = sparrowdb_common::col_id_of(&p.key);
1306 if !col_ids_dst.contains(&cid) {
1307 col_ids_dst.push(cid);
1308 }
1309 }
1310 // If mid var is referenced in RETURN, read those props too.
1311 if !mid_var.is_empty() {
1312 let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1313 for cid in mid_return_ids {
1314 if !col_ids_mid.contains(&cid) {
1315 col_ids_mid.push(cid);
1316 }
1317 }
1318 }
1319
1320 // Build delta index for this rel table.
1321 let delta_records = {
1322 let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1323 &self.snapshot.db_root,
1324 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1325 );
1326 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1327 };
1328
1329 // Get CSR for the shared rel table.
1330 let csr = self
1331 .snapshot
1332 .csrs
1333 .get(&catalog_rel_id)
1334 .cloned()
1335 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1336
1337 let avg_degree_hint = self
1338 .snapshot
1339 .rel_degree_stats()
1340 .get(&catalog_rel_id)
1341 .map(|s| s.mean().ceil() as usize)
1342 .unwrap_or(8);
1343
1344 // Compile WHERE predicates.
1345 let src_pred_opt = m
1346 .where_clause
1347 .as_ref()
1348 .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1349 let mid_pred_opt = m
1350 .where_clause
1351 .as_ref()
1352 .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1353 let dst_pred_opt = m
1354 .where_clause
1355 .as_ref()
1356 .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1357
1358 let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1359 self.snapshot.store.root_path(),
1360 )?);
1361
1362 let limit = m.limit.map(|l| l as usize);
1363 let memory_limit = self.memory_limit_bytes;
1364 let mut rows: Vec<Vec<Value>> = Vec::new();
1365
1366 // ── BfsArena: reused across both hops ────────────────────────────────
1367 //
1368 // BfsArena replaces the old FrontierScratch + per-chunk HashSet dedup
1369 // pattern. It pairs a double-buffer frontier with a flat bitvector for
1370 // O(1) visited-set membership testing — no per-chunk HashSet allocation.
1371 // arena.clear() only zeroes modified bitvector words (O(dirty)), not
1372 // the full pre-allocated bitvector.
1373 let node_capacity = (hwm_src.max(hwm_dst) as usize).max(64);
1374 let mut frontier = BfsArena::new(
1375 avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2),
1376 node_capacity,
1377 );
1378
1379 // ── Memory-limit tracking ─────────────────────────────────────────────
1380 // We track accumulated output rows as a proxy for memory usage.
1381 // Each output row is estimated as column_names.len() * 16 bytes.
1382 let row_size_estimate = column_names.len().max(1) * 16;
1383
1384 let mut scan = ScanByLabel::new(hwm_src);
1385
1386 'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1387 // ── ReadNodeProps(src) ────────────────────────────────────────────
1388 let src_chunk = if !col_ids_src.is_empty() {
1389 let mut rnp = ReadNodeProps::new(
1390 SingleChunkSource::new(scan_chunk),
1391 Arc::clone(&store_arc),
1392 src_label_id,
1393 crate::chunk::COL_ID_SLOT,
1394 col_ids_src.clone(),
1395 );
1396 match rnp.next_chunk()? {
1397 Some(c) => c,
1398 None => continue,
1399 }
1400 } else {
1401 scan_chunk
1402 };
1403
1404 // ── Filter(src) ───────────────────────────────────────────────────
1405 let src_chunk = if let Some(ref pred) = src_pred_opt {
1406 let pred = pred.clone();
1407 let keep: Vec<bool> = (0..src_chunk.len())
1408 .map(|i| pred.eval(&src_chunk, i))
1409 .collect();
1410 let mut c = src_chunk;
1411 c.filter_sel(|i| keep[i]);
1412 if c.live_len() == 0 {
1413 continue;
1414 }
1415 c
1416 } else {
1417 src_chunk
1418 };
1419
1420 // ── Hop 1: GetNeighbors(src → mid) ────────────────────────────────
1421 let mut gn1 = GetNeighbors::new(
1422 SingleChunkSource::new(src_chunk.clone()),
1423 csr.clone(),
1424 &delta_records,
1425 src_label_id,
1426 avg_degree_hint,
1427 );
1428
1429 // For each hop-1 output chunk: (src_slot, mid_slot) pairs.
1430 while let Some(hop1_chunk) = gn1.next_chunk()? {
1431 // Reset the BfsArena for this hop-1 chunk. clear() is O(1)
1432 // amortized — no allocations, just length resets + bitmap clear.
1433 // Must happen BEFORE the memory-limit check so frontier.bytes_used()
1434 // reflects the cleared (zero) state rather than the previous iteration.
1435 frontier.clear();
1436
1437 // Memory-limit check: check after each hop-1 chunk.
1438 // frontier.bytes_used() is 0 after clear(), so this measures row
1439 // accumulation only.
1440 let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1441 if accum_bytes > memory_limit {
1442 return Err(DbError::QueryMemoryExceeded);
1443 }
1444
1445 // ── ReadNodeProps(mid) — only if WHERE references mid ─────────
1446 let mid_chunk = if !col_ids_mid.is_empty() {
1447 let mut rnp = ReadNodeProps::new(
1448 SingleChunkSource::new(hop1_chunk),
1449 Arc::clone(&store_arc),
1450 mid_label_id,
1451 COL_ID_DST_SLOT,
1452 col_ids_mid.clone(),
1453 );
1454 match rnp.next_chunk()? {
1455 Some(c) => c,
1456 None => continue,
1457 }
1458 } else {
1459 hop1_chunk
1460 };
1461
1462 // ── Filter(mid) — intermediate hop predicate ─────────────────
1463 let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1464 let pred = pred.clone();
1465 let keep: Vec<bool> = (0..mid_chunk.len())
1466 .map(|i| pred.eval(&mid_chunk, i))
1467 .collect();
1468 let mut c = mid_chunk;
1469 c.filter_sel(|i| keep[i]);
1470 if c.live_len() == 0 {
1471 continue;
1472 }
1473 c
1474 } else {
1475 mid_chunk
1476 };
1477
1478 // ── Hop 2: GetNeighbors(mid → dst) ────────────────────────────
1479 let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1480 let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1481
1482 // Collect (src_slot, mid_slot) pairs for live mid rows.
1483 let live_pairs: Vec<(u64, u64)> = mid_chunk
1484 .live_rows()
1485 .map(|row_idx| {
1486 let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1487 let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1488 (src_slot, mid_slot)
1489 })
1490 .collect();
1491
1492 // Populate BfsArena.current with DEDUPLICATED mid slots for hop-2.
1493 // Deduplication prevents GetNeighbors from expanding the same mid
1494 // node multiple times (once per source path through it), which would
1495 // produce N^2 output rows instead of N.
1496 // Path multiplicity is preserved by iterating ALL live_pairs at
1497 // materialization time — we emit one row per distinct (src, mid, dst)
1498 // triple, which is the correct semantics.
1499 //
1500 // arena.visit() uses a RoaringBitmap for O(1) membership checks,
1501 // eliminating the per-chunk HashSet allocation of the old approach.
1502 for &(_, mid_slot) in &live_pairs {
1503 if frontier.visit(mid_slot) {
1504 frontier.current_mut().push(mid_slot);
1505 }
1506 }
1507
1508 // Use BfsArena.current as input to GetNeighbors.
1509 // Build a ScanByLabel-equivalent from deduplicated mid slots.
1510 let mid_slots_chunk = {
1511 let data: Vec<u64> = frontier.current().to_vec();
1512 let col =
1513 crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1514 DataChunk::from_columns(vec![col])
1515 };
1516
1517 let mut gn2 = GetNeighbors::new(
1518 SingleChunkSource::new(mid_slots_chunk),
1519 csr.clone(),
1520 &delta_records,
1521 mid_label_id,
1522 avg_degree_hint,
1523 );
1524
1525 while let Some(hop2_chunk) = gn2.next_chunk()? {
1526 // hop2_chunk: (mid_slot=COL_ID_SRC_SLOT, dst_slot=COL_ID_DST_SLOT)
1527
1528 // ── ReadNodeProps(dst) ────────────────────────────────────
1529 let dst_chunk = if !col_ids_dst.is_empty() {
1530 let mut rnp = ReadNodeProps::new(
1531 SingleChunkSource::new(hop2_chunk),
1532 Arc::clone(&store_arc),
1533 dst_label_id,
1534 COL_ID_DST_SLOT,
1535 col_ids_dst.clone(),
1536 );
1537 match rnp.next_chunk()? {
1538 Some(c) => c,
1539 None => continue,
1540 }
1541 } else {
1542 hop2_chunk
1543 };
1544
1545 // ── Filter(dst) ───────────────────────────────────────────
1546 let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1547 let pred = pred.clone();
1548 let keep: Vec<bool> = (0..dst_chunk.len())
1549 .map(|i| pred.eval(&dst_chunk, i))
1550 .collect();
1551 let mut c = dst_chunk;
1552 c.filter_sel(|i| keep[i]);
1553 if c.live_len() == 0 {
1554 continue;
1555 }
1556 c
1557 } else {
1558 dst_chunk
1559 };
1560
1561 // ── MaterializeRows ───────────────────────────────────────
1562 // For each live (mid_slot, dst_slot) pair, walk backwards
1563 // through live_pairs to find all (src_slot, mid_slot) pairs,
1564 // emitting one row per (src, mid, dst) path.
1565 let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); // mid_slot
1566 let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1567
1568 let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1569
1570 // Build slot→row-index maps once before the triple loop to
1571 // avoid O(N) linear scans per output row (WARNING 2).
1572 let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1573 .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1574 .unwrap_or_default();
1575
1576 let mid_index: std::collections::HashMap<u64, usize> = {
1577 let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1578 mid_slot_col_in_mid
1579 .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1580 .unwrap_or_default()
1581 };
1582
1583 for row_idx in dst_chunk.live_rows() {
1584 let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1585 let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1586
1587 // Find all (src, mid) pairs whose mid == via_mid_slot.
1588 for &(src_slot, mid_slot) in &live_pairs {
1589 if mid_slot != via_mid_slot {
1590 continue;
1591 }
1592
1593 // Path multiplicity: each (src, mid, dst) triple is
1594 // a distinct path — emit as a distinct row (no dedup).
1595 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1596 let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1597 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1598
1599 // Tombstone checks.
1600 if self.is_node_tombstoned(src_node)
1601 || self.is_node_tombstoned(mid_node)
1602 || self.is_node_tombstoned(dst_node)
1603 {
1604 continue;
1605 }
1606
1607 // Read src props (from scan chunk, using pre-built index).
1608 let src_props = if src_slot_col_in_scan.is_some() {
1609 if let Some(&src_ri) = src_index.get(&src_slot) {
1610 build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1611 } else {
1612 let nullable = self
1613 .snapshot
1614 .store
1615 .get_node_raw_nullable(src_node, &col_ids_src)?;
1616 nullable
1617 .into_iter()
1618 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1619 .collect()
1620 }
1621 } else {
1622 vec![]
1623 };
1624
1625 // Read mid props (from mid_chunk, using pre-built index).
1626 let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1627 if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1628 build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1629 } else {
1630 let nullable = self
1631 .snapshot
1632 .store
1633 .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1634 nullable
1635 .into_iter()
1636 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1637 .collect()
1638 }
1639 } else {
1640 vec![]
1641 };
1642
1643 // Read dst props (from dst_chunk).
1644 let dst_props =
1645 build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1646
1647 // Apply WHERE clause (fallback for complex predicates).
1648 if let Some(ref where_expr) = m.where_clause {
1649 let mut row_vals = build_row_vals(
1650 &src_props,
1651 src_var,
1652 &col_ids_src,
1653 &self.snapshot.store,
1654 );
1655 row_vals.extend(build_row_vals(
1656 &mid_props,
1657 mid_var,
1658 &col_ids_mid,
1659 &self.snapshot.store,
1660 ));
1661 row_vals.extend(build_row_vals(
1662 &dst_props,
1663 dst_var,
1664 &col_ids_dst,
1665 &self.snapshot.store,
1666 ));
1667 row_vals.extend(self.dollar_params());
1668 if !self.eval_where_graph(where_expr, &row_vals) {
1669 continue;
1670 }
1671 }
1672
1673 // Project output row using existing three-var helper.
1674 let row = project_three_var_row(
1675 &src_props,
1676 &mid_props,
1677 &dst_props,
1678 column_names,
1679 src_var,
1680 mid_var,
1681 &self.snapshot.store,
1682 );
1683 rows.push(row);
1684
1685 // Memory-limit check on accumulated output.
1686 if rows.len() * row_size_estimate > memory_limit {
1687 return Err(DbError::QueryMemoryExceeded);
1688 }
1689
1690 // LIMIT short-circuit.
1691 if let Some(lim) = limit {
1692 if rows.len() >= lim {
1693 break 'outer;
1694 }
1695 }
1696 }
1697 }
1698 }
1699 }
1700 }
1701
1702 Ok(QueryResult {
1703 columns: column_names.to_vec(),
1704 rows,
1705 })
1706 }
1707
1708 /// Execute a simple label scan using the Phase 1 chunked pipeline.
1709 ///
1710 /// The pipeline emits slot numbers in `CHUNK_CAPACITY`-sized batches via
1711 /// `ScanByLabel`. For each chunk we apply inline-prop filters and the WHERE
1712 /// clause row-at-a-time (same semantics as the row-at-a-time engine) and
1713 /// batch-read the RETURN properties.
1714 ///
1715 /// Phase 2 will replace the per-row property reads with bulk columnar reads
1716 /// and evaluate the WHERE clause column-at-a-time.
1717 pub(crate) fn execute_scan_chunked(
1718 &self,
1719 m: &MatchStatement,
1720 column_names: &[String],
1721 ) -> Result<QueryResult> {
1722 use crate::pipeline::PipelineOperator;
1723
1724 let pat = &m.pattern[0];
1725 let node = &pat.nodes[0];
1726 let label = node.labels.first().cloned().unwrap_or_default();
1727
1728 // Unknown label → 0 rows (standard Cypher semantics, matches row-at-a-time).
1729 let label_id = match self.snapshot.catalog.get_label(&label)? {
1730 Some(id) => id as u32,
1731 None => {
1732 return Ok(QueryResult {
1733 columns: column_names.to_vec(),
1734 rows: vec![],
1735 });
1736 }
1737 };
1738
1739 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1740 tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1741
1742 // Collect all col_ids needed (RETURN + WHERE + inline prop filters).
1743 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1744 if let Some(ref wexpr) = m.where_clause {
1745 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1746 }
1747 for p in &node.props {
1748 let cid = col_id_of(&p.key);
1749 if !all_col_ids.contains(&cid) {
1750 all_col_ids.push(cid);
1751 }
1752 }
1753
1754 let var_name = node.var.as_str();
1755 let mut rows: Vec<Vec<Value>> = Vec::new();
1756
1757 // ── Drive the ScanByLabel pipeline ───────────────────────────────────
1758 //
1759 // Phase 1: the scan is purely over slot indices (0..hwm). Property
1760 // reads happen per live-slot inside this loop. Phase 2 will push the
1761 // reads into the pipeline operators themselves.
1762
1763 let mut scan = ScanByLabel::new(hwm);
1764
1765 while let Some(chunk) = scan.next_chunk()? {
1766 // Process each slot in this chunk.
1767 for row_idx in chunk.live_rows() {
1768 let slot = chunk.column(0).data[row_idx];
1769 let node_id = NodeId(((label_id as u64) << 32) | slot);
1770
1771 // Skip tombstoned nodes (same as row-at-a-time engine).
1772 if self.is_node_tombstoned(node_id) {
1773 continue;
1774 }
1775
1776 // Read properties needed for filter and projection.
1777 let nullable_props = self
1778 .snapshot
1779 .store
1780 .get_node_raw_nullable(node_id, &all_col_ids)?;
1781 let props: Vec<(u32, u64)> = nullable_props
1782 .iter()
1783 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1784 .collect();
1785
1786 // Apply inline prop filter.
1787 if !self.matches_prop_filter(&props, &node.props) {
1788 continue;
1789 }
1790
1791 // Apply WHERE clause.
1792 if let Some(ref where_expr) = m.where_clause {
1793 let mut row_vals =
1794 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1795 if !var_name.is_empty() && !label.is_empty() {
1796 row_vals.insert(
1797 format!("{}.__labels__", var_name),
1798 Value::List(vec![Value::String(label.clone())]),
1799 );
1800 }
1801 if !var_name.is_empty() {
1802 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1803 }
1804 row_vals.extend(self.dollar_params());
1805 if !self.eval_where_graph(where_expr, &row_vals) {
1806 continue;
1807 }
1808 }
1809
1810 // Project RETURN columns.
1811 let row = project_row(
1812 &props,
1813 column_names,
1814 &all_col_ids,
1815 var_name,
1816 &label,
1817 &self.snapshot.store,
1818 Some(node_id),
1819 );
1820 rows.push(row);
1821 }
1822 }
1823
1824 Ok(QueryResult {
1825 columns: column_names.to_vec(),
1826 rows,
1827 })
1828 }
1829}
1830
1831// ── SingleChunkSource ─────────────────────────────────────────────────────────
1832
1833/// A one-shot `PipelineOperator` that yields a single pre-built `DataChunk`.
1834///
1835/// Used to wrap an existing chunk so it can be passed to operators that expect
1836/// a child `PipelineOperator`. After the chunk is consumed, returns `None`.
1837struct SingleChunkSource {
1838 chunk: Option<DataChunk>,
1839}
1840
1841impl SingleChunkSource {
1842 fn new(chunk: DataChunk) -> Self {
1843 SingleChunkSource { chunk: Some(chunk) }
1844 }
1845}
1846
1847impl PipelineOperator for SingleChunkSource {
1848 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1849 Ok(self.chunk.take())
1850 }
1851}
1852
1853// ── Free helpers used by execute_one_hop_chunked ──────────────────────────────
1854
1855/// Extract the column name for a `ReturnItem`.
1856fn column_name_for_item(item: &ReturnItem) -> String {
1857 if let Some(ref alias) = item.alias {
1858 return alias.clone();
1859 }
1860 // Fallback: render the expr as a rough string.
1861 match &item.expr {
1862 Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1863 Expr::Var(v) => v.clone(),
1864 _ => String::new(),
1865 }
1866}
1867
/// Returns `true` if `expr` contains any `var.prop` access for the given variable name.
///
/// Used to guard the chunked path against edge-property predicates in WHERE:
/// `WHERE r.weight > 5` must fall back to the row engine because the chunked
/// materializer does not populate edge-property row_vals, which would silently
/// return zero results rather than the correct filtered set.
///
/// NOTE(review): the catch-all arm returns `false`, so variants not listed
/// here (e.g. function calls whose arguments mention the variable) are not
/// detected — confirm new `Expr` variants are added to the recursion.
fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
    match expr {
        // Direct `var.prop` access — the only positive leaf case.
        Expr::PropAccess { var, .. } => var.as_str() == var_name,
        // Recurse into both operands of binary / boolean combinators.
        Expr::BinOp { left, right, .. } => {
            expr_references_var(left, var_name) || expr_references_var(right, var_name)
        }
        Expr::And(a, b) | Expr::Or(a, b) => {
            expr_references_var(a, var_name) || expr_references_var(b, var_name)
        }
        Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
            expr_references_var(inner, var_name)
        }
        _ => false,
    }
}
1896
/// Returns `true` when the WHERE expression can be fully handled by the chunked
/// pipeline (either compiled into a `ChunkPredicate` or evaluated via the
/// fallback row-vals path — which covers all simple property predicates).
///
/// Returns `false` for CONTAINS / STARTS WITH / ENDS WITH / EXISTS / subquery /
/// function-call shapes that would require the full row-engine evaluator in a
/// way that the chunked path can't trivially support at the sink.
fn is_simple_where_for_chunked(expr: &Expr) -> bool {
    match expr {
        Expr::BinOp { left, op, right } => {
            match op {
                // These require text-index support or are unsafe to fallback.
                BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
                _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
            }
        }
        Expr::And(a, b) | Expr::Or(a, b) => {
            is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
        }
        Expr::Not(inner) => is_simple_where_for_chunked(inner),
        Expr::IsNull(_) | Expr::IsNotNull(_) => true,
        Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
        // Subqueries, EXISTS, function calls → fall back to row engine.
        Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
        // NOTE(review): unknown variants are optimistically treated as simple
        // here; confirm the row-vals fallback evaluates any new variant safely.
        _ => true,
    }
}
1917
1918/// Try to compile a simple WHERE expression for `var_name` into a `ChunkPredicate`.
1919///
1920/// Only handles `var.prop op literal` patterns. Returns `None` when the
1921/// expression references multiple variables or uses unsupported operators,
1922/// in which case the row-vals fallback path in the materializer handles it.
1923fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1924 match expr {
1925 Expr::BinOp { left, op, right } => {
1926 // Only handle `var.prop op literal` or `literal op var.prop`.
1927 let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1928 (left.as_ref(), right.as_ref(), false)
1929 } else if matches!(left.as_ref(), Expr::Literal(_)) {
1930 (right.as_ref(), left.as_ref(), true)
1931 } else {
1932 return None;
1933 };
1934
1935 let (v, key) = match prop_expr {
1936 Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1937 _ => return None,
1938 };
1939 if v != var_name {
1940 return None;
1941 }
1942 let col_id = col_id_of(key);
1943
1944 let rhs_raw = match lit_expr {
1945 Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1946 _ => return None,
1947 };
1948
1949 // Swap operators if literal is on the left.
1950 let effective_op = if swapped {
1951 match op {
1952 BinOpKind::Lt => BinOpKind::Gt,
1953 BinOpKind::Le => BinOpKind::Ge,
1954 BinOpKind::Gt => BinOpKind::Lt,
1955 BinOpKind::Ge => BinOpKind::Le,
1956 other => other.clone(),
1957 }
1958 } else {
1959 op.clone()
1960 };
1961
1962 match effective_op {
1963 BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1964 BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1965 BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1966 BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1967 BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1968 BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1969 _ => None,
1970 }
1971 }
1972 Expr::IsNull(inner) => {
1973 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1974 if var.as_str() == var_name {
1975 return Some(ChunkPredicate::IsNull {
1976 col_id: col_id_of(prop),
1977 });
1978 }
1979 }
1980 None
1981 }
1982 Expr::IsNotNull(inner) => {
1983 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1984 if var.as_str() == var_name {
1985 return Some(ChunkPredicate::IsNotNull {
1986 col_id: col_id_of(prop),
1987 });
1988 }
1989 }
1990 None
1991 }
1992 Expr::And(a, b) => {
1993 let ca = try_compile_predicate(a, var_name, _col_ids);
1994 let cb = try_compile_predicate(b, var_name, _col_ids);
1995 match (ca, cb) {
1996 (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1997 _ => None,
1998 }
1999 }
2000 _ => None,
2001 }
2002}
2003
2004/// Encode a literal as a raw `u64` for `ChunkPredicate` comparison.
2005///
2006/// Returns `None` for string/float literals that cannot be compared using
2007/// simple raw-u64 equality (those fall back to the row-vals path).
2008fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
2009 use sparrowdb_storage::node_store::Value as StoreValue;
2010 match lit {
2011 Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
2012 Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
2013 // Strings and floats: leave to the row-vals fallback.
2014 Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
2015 }
2016}
2017
2018/// Build a `Vec<(col_id, raw_value)>` from a chunk at a given physical row index.
2019///
2020/// Only returns columns that are NOT null (null-bitmap bit is clear) and whose
2021/// col_id is in `col_ids`.
2022fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
2023 col_ids
2024 .iter()
2025 .filter_map(|&cid| {
2026 let col = chunk.find_column(cid)?;
2027 if col.nulls.is_null(row_idx) {
2028 None
2029 } else {
2030 Some((cid, col.data[row_idx]))
2031 }
2032 })
2033 .collect()
2034}
2035
2036// ── DstSlotProjector ──────────────────────────────────────────────────────────
2037
2038/// Projects `COL_ID_DST_SLOT` from a `GetNeighbors` output chunk to `COL_ID_SLOT`.
2039///
2040/// `GetNeighbors` emits `(COL_ID_SRC_SLOT, COL_ID_DST_SLOT)` pairs.
2041/// `SlotIntersect` operates on `COL_ID_SLOT` columns. This thin adaptor
2042/// renames the `COL_ID_DST_SLOT` column to `COL_ID_SLOT` so that
2043/// `SlotIntersect` can be wired directly to `GetNeighbors` output.
2044struct DstSlotProjector<C: PipelineOperator> {
2045 child: C,
2046}
2047
2048impl<C: PipelineOperator> DstSlotProjector<C> {
2049 fn new(child: C) -> Self {
2050 DstSlotProjector { child }
2051 }
2052}
2053
2054impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
2055 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
2056 use crate::chunk::ColumnVector;
2057
2058 loop {
2059 let chunk = match self.child.next_chunk()? {
2060 Some(c) => c,
2061 None => return Ok(None),
2062 };
2063
2064 if chunk.is_empty() {
2065 continue;
2066 }
2067
2068 // Extract dst slots from live rows and build a new COL_ID_SLOT chunk.
2069 let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
2070 Some(c) => c,
2071 None => continue,
2072 };
2073
2074 let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
2075 if data.is_empty() {
2076 continue;
2077 }
2078 let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2079 return Ok(Some(DataChunk::from_columns(vec![col])));
2080 }
2081 }
2082}
2083
2084// ── MutualNeighbors helpers ───────────────────────────────────────────────────
2085
2086/// Return `true` if `expr` is `id(var_name)`.
2087fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2088 match expr {
2089 Expr::FnCall { name, args } => {
2090 name.eq_ignore_ascii_case("id")
2091 && args.len() == 1
2092 && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2093 }
2094 _ => false,
2095 }
2096}
2097
2098/// Return `true` if `expr` is a `$param` literal.
2099fn is_param_literal(expr: &Expr) -> bool {
2100 matches!(expr, Expr::Literal(Literal::Param(_)))
2101}
2102
2103/// Return `true` ONLY if `expr` is a pure conjunction of `id(var)=$param`
2104/// equalities for the two given variable names.
2105///
2106/// Any OR, property access, function call other than `id()`, or other expression
2107/// shape returns `false` — this is the strict purity check that prevents
2108/// `WHERE id(a)=$aid OR id(b)=$bid` from incorrectly passing the fast-path guard.
2109fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2110 match expr {
2111 Expr::And(left, right) => {
2112 where_is_only_id_param_conjuncts(left, a_var, b_var)
2113 && where_is_only_id_param_conjuncts(right, a_var, b_var)
2114 }
2115 Expr::BinOp {
2116 left,
2117 op: BinOpKind::Eq,
2118 right,
2119 } => {
2120 // Must be id(a_var)=$param, id(b_var)=$param, or either commuted.
2121 (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2122 || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2123 }
2124 _ => false,
2125 }
2126}
2127
2128/// Extract the slot number for `var_name` from `id(var_name) = $param` in WHERE.
2129///
2130/// Looks up the parameter value in `params`, then decodes the slot from the
2131/// NodeId encoding: `slot = node_id & 0xFFFF_FFFF`.
2132///
2133/// Returns `None` when the param is not found or the label doesn't match.
2134fn extract_id_param_slot(
2135 where_clause: Option<&Expr>,
2136 var_name: &str,
2137 params: &std::collections::HashMap<String, crate::types::Value>,
2138 expected_label_id: u32,
2139) -> Option<u64> {
2140 let wexpr = where_clause?;
2141 let param_name = find_id_param_name(wexpr, var_name)?;
2142 let val = params.get(¶m_name)?;
2143
2144 // The param value is expected to be a NodeId (Int64 or NodeRef).
2145 let raw_node_id: u64 = match val {
2146 crate::types::Value::Int64(n) => *n as u64,
2147 crate::types::Value::NodeRef(nid) => nid.0,
2148 _ => return None,
2149 };
2150
2151 let (label_id, slot) = super::node_id_parts(raw_node_id);
2152 if label_id != expected_label_id {
2153 return None;
2154 }
2155 Some(slot)
2156}
2157
2158/// Find the parameter name in `id(var_name) = $param` expressions.
2159fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2160 match expr {
2161 Expr::BinOp { left, op, right } => {
2162 if *op == BinOpKind::Eq {
2163 if is_id_call(left, var_name) {
2164 if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2165 return Some(p.clone());
2166 }
2167 }
2168 if is_id_call(right, var_name) {
2169 if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2170 return Some(p.clone());
2171 }
2172 }
2173 }
2174 find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2175 }
2176 Expr::And(a, b) => {
2177 find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2178 }
2179 _ => None,
2180 }
2181}
2182
2183/// Scan a label's slots to find the first node that matches all `props` filters.
2184///
2185/// Used by `execute_mutual_neighbors_chunked` when endpoints are bound via
2186/// inline props (`{uid: 0}`) rather than `WHERE id(a) = $param`.
2187///
2188/// # Performance
2189///
2190/// 1. Property index (O(1)) — checked first when an index exists for `(label_id, prop)`.
2191/// 2. Single-column bulk read — reads the column file **once**, scans in memory.
2192/// O(N) in memory instead of O(N) × `fs::read` calls (the per-slot path
2193/// re-reads the entire column file on every slot, causing 4000+ disk reads
2194/// for a typical social-graph dataset).
2195/// 3. Per-slot fallback — used only for complex/multi-prop filters.
2196fn find_slot_by_props(
2197 store: &NodeStore,
2198 label_id: u32,
2199 hwm: u64,
2200 props: &[sparrowdb_cypher::ast::PropEntry],
2201 params: &std::collections::HashMap<String, crate::types::Value>,
2202 prop_index: &PropertyIndex,
2203) -> Option<u64> {
2204 if props.is_empty() || hwm == 0 {
2205 return None;
2206 }
2207
2208 // Fast path: property index (O(1) when an index exists for this label+prop).
2209 if let Some(slots) = try_index_lookup_for_props(props, label_id, prop_index) {
2210 return slots.into_iter().next().map(|s| s as u64);
2211 }
2212
2213 // Single-prop bulk-read path: read the column file once, scan in memory.
2214 // This replaces O(N) per-slot `fs::read` calls (each re-reads the whole file)
2215 // with O(1) file reads + O(N) in-memory iteration.
2216 if props.len() == 1 {
2217 let filter = &props[0];
2218 let col_id = prop_name_to_col_id(&filter.key);
2219
2220 // Encode the filter value to its raw u64 storage representation.
2221 let target_raw_opt: Option<u64> = match &filter.value {
2222 Expr::Literal(Literal::Int(n)) => Some(StoreValue::Int64(*n).to_u64()),
2223 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
2224 Some(StoreValue::Bytes(s.as_bytes().to_vec()).to_u64())
2225 }
2226 // Params, floats, long strings: fall through to per-slot path.
2227 _ => None,
2228 };
2229
2230 if let Some(target_raw) = target_raw_opt {
2231 let col_data = match store.read_col_all(label_id, col_id) {
2232 Ok(d) => d,
2233 Err(_) => return None,
2234 };
2235 let null_bitmap = store.read_null_bitmap_all(label_id, col_id).ok().flatten();
2236
2237 for (slot, &raw) in col_data.iter().enumerate().take(hwm as usize) {
2238 // Check presence before equality: in pre-SPA-207 data, raw == 0
2239 // means absent (not the integer zero), so a search for uid:0
2240 // must not match an absent slot.
2241 let is_present = match &null_bitmap {
2242 // No bitmap (pre-SPA-207 data): use `raw != 0` sentinel.
2243 None => raw != 0,
2244 // Bitmap present: check the explicit null bit.
2245 Some(bits) => bits.get(slot).copied().unwrap_or(false),
2246 };
2247 if !is_present {
2248 continue;
2249 }
2250 if raw != target_raw {
2251 continue;
2252 }
2253 return Some(slot as u64);
2254 }
2255 return None;
2256 }
2257 }
2258
2259 // Fallback: per-slot read for complex/multi-prop filters.
2260 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
2261 for slot in 0..hwm {
2262 let node_id = NodeId(((label_id as u64) << 32) | slot);
2263 let Ok(raw_props) = store.get_node_raw_nullable(node_id, &col_ids) else {
2264 continue;
2265 };
2266 let stored: Vec<(u32, u64)> = raw_props
2267 .into_iter()
2268 .filter_map(|(c, opt)| opt.map(|v| (c, v)))
2269 .collect();
2270 if matches_prop_filter_static(&stored, props, params, store) {
2271 return Some(slot);
2272 }
2273 }
2274 None
2275}