1use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34 BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35 SlotIntersect,
36};
37
/// Physical plan shape selected by `Engine::try_plan_chunked_match` for a `MATCH`
/// statement that the chunked (vectorized) pipeline can execute.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkedPlan {
    /// Single labelled node, no relationships: a chunked label scan.
    Scan,
    /// One directed relationship between two labelled nodes.
    OneHop,
    /// Two outgoing relationships of the same type chained through a middle node.
    TwoHop,
    /// `(a)-[:R]->(x)<-[:R]-(b)`: common out-neighbours of two anchor nodes.
    MutualNeighbors,
}
60
61impl std::fmt::Display for ChunkedPlan {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 match self {
64 ChunkedPlan::Scan => write!(f, "Scan"),
65 ChunkedPlan::OneHop => write!(f, "OneHop"),
66 ChunkedPlan::TwoHop => write!(f, "TwoHop"),
67 ChunkedPlan::MutualNeighbors => write!(f, "MutualNeighbors"),
68 }
69 }
70}
71
72impl Engine {
73 pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82 if !self.use_chunked_pipeline {
83 return false;
84 }
85 if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86 return false;
87 }
88 if has_aggregate_in_return(&m.return_clause.items) {
89 return false;
90 }
91 if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92 return false;
93 }
94 if !m.pattern[0].nodes[0].props.is_empty() {
98 return false;
99 }
100 !m.pattern[0].nodes[0].labels.is_empty()
101 }
102
103 pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
116 use sparrowdb_cypher::ast::EdgeDir;
117
118 if !self.use_chunked_pipeline {
119 return false;
120 }
121 if m.pattern.len() != 1 {
123 return false;
124 }
125 let pat = &m.pattern[0];
126 if pat.rels.len() != 1 || pat.nodes.len() != 2 {
127 return false;
128 }
129 if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
131 return false;
132 }
133 let dir = &pat.rels[0].dir;
135 if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
136 return false;
137 }
138 if has_aggregate_in_return(&m.return_clause.items) {
140 return false;
141 }
142 if m.distinct {
144 return false;
145 }
146 if !m.order_by.is_empty() {
148 return false;
149 }
150 if pat.rels[0].min_hops.is_some() {
152 return false;
153 }
154 let rel_var = &pat.rels[0].var;
159 if !rel_var.is_empty() {
160 let ref_in_return = m.return_clause.items.iter().any(|item| {
161 column_name_for_item(item)
162 .split_once('.')
163 .is_some_and(|(v, _)| v == rel_var.as_str())
164 });
165 if ref_in_return {
166 return false;
167 }
168 if let Some(ref wexpr) = m.where_clause {
170 if expr_references_var(wexpr, rel_var.as_str()) {
171 return false;
172 }
173 }
174 }
175 if let Some(ref wexpr) = m.where_clause {
177 if !is_simple_where_for_chunked(wexpr) {
178 return false;
179 }
180 }
181 if pat.nodes.iter().any(|n| !n.props.is_empty()) {
184 return false;
185 }
186 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
188 let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
189 let rel_type = pat.rels[0].rel_type.clone();
190 let n_tables = self
191 .snapshot
192 .catalog
193 .list_rel_tables_with_ids()
194 .into_iter()
195 .filter(|(_, sid, did, rt)| {
196 let type_ok = rel_type.is_empty() || rt == &rel_type;
197 let src_ok = self
198 .snapshot
199 .catalog
200 .get_label(&src_label)
201 .ok()
202 .flatten()
203 .map(|id| id as u32 == *sid as u32)
204 .unwrap_or(false);
205 let dst_ok = self
206 .snapshot
207 .catalog
208 .get_label(&dst_label)
209 .ok()
210 .flatten()
211 .map(|id| id as u32 == *did as u32)
212 .unwrap_or(false);
213 type_ok && src_ok && dst_ok
214 })
215 .count();
216 n_tables == 1
217 }
218
219 pub(crate) fn execute_one_hop_chunked(
235 &self,
236 m: &MatchStatement,
237 column_names: &[String],
238 ) -> Result<QueryResult> {
239 use sparrowdb_cypher::ast::EdgeDir;
240
241 let pat = &m.pattern[0];
242 let rel_pat = &pat.rels[0];
243 let dir = &rel_pat.dir;
244
245 let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
248 (&pat.nodes[1], &pat.nodes[0], true)
249 } else {
250 (&pat.nodes[0], &pat.nodes[1], false)
251 };
252
253 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
254 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
255 let rel_type = rel_pat.rel_type.clone();
256
257 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
259 Some(id) => id as u32,
260 None => {
261 return Ok(QueryResult {
262 columns: column_names.to_vec(),
263 rows: vec![],
264 });
265 }
266 };
267 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
268 Some(id) => id as u32,
269 None => {
270 return Ok(QueryResult {
271 columns: column_names.to_vec(),
272 rows: vec![],
273 });
274 }
275 };
276
277 let (catalog_rel_id, _) = self
279 .snapshot
280 .catalog
281 .list_rel_tables_with_ids()
282 .into_iter()
283 .find(|(_, sid, did, rt)| {
284 let type_ok = rel_type.is_empty() || rt == &rel_type;
285 let src_ok = *sid as u32 == src_label_id;
286 let dst_ok = *did as u32 == dst_label_id;
287 type_ok && src_ok && dst_ok
288 })
289 .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
290 .ok_or_else(|| {
291 sparrowdb_common::Error::InvalidArgument(
292 "no matching relationship table found".into(),
293 )
294 })?;
295
296 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
297 tracing::debug!(
298 engine = "chunked",
299 src_label = %src_label,
300 dst_label = %dst_label,
301 rel_type = %rel_type,
302 hwm_src,
303 "executing via chunked pipeline (1-hop)"
304 );
305
306 let src_var = src_node_pat.var.as_str();
309 let dst_var = dst_node_pat.var.as_str();
310
311 let (query_src_var, query_dst_var) = if swapped {
314 (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
315 } else {
316 (src_var, dst_var)
317 };
318
319 let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
320 let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
321
322 if let Some(ref wexpr) = m.where_clause {
324 collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
325 collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
326 }
327 for p in &src_node_pat.props {
329 let cid = col_id_of(&p.key);
330 if !col_ids_src.contains(&cid) {
331 col_ids_src.push(cid);
332 }
333 }
334 for p in &dst_node_pat.props {
335 let cid = col_id_of(&p.key);
336 if !col_ids_dst.contains(&cid) {
337 col_ids_dst.push(cid);
338 }
339 }
340
341 let delta_records = {
343 let edge_store = EdgeStore::open(
344 &self.snapshot.db_root,
345 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
346 );
347 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
348 };
349
350 let csr = self
352 .snapshot
353 .csrs
354 .get(&catalog_rel_id)
355 .cloned()
356 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
357
358 let avg_degree_hint = self
360 .snapshot
361 .rel_degree_stats()
362 .get(&catalog_rel_id)
363 .map(|s| s.mean().ceil() as usize)
364 .unwrap_or(8);
365
366 let src_pred_opt = m
368 .where_clause
369 .as_ref()
370 .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
371 let dst_pred_opt = m
372 .where_clause
373 .as_ref()
374 .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
375
376 let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
377
378 let limit = m.limit.map(|l| l as usize);
384 let mut rows: Vec<Vec<Value>> = Vec::new();
385
386 let mut scan = ScanByLabel::new(hwm_src);
392
393 'outer: while let Some(scan_chunk) = scan.next_chunk()? {
394 let src_chunk = if !col_ids_src.is_empty() {
398 let mut rnp = ReadNodeProps::new(
399 SingleChunkSource::new(scan_chunk),
400 Arc::clone(&store_arc),
401 src_label_id,
402 crate::chunk::COL_ID_SLOT,
403 col_ids_src.clone(),
404 );
405 match rnp.next_chunk()? {
406 Some(c) => c,
407 None => continue,
408 }
409 } else {
410 scan_chunk
411 };
412
413 let src_chunk = if let Some(ref pred) = src_pred_opt {
415 let pred = pred.clone();
416 let keep: Vec<bool> = {
417 (0..src_chunk.len())
418 .map(|i| pred.eval(&src_chunk, i))
419 .collect()
420 };
421 let mut c = src_chunk;
422 c.filter_sel(|i| keep[i]);
423 if c.live_len() == 0 {
424 continue;
425 }
426 c
427 } else {
428 src_chunk
429 };
430
431 let mut gn = GetNeighbors::new(
433 SingleChunkSource::new(src_chunk.clone()),
434 csr.clone(),
435 &delta_records,
436 src_label_id,
437 avg_degree_hint,
438 );
439
440 while let Some(hop_chunk) = gn.next_chunk()? {
441 let dst_chunk = if !col_ids_dst.is_empty() {
445 let mut rnp = ReadNodeProps::new(
446 SingleChunkSource::new(hop_chunk),
447 Arc::clone(&store_arc),
448 dst_label_id,
449 COL_ID_DST_SLOT,
450 col_ids_dst.clone(),
451 );
452 match rnp.next_chunk()? {
453 Some(c) => c,
454 None => continue,
455 }
456 } else {
457 hop_chunk
458 };
459
460 let dst_chunk = if let Some(ref pred) = dst_pred_opt {
462 let pred = pred.clone();
463 let keep: Vec<bool> = (0..dst_chunk.len())
464 .map(|i| pred.eval(&dst_chunk, i))
465 .collect();
466 let mut c = dst_chunk;
467 c.filter_sel(|i| keep[i]);
468 if c.live_len() == 0 {
469 continue;
470 }
471 c
472 } else {
473 dst_chunk
474 };
475
476 let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
478 let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
479 let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
480
481 for row_idx in dst_chunk.live_rows() {
482 let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
483 let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
484
485 let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
487 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
488 if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
489 continue;
490 }
491
492 let src_props = if let Some(sc) = src_slot_col {
497 let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
499 if let Some(src_ri) = src_row {
500 build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
501 } else {
502 let nullable = self
504 .snapshot
505 .store
506 .get_node_raw_nullable(src_node, &col_ids_src)?;
507 nullable
508 .into_iter()
509 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
510 .collect()
511 }
512 } else {
513 vec![]
514 };
515
516 let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
517
518 if let Some(ref where_expr) = m.where_clause {
521 let (actual_src_var, actual_dst_var) = if swapped {
523 (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
524 } else {
525 (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
526 };
527 let (actual_src_props, actual_dst_props) = if swapped {
528 (&dst_props, &src_props)
529 } else {
530 (&src_props, &dst_props)
531 };
532 let mut row_vals = build_row_vals(
533 actual_src_props,
534 actual_src_var,
535 &col_ids_src,
536 &self.snapshot.store,
537 );
538 row_vals.extend(build_row_vals(
539 actual_dst_props,
540 actual_dst_var,
541 &col_ids_dst,
542 &self.snapshot.store,
543 ));
544 row_vals.extend(self.dollar_params());
545 if !self.eval_where_graph(where_expr, &row_vals) {
546 continue;
547 }
548 }
549
550 let (proj_src_props, proj_dst_props) = if swapped {
552 (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
553 } else {
554 (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
555 };
556 let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
557 (
558 dst_node_pat.var.as_str(),
559 src_node_pat.var.as_str(),
560 dst_label.as_str(),
561 src_label.as_str(),
562 )
563 } else {
564 (
565 src_node_pat.var.as_str(),
566 dst_node_pat.var.as_str(),
567 src_label.as_str(),
568 dst_label.as_str(),
569 )
570 };
571
572 let row = project_hop_row(
573 proj_src_props,
574 proj_dst_props,
575 column_names,
576 proj_src_var,
577 proj_dst_var,
578 None, Some((proj_src_var, proj_src_label)),
580 Some((proj_dst_var, proj_dst_label)),
581 &self.snapshot.store,
582 None, );
584 rows.push(row);
585
586 if let Some(lim) = limit {
588 if rows.len() >= lim {
589 break 'outer;
590 }
591 }
592 }
593 }
594 }
595
596 Ok(QueryResult {
597 columns: column_names.to_vec(),
598 rows,
599 })
600 }
601
602 pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
613 if self.can_use_mutual_neighbors_chunked(m) {
615 return Some(ChunkedPlan::MutualNeighbors);
616 }
617 if self.can_use_two_hop_chunked(m) {
618 return Some(ChunkedPlan::TwoHop);
619 }
620 if self.can_use_one_hop_chunked(m) {
621 return Some(ChunkedPlan::OneHop);
622 }
623 if self.can_use_chunked_pipeline(m) {
624 return Some(ChunkedPlan::Scan);
625 }
626 None
627 }
628
629 pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
652 use sparrowdb_cypher::ast::EdgeDir;
653
654 if !self.use_chunked_pipeline {
655 return false;
656 }
657 if m.pattern.len() != 1 {
659 return false;
660 }
661 let pat = &m.pattern[0];
662 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
663 return false;
664 }
665 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
667 return false;
668 }
669 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
671 return false;
672 }
673 if pat.rels[0].rel_type != pat.rels[1].rel_type {
675 return false;
676 }
677 if pat.nodes[0].labels.len() != 1
679 || pat.nodes[1].labels.len() != 1
680 || pat.nodes[2].labels.len() != 1
681 {
682 return false;
683 }
684 if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
685 || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
686 {
687 return false;
688 }
689 if has_aggregate_in_return(&m.return_clause.items) {
691 return false;
692 }
693 if m.distinct {
695 return false;
696 }
697 if !m.order_by.is_empty() {
699 return false;
700 }
701 for rel in &pat.rels {
703 if !rel.var.is_empty() {
704 let ref_in_return = m.return_clause.items.iter().any(|item| {
705 column_name_for_item(item)
706 .split_once('.')
707 .is_some_and(|(v, _)| v == rel.var.as_str())
708 });
709 if ref_in_return {
710 return false;
711 }
712 if let Some(ref wexpr) = m.where_clause {
713 if expr_references_var(wexpr, rel.var.as_str()) {
714 return false;
715 }
716 }
717 }
718 }
719 let a_var = pat.nodes[0].var.as_str();
724 let b_var = pat.nodes[2].var.as_str();
725 match m.where_clause.as_ref() {
726 None => {
727 let a_bound = pat.nodes[0].props.len() == 1;
730 let b_bound = pat.nodes[2].props.len() == 1;
731 if !a_bound || !b_bound {
732 return false;
733 }
734 }
735 Some(wexpr) => {
736 if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
737 return false;
738 }
739 }
740 }
741 let label = pat.nodes[0].labels[0].clone();
743 let rel_type = &pat.rels[0].rel_type;
744 let catalog = &self.snapshot.catalog;
745 let tables = catalog.list_rel_tables_with_ids();
746 let label_id_opt = catalog.get_label(&label).ok().flatten();
747 let label_id = match label_id_opt {
748 Some(id) => id as u32,
749 None => return false,
750 };
751 let has_table = tables.iter().any(|(_, sid, did, rt)| {
752 let type_ok = rel_type.is_empty() || rt == rel_type;
753 let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
754 type_ok && endpoint_ok
755 });
756 has_table
757 }
758
759 pub(crate) fn execute_mutual_neighbors_chunked(
771 &self,
772 m: &MatchStatement,
773 column_names: &[String],
774 ) -> Result<QueryResult> {
775 let pat = &m.pattern[0];
776 let a_node_pat = &pat.nodes[0];
777 let x_node_pat = &pat.nodes[1];
778 let b_node_pat = &pat.nodes[2];
779
780 let label = a_node_pat.labels[0].clone();
781 let rel_type = pat.rels[0].rel_type.clone();
782
783 let label_id = match self.snapshot.catalog.get_label(&label)? {
784 Some(id) => id as u32,
785 None => {
786 return Ok(QueryResult {
787 columns: column_names.to_vec(),
788 rows: vec![],
789 });
790 }
791 };
792
793 let catalog_rel_id = self
795 .snapshot
796 .catalog
797 .list_rel_tables_with_ids()
798 .into_iter()
799 .find(|(_, sid, did, rt)| {
800 let type_ok = rel_type.is_empty() || rt == &rel_type;
801 let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
802 type_ok && endpoint_ok
803 })
804 .map(|(cid, _, _, _)| cid as u32)
805 .ok_or_else(|| {
806 sparrowdb_common::Error::InvalidArgument(
807 "no matching relationship table for mutual-neighbors".into(),
808 )
809 })?;
810
811 let a_var = a_node_pat.var.as_str();
816 let b_var = b_node_pat.var.as_str();
817 let (a_slot_opt, b_slot_opt) = if m.where_clause.is_some() {
818 (
820 extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id),
821 extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id),
822 )
823 } else {
824 let hwm = self.snapshot.store.hwm_for_label(label_id).unwrap_or(0);
826 let dollar_params = self.dollar_params();
827 let prop_idx = self.prop_index.borrow();
828 (
829 find_slot_by_props(
830 &self.snapshot.store,
831 label_id,
832 hwm,
833 &a_node_pat.props,
834 &dollar_params,
835 &prop_idx,
836 ),
837 find_slot_by_props(
838 &self.snapshot.store,
839 label_id,
840 hwm,
841 &b_node_pat.props,
842 &dollar_params,
843 &prop_idx,
844 ),
845 )
846 };
847
848 let (a_slot, b_slot) = match (a_slot_opt, b_slot_opt) {
849 (Some(a), Some(b)) => (a, b),
850 _ => {
851 return Ok(QueryResult {
853 columns: column_names.to_vec(),
854 rows: vec![],
855 });
856 }
857 };
858
859 if a_slot == b_slot {
863 return Ok(QueryResult {
864 columns: column_names.to_vec(),
865 rows: vec![],
866 });
867 }
868
869 tracing::debug!(
870 engine = "chunked",
871 plan = %ChunkedPlan::MutualNeighbors,
872 label = %label,
873 rel_type = %rel_type,
874 a_slot,
875 b_slot,
876 "executing via chunked pipeline"
877 );
878
879 let csr = self
880 .snapshot
881 .csrs
882 .get(&catalog_rel_id)
883 .cloned()
884 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
885
886 let delta_records = {
887 let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
888 &self.snapshot.db_root,
889 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
890 );
891 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
892 };
893
894 let a_scan = ScanByLabel::from_slots(vec![a_slot]);
896 let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
897
898 let b_scan = ScanByLabel::from_slots(vec![b_slot]);
899 let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
900
901 let a_proj = DstSlotProjector::new(a_neighbors);
904 let b_proj = DstSlotProjector::new(b_neighbors);
905
906 let spill_threshold = 64 * 1024; let mut intersect =
909 SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
910
911 let mut common_slots: Vec<u64> = Vec::new();
913 while let Some(chunk) = intersect.next_chunk()? {
914 if let Some(col) = chunk.find_column(COL_ID_SLOT) {
915 for row_idx in chunk.live_rows() {
916 common_slots.push(col.data[row_idx]);
917 }
918 }
919 }
920
921 let x_var = x_node_pat.var.as_str();
923 let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
924 if let Some(ref wexpr) = m.where_clause {
925 collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
926 }
927 for p in &x_node_pat.props {
928 let cid = col_id_of(&p.key);
929 if !col_ids_x.contains(&cid) {
930 col_ids_x.push(cid);
931 }
932 }
933
934 let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
935 self.snapshot.store.root_path(),
936 )?);
937
938 let limit = m.limit.map(|l| l as usize);
939 let mut rows: Vec<Vec<Value>> = Vec::new();
940
941 'outer: for x_slot in common_slots {
942 let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
943
944 if self.is_node_tombstoned(x_node_id) {
946 continue;
947 }
948
949 let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
951 let nullable = store_arc.batch_read_node_props_nullable(
952 label_id,
953 &[x_slot as u32],
954 &col_ids_x,
955 )?;
956 if nullable.is_empty() {
957 vec![]
958 } else {
959 col_ids_x
960 .iter()
961 .enumerate()
962 .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
963 .collect()
964 }
965 } else {
966 vec![]
967 };
968
969 if let Some(ref where_expr) = m.where_clause {
971 let mut row_vals =
972 build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
973 if !a_var.is_empty() {
975 let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
976 row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
977 }
978 if !b_var.is_empty() {
979 let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
980 row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
981 }
982 row_vals.extend(self.dollar_params());
983 if !self.eval_where_graph(where_expr, &row_vals) {
984 continue;
985 }
986 }
987
988 let row = project_row(
990 &x_props,
991 column_names,
992 &col_ids_x,
993 x_var,
994 &label,
995 &self.snapshot.store,
996 Some(x_node_id),
997 );
998 rows.push(row);
999
1000 if let Some(lim) = limit {
1001 if rows.len() >= lim {
1002 break 'outer;
1003 }
1004 }
1005 }
1006
1007 Ok(QueryResult {
1008 columns: column_names.to_vec(),
1009 rows,
1010 })
1011 }
1012
1013 pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
1025 use sparrowdb_cypher::ast::EdgeDir;
1026
1027 if !self.use_chunked_pipeline {
1028 return false;
1029 }
1030 if m.pattern.len() != 1 {
1032 return false;
1033 }
1034 let pat = &m.pattern[0];
1035 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
1036 return false;
1037 }
1038 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
1040 return false;
1041 }
1042 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
1044 return false;
1045 }
1046 if has_aggregate_in_return(&m.return_clause.items) {
1048 return false;
1049 }
1050 if m.distinct {
1052 return false;
1053 }
1054 if !m.order_by.is_empty() {
1056 return false;
1057 }
1058 for rel in &pat.rels {
1060 if !rel.var.is_empty() {
1061 let ref_in_return = m.return_clause.items.iter().any(|item| {
1062 column_name_for_item(item)
1063 .split_once('.')
1064 .is_some_and(|(v, _)| v == rel.var.as_str())
1065 });
1066 if ref_in_return {
1067 return false;
1068 }
1069 if let Some(ref wexpr) = m.where_clause {
1070 if expr_references_var(wexpr, rel.var.as_str()) {
1071 return false;
1072 }
1073 }
1074 }
1075 }
1076 if let Some(ref wexpr) = m.where_clause {
1078 if !is_simple_where_for_chunked(wexpr) {
1079 return false;
1080 }
1081 }
1082 if pat.nodes.iter().any(|n| !n.props.is_empty()) {
1085 return false;
1086 }
1087 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1089 let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1090 let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1091 let rel_type1 = &pat.rels[0].rel_type;
1092 let rel_type2 = &pat.rels[1].rel_type;
1093
1094 if rel_type1 != rel_type2 {
1098 return false;
1099 }
1100
1101 let catalog = &self.snapshot.catalog;
1103 let tables = catalog.list_rel_tables_with_ids();
1104
1105 let hop1_matches: Vec<_> = tables
1106 .iter()
1107 .filter(|(_, sid, did, rt)| {
1108 let type_ok = rel_type1.is_empty() || rt == rel_type1;
1109 let src_ok = catalog
1110 .get_label(&src_label)
1111 .ok()
1112 .flatten()
1113 .map(|id| id as u32 == *sid as u32)
1114 .unwrap_or(false);
1115 let mid_ok = catalog
1116 .get_label(&mid_label)
1117 .ok()
1118 .flatten()
1119 .map(|id| id as u32 == *did as u32)
1120 .unwrap_or(false);
1121 type_ok && src_ok && mid_ok
1122 })
1123 .collect();
1124
1125 let n_tables = hop1_matches.len();
1127 if n_tables != 1 {
1128 return false;
1129 }
1130
1131 let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1132 let type_ok = rel_type2.is_empty() || rt == rel_type2;
1133 let mid_ok = catalog
1134 .get_label(&mid_label)
1135 .ok()
1136 .flatten()
1137 .map(|id| id as u32 == *sid as u32)
1138 .unwrap_or(false);
1139 let dst_ok = catalog
1140 .get_label(&dst_label)
1141 .ok()
1142 .flatten()
1143 .map(|id| id as u32 == *did as u32)
1144 .unwrap_or(false);
1145 type_ok && mid_ok && dst_ok
1146 });
1147
1148 match (hop1_matches.first(), hop2_id) {
1150 (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1151 _ => false,
1152 }
1153 }
1154
1155 pub(crate) fn execute_two_hop_chunked(
1177 &self,
1178 m: &MatchStatement,
1179 column_names: &[String],
1180 ) -> Result<QueryResult> {
1181 use sparrowdb_common::Error as DbError;
1182
1183 let pat = &m.pattern[0];
1184 let src_node_pat = &pat.nodes[0];
1185 let mid_node_pat = &pat.nodes[1];
1186 let dst_node_pat = &pat.nodes[2];
1187
1188 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1189 let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1190 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1191 let rel_type = pat.rels[0].rel_type.clone();
1192
1193 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1195 Some(id) => id as u32,
1196 None => {
1197 return Ok(QueryResult {
1198 columns: column_names.to_vec(),
1199 rows: vec![],
1200 });
1201 }
1202 };
1203 let mid_label_id = if mid_label.is_empty() {
1204 src_label_id
1205 } else {
1206 match self.snapshot.catalog.get_label(&mid_label)? {
1207 Some(id) => id as u32,
1208 None => {
1209 return Ok(QueryResult {
1210 columns: column_names.to_vec(),
1211 rows: vec![],
1212 });
1213 }
1214 }
1215 };
1216 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1217 Some(id) => id as u32,
1218 None => {
1219 return Ok(QueryResult {
1220 columns: column_names.to_vec(),
1221 rows: vec![],
1222 });
1223 }
1224 };
1225
1226 let catalog_rel_id = self
1228 .snapshot
1229 .catalog
1230 .list_rel_tables_with_ids()
1231 .into_iter()
1232 .find(|(_, sid, did, rt)| {
1233 let type_ok = rel_type.is_empty() || rt == &rel_type;
1234 let src_ok = *sid as u32 == src_label_id;
1235 let mid_ok = *did as u32 == mid_label_id;
1236 type_ok && src_ok && mid_ok
1237 })
1238 .map(|(cid, _, _, _)| cid as u32)
1239 .ok_or_else(|| {
1240 sparrowdb_common::Error::InvalidArgument(
1241 "no matching relationship table found for 2-hop".into(),
1242 )
1243 })?;
1244
1245 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1246 let hwm_dst = self.snapshot.store.hwm_for_label(dst_label_id).unwrap_or(0);
1247 tracing::debug!(
1248 engine = "chunked",
1249 src_label = %src_label,
1250 mid_label = %mid_label,
1251 dst_label = %dst_label,
1252 rel_type = %rel_type,
1253 hwm_src,
1254 hwm_dst,
1255 "executing via chunked pipeline (2-hop)"
1256 );
1257
1258 let src_var = src_node_pat.var.as_str();
1260 let mid_var = mid_node_pat.var.as_str();
1261 let dst_var = dst_node_pat.var.as_str();
1262
1263 let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1266 let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1267
1268 let mut col_ids_mid: Vec<u32> = vec![];
1270
1271 if let Some(ref wexpr) = m.where_clause {
1272 collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1273 collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1274 collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1275 }
1276 for p in &src_node_pat.props {
1278 let cid = sparrowdb_common::col_id_of(&p.key);
1279 if !col_ids_src.contains(&cid) {
1280 col_ids_src.push(cid);
1281 }
1282 }
1283 for p in &mid_node_pat.props {
1284 let cid = sparrowdb_common::col_id_of(&p.key);
1285 if !col_ids_mid.contains(&cid) {
1286 col_ids_mid.push(cid);
1287 }
1288 }
1289 for p in &dst_node_pat.props {
1290 let cid = sparrowdb_common::col_id_of(&p.key);
1291 if !col_ids_dst.contains(&cid) {
1292 col_ids_dst.push(cid);
1293 }
1294 }
1295 if !mid_var.is_empty() {
1297 let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1298 for cid in mid_return_ids {
1299 if !col_ids_mid.contains(&cid) {
1300 col_ids_mid.push(cid);
1301 }
1302 }
1303 }
1304
1305 let delta_records = {
1307 let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1308 &self.snapshot.db_root,
1309 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1310 );
1311 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1312 };
1313
1314 let csr = self
1316 .snapshot
1317 .csrs
1318 .get(&catalog_rel_id)
1319 .cloned()
1320 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1321
1322 let avg_degree_hint = self
1323 .snapshot
1324 .rel_degree_stats()
1325 .get(&catalog_rel_id)
1326 .map(|s| s.mean().ceil() as usize)
1327 .unwrap_or(8);
1328
1329 let src_pred_opt = m
1331 .where_clause
1332 .as_ref()
1333 .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1334 let mid_pred_opt = m
1335 .where_clause
1336 .as_ref()
1337 .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1338 let dst_pred_opt = m
1339 .where_clause
1340 .as_ref()
1341 .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1342
1343 let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1344 self.snapshot.store.root_path(),
1345 )?);
1346
1347 let limit = m.limit.map(|l| l as usize);
1348 let memory_limit = self.memory_limit_bytes;
1349 let mut rows: Vec<Vec<Value>> = Vec::new();
1350
1351 let node_capacity = (hwm_src.max(hwm_dst) as usize).max(64);
1359 let mut frontier = BfsArena::new(
1360 avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2),
1361 node_capacity,
1362 );
1363
1364 let row_size_estimate = column_names.len().max(1) * 16;
1368
1369 let mut scan = ScanByLabel::new(hwm_src);
1370
1371 'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1372 let src_chunk = if !col_ids_src.is_empty() {
1374 let mut rnp = ReadNodeProps::new(
1375 SingleChunkSource::new(scan_chunk),
1376 Arc::clone(&store_arc),
1377 src_label_id,
1378 crate::chunk::COL_ID_SLOT,
1379 col_ids_src.clone(),
1380 );
1381 match rnp.next_chunk()? {
1382 Some(c) => c,
1383 None => continue,
1384 }
1385 } else {
1386 scan_chunk
1387 };
1388
1389 let src_chunk = if let Some(ref pred) = src_pred_opt {
1391 let pred = pred.clone();
1392 let keep: Vec<bool> = (0..src_chunk.len())
1393 .map(|i| pred.eval(&src_chunk, i))
1394 .collect();
1395 let mut c = src_chunk;
1396 c.filter_sel(|i| keep[i]);
1397 if c.live_len() == 0 {
1398 continue;
1399 }
1400 c
1401 } else {
1402 src_chunk
1403 };
1404
1405 let mut gn1 = GetNeighbors::new(
1407 SingleChunkSource::new(src_chunk.clone()),
1408 csr.clone(),
1409 &delta_records,
1410 src_label_id,
1411 avg_degree_hint,
1412 );
1413
1414 while let Some(hop1_chunk) = gn1.next_chunk()? {
1416 frontier.clear();
1421
1422 let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1426 if accum_bytes > memory_limit {
1427 return Err(DbError::QueryMemoryExceeded);
1428 }
1429
1430 let mid_chunk = if !col_ids_mid.is_empty() {
1432 let mut rnp = ReadNodeProps::new(
1433 SingleChunkSource::new(hop1_chunk),
1434 Arc::clone(&store_arc),
1435 mid_label_id,
1436 COL_ID_DST_SLOT,
1437 col_ids_mid.clone(),
1438 );
1439 match rnp.next_chunk()? {
1440 Some(c) => c,
1441 None => continue,
1442 }
1443 } else {
1444 hop1_chunk
1445 };
1446
1447 let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1449 let pred = pred.clone();
1450 let keep: Vec<bool> = (0..mid_chunk.len())
1451 .map(|i| pred.eval(&mid_chunk, i))
1452 .collect();
1453 let mut c = mid_chunk;
1454 c.filter_sel(|i| keep[i]);
1455 if c.live_len() == 0 {
1456 continue;
1457 }
1458 c
1459 } else {
1460 mid_chunk
1461 };
1462
1463 let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1465 let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1466
1467 let live_pairs: Vec<(u64, u64)> = mid_chunk
1469 .live_rows()
1470 .map(|row_idx| {
1471 let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1472 let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1473 (src_slot, mid_slot)
1474 })
1475 .collect();
1476
1477 for &(_, mid_slot) in &live_pairs {
1488 if frontier.visit(mid_slot) {
1489 frontier.current_mut().push(mid_slot);
1490 }
1491 }
1492
1493 let mid_slots_chunk = {
1496 let data: Vec<u64> = frontier.current().to_vec();
1497 let col =
1498 crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1499 DataChunk::from_columns(vec![col])
1500 };
1501
1502 let mut gn2 = GetNeighbors::new(
1503 SingleChunkSource::new(mid_slots_chunk),
1504 csr.clone(),
1505 &delta_records,
1506 mid_label_id,
1507 avg_degree_hint,
1508 );
1509
1510 while let Some(hop2_chunk) = gn2.next_chunk()? {
1511 let dst_chunk = if !col_ids_dst.is_empty() {
1515 let mut rnp = ReadNodeProps::new(
1516 SingleChunkSource::new(hop2_chunk),
1517 Arc::clone(&store_arc),
1518 dst_label_id,
1519 COL_ID_DST_SLOT,
1520 col_ids_dst.clone(),
1521 );
1522 match rnp.next_chunk()? {
1523 Some(c) => c,
1524 None => continue,
1525 }
1526 } else {
1527 hop2_chunk
1528 };
1529
1530 let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1532 let pred = pred.clone();
1533 let keep: Vec<bool> = (0..dst_chunk.len())
1534 .map(|i| pred.eval(&dst_chunk, i))
1535 .collect();
1536 let mut c = dst_chunk;
1537 c.filter_sel(|i| keep[i]);
1538 if c.live_len() == 0 {
1539 continue;
1540 }
1541 c
1542 } else {
1543 dst_chunk
1544 };
1545
1546 let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1552
1553 let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1554
1555 let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1558 .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1559 .unwrap_or_default();
1560
1561 let mid_index: std::collections::HashMap<u64, usize> = {
1562 let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1563 mid_slot_col_in_mid
1564 .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1565 .unwrap_or_default()
1566 };
1567
1568 for row_idx in dst_chunk.live_rows() {
1569 let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1570 let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1571
1572 for &(src_slot, mid_slot) in &live_pairs {
1574 if mid_slot != via_mid_slot {
1575 continue;
1576 }
1577
1578 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1581 let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1582 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1583
1584 if self.is_node_tombstoned(src_node)
1586 || self.is_node_tombstoned(mid_node)
1587 || self.is_node_tombstoned(dst_node)
1588 {
1589 continue;
1590 }
1591
1592 let src_props = if src_slot_col_in_scan.is_some() {
1594 if let Some(&src_ri) = src_index.get(&src_slot) {
1595 build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1596 } else {
1597 let nullable = self
1598 .snapshot
1599 .store
1600 .get_node_raw_nullable(src_node, &col_ids_src)?;
1601 nullable
1602 .into_iter()
1603 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1604 .collect()
1605 }
1606 } else {
1607 vec![]
1608 };
1609
1610 let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1612 if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1613 build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1614 } else {
1615 let nullable = self
1616 .snapshot
1617 .store
1618 .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1619 nullable
1620 .into_iter()
1621 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1622 .collect()
1623 }
1624 } else {
1625 vec![]
1626 };
1627
1628 let dst_props =
1630 build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1631
1632 if let Some(ref where_expr) = m.where_clause {
1634 let mut row_vals = build_row_vals(
1635 &src_props,
1636 src_var,
1637 &col_ids_src,
1638 &self.snapshot.store,
1639 );
1640 row_vals.extend(build_row_vals(
1641 &mid_props,
1642 mid_var,
1643 &col_ids_mid,
1644 &self.snapshot.store,
1645 ));
1646 row_vals.extend(build_row_vals(
1647 &dst_props,
1648 dst_var,
1649 &col_ids_dst,
1650 &self.snapshot.store,
1651 ));
1652 row_vals.extend(self.dollar_params());
1653 if !self.eval_where_graph(where_expr, &row_vals) {
1654 continue;
1655 }
1656 }
1657
1658 let row = project_three_var_row(
1660 &src_props,
1661 &mid_props,
1662 &dst_props,
1663 column_names,
1664 src_var,
1665 mid_var,
1666 &self.snapshot.store,
1667 );
1668 rows.push(row);
1669
1670 if rows.len() * row_size_estimate > memory_limit {
1672 return Err(DbError::QueryMemoryExceeded);
1673 }
1674
1675 if let Some(lim) = limit {
1677 if rows.len() >= lim {
1678 break 'outer;
1679 }
1680 }
1681 }
1682 }
1683 }
1684 }
1685 }
1686
1687 Ok(QueryResult {
1688 columns: column_names.to_vec(),
1689 rows,
1690 })
1691 }
1692
1693 pub(crate) fn execute_scan_chunked(
1703 &self,
1704 m: &MatchStatement,
1705 column_names: &[String],
1706 ) -> Result<QueryResult> {
1707 use crate::pipeline::PipelineOperator;
1708
1709 let pat = &m.pattern[0];
1710 let node = &pat.nodes[0];
1711 let label = node.labels.first().cloned().unwrap_or_default();
1712
1713 let label_id = match self.snapshot.catalog.get_label(&label)? {
1715 Some(id) => id as u32,
1716 None => {
1717 return Ok(QueryResult {
1718 columns: column_names.to_vec(),
1719 rows: vec![],
1720 });
1721 }
1722 };
1723
1724 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1725 tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1726
1727 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1729 if let Some(ref wexpr) = m.where_clause {
1730 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1731 }
1732 for p in &node.props {
1733 let cid = col_id_of(&p.key);
1734 if !all_col_ids.contains(&cid) {
1735 all_col_ids.push(cid);
1736 }
1737 }
1738
1739 let var_name = node.var.as_str();
1740 let mut rows: Vec<Vec<Value>> = Vec::new();
1741
1742 let mut scan = ScanByLabel::new(hwm);
1749
1750 while let Some(chunk) = scan.next_chunk()? {
1751 for row_idx in chunk.live_rows() {
1753 let slot = chunk.column(0).data[row_idx];
1754 let node_id = NodeId(((label_id as u64) << 32) | slot);
1755
1756 if self.is_node_tombstoned(node_id) {
1758 continue;
1759 }
1760
1761 let nullable_props = self
1763 .snapshot
1764 .store
1765 .get_node_raw_nullable(node_id, &all_col_ids)?;
1766 let props: Vec<(u32, u64)> = nullable_props
1767 .iter()
1768 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1769 .collect();
1770
1771 if !self.matches_prop_filter(&props, &node.props) {
1773 continue;
1774 }
1775
1776 if let Some(ref where_expr) = m.where_clause {
1778 let mut row_vals =
1779 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1780 if !var_name.is_empty() && !label.is_empty() {
1781 row_vals.insert(
1782 format!("{}.__labels__", var_name),
1783 Value::List(vec![Value::String(label.clone())]),
1784 );
1785 }
1786 if !var_name.is_empty() {
1787 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1788 }
1789 row_vals.extend(self.dollar_params());
1790 if !self.eval_where_graph(where_expr, &row_vals) {
1791 continue;
1792 }
1793 }
1794
1795 let row = project_row(
1797 &props,
1798 column_names,
1799 &all_col_ids,
1800 var_name,
1801 &label,
1802 &self.snapshot.store,
1803 Some(node_id),
1804 );
1805 rows.push(row);
1806 }
1807 }
1808
1809 Ok(QueryResult {
1810 columns: column_names.to_vec(),
1811 rows,
1812 })
1813 }
1814}
1815
1816struct SingleChunkSource {
1823 chunk: Option<DataChunk>,
1824}
1825
1826impl SingleChunkSource {
1827 fn new(chunk: DataChunk) -> Self {
1828 SingleChunkSource { chunk: Some(chunk) }
1829 }
1830}
1831
1832impl PipelineOperator for SingleChunkSource {
1833 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1834 Ok(self.chunk.take())
1835 }
1836}
1837
1838fn column_name_for_item(item: &ReturnItem) -> String {
1842 if let Some(ref alias) = item.alias {
1843 return alias.clone();
1844 }
1845 match &item.expr {
1847 Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1848 Expr::Var(v) => v.clone(),
1849 _ => String::new(),
1850 }
1851}
1852
1853fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
1867 match expr {
1868 Expr::PropAccess { var, .. } => var.as_str() == var_name,
1869 Expr::BinOp { left, right, .. } => {
1870 expr_references_var(left, var_name) || expr_references_var(right, var_name)
1871 }
1872 Expr::And(a, b) | Expr::Or(a, b) => {
1873 expr_references_var(a, var_name) || expr_references_var(b, var_name)
1874 }
1875 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1876 expr_references_var(inner, var_name)
1877 }
1878 _ => false,
1879 }
1880}
1881
1882fn is_simple_where_for_chunked(expr: &Expr) -> bool {
1883 match expr {
1884 Expr::BinOp { left, op, right } => {
1885 match op {
1886 BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
1888 _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
1889 }
1890 }
1891 Expr::And(a, b) | Expr::Or(a, b) => {
1892 is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
1893 }
1894 Expr::Not(inner) => is_simple_where_for_chunked(inner),
1895 Expr::IsNull(_) | Expr::IsNotNull(_) => true,
1896 Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
1897 Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
1899 _ => true,
1900 }
1901}
1902
1903fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1909 match expr {
1910 Expr::BinOp { left, op, right } => {
1911 let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1913 (left.as_ref(), right.as_ref(), false)
1914 } else if matches!(left.as_ref(), Expr::Literal(_)) {
1915 (right.as_ref(), left.as_ref(), true)
1916 } else {
1917 return None;
1918 };
1919
1920 let (v, key) = match prop_expr {
1921 Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1922 _ => return None,
1923 };
1924 if v != var_name {
1925 return None;
1926 }
1927 let col_id = col_id_of(key);
1928
1929 let rhs_raw = match lit_expr {
1930 Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1931 _ => return None,
1932 };
1933
1934 let effective_op = if swapped {
1936 match op {
1937 BinOpKind::Lt => BinOpKind::Gt,
1938 BinOpKind::Le => BinOpKind::Ge,
1939 BinOpKind::Gt => BinOpKind::Lt,
1940 BinOpKind::Ge => BinOpKind::Le,
1941 other => other.clone(),
1942 }
1943 } else {
1944 op.clone()
1945 };
1946
1947 match effective_op {
1948 BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1949 BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1950 BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1951 BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1952 BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1953 BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1954 _ => None,
1955 }
1956 }
1957 Expr::IsNull(inner) => {
1958 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1959 if var.as_str() == var_name {
1960 return Some(ChunkPredicate::IsNull {
1961 col_id: col_id_of(prop),
1962 });
1963 }
1964 }
1965 None
1966 }
1967 Expr::IsNotNull(inner) => {
1968 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1969 if var.as_str() == var_name {
1970 return Some(ChunkPredicate::IsNotNull {
1971 col_id: col_id_of(prop),
1972 });
1973 }
1974 }
1975 None
1976 }
1977 Expr::And(a, b) => {
1978 let ca = try_compile_predicate(a, var_name, _col_ids);
1979 let cb = try_compile_predicate(b, var_name, _col_ids);
1980 match (ca, cb) {
1981 (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1982 _ => None,
1983 }
1984 }
1985 _ => None,
1986 }
1987}
1988
1989fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
1994 use sparrowdb_storage::node_store::Value as StoreValue;
1995 match lit {
1996 Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
1997 Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
1998 Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
2000 }
2001}
2002
2003fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
2008 col_ids
2009 .iter()
2010 .filter_map(|&cid| {
2011 let col = chunk.find_column(cid)?;
2012 if col.nulls.is_null(row_idx) {
2013 None
2014 } else {
2015 Some((cid, col.data[row_idx]))
2016 }
2017 })
2018 .collect()
2019}
2020
2021struct DstSlotProjector<C: PipelineOperator> {
2030 child: C,
2031}
2032
2033impl<C: PipelineOperator> DstSlotProjector<C> {
2034 fn new(child: C) -> Self {
2035 DstSlotProjector { child }
2036 }
2037}
2038
2039impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
2040 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
2041 use crate::chunk::ColumnVector;
2042
2043 loop {
2044 let chunk = match self.child.next_chunk()? {
2045 Some(c) => c,
2046 None => return Ok(None),
2047 };
2048
2049 if chunk.is_empty() {
2050 continue;
2051 }
2052
2053 let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
2055 Some(c) => c,
2056 None => continue,
2057 };
2058
2059 let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
2060 if data.is_empty() {
2061 continue;
2062 }
2063 let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2064 return Ok(Some(DataChunk::from_columns(vec![col])));
2065 }
2066 }
2067}
2068
2069fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2073 match expr {
2074 Expr::FnCall { name, args } => {
2075 name.eq_ignore_ascii_case("id")
2076 && args.len() == 1
2077 && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2078 }
2079 _ => false,
2080 }
2081}
2082
2083fn is_param_literal(expr: &Expr) -> bool {
2085 matches!(expr, Expr::Literal(Literal::Param(_)))
2086}
2087
2088fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2095 match expr {
2096 Expr::And(left, right) => {
2097 where_is_only_id_param_conjuncts(left, a_var, b_var)
2098 && where_is_only_id_param_conjuncts(right, a_var, b_var)
2099 }
2100 Expr::BinOp {
2101 left,
2102 op: BinOpKind::Eq,
2103 right,
2104 } => {
2105 (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2107 || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2108 }
2109 _ => false,
2110 }
2111}
2112
2113fn extract_id_param_slot(
2120 where_clause: Option<&Expr>,
2121 var_name: &str,
2122 params: &std::collections::HashMap<String, crate::types::Value>,
2123 expected_label_id: u32,
2124) -> Option<u64> {
2125 let wexpr = where_clause?;
2126 let param_name = find_id_param_name(wexpr, var_name)?;
2127 let val = params.get(¶m_name)?;
2128
2129 let raw_node_id: u64 = match val {
2131 crate::types::Value::Int64(n) => *n as u64,
2132 crate::types::Value::NodeRef(nid) => nid.0,
2133 _ => return None,
2134 };
2135
2136 let (label_id, slot) = super::node_id_parts(raw_node_id);
2137 if label_id != expected_label_id {
2138 return None;
2139 }
2140 Some(slot)
2141}
2142
2143fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2145 match expr {
2146 Expr::BinOp { left, op, right } => {
2147 if *op == BinOpKind::Eq {
2148 if is_id_call(left, var_name) {
2149 if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2150 return Some(p.clone());
2151 }
2152 }
2153 if is_id_call(right, var_name) {
2154 if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2155 return Some(p.clone());
2156 }
2157 }
2158 }
2159 find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2160 }
2161 Expr::And(a, b) => {
2162 find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2163 }
2164 _ => None,
2165 }
2166}
2167
2168fn find_slot_by_props(
2182 store: &NodeStore,
2183 label_id: u32,
2184 hwm: u64,
2185 props: &[sparrowdb_cypher::ast::PropEntry],
2186 params: &std::collections::HashMap<String, crate::types::Value>,
2187 prop_index: &PropertyIndex,
2188) -> Option<u64> {
2189 if props.is_empty() || hwm == 0 {
2190 return None;
2191 }
2192
2193 if let Some(slots) = try_index_lookup_for_props(props, label_id, prop_index) {
2195 return slots.into_iter().next().map(|s| s as u64);
2196 }
2197
2198 if props.len() == 1 {
2202 let filter = &props[0];
2203 let col_id = prop_name_to_col_id(&filter.key);
2204
2205 let target_raw_opt: Option<u64> = match &filter.value {
2207 Expr::Literal(Literal::Int(n)) => Some(StoreValue::Int64(*n).to_u64()),
2208 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
2209 Some(StoreValue::Bytes(s.as_bytes().to_vec()).to_u64())
2210 }
2211 _ => None,
2213 };
2214
2215 if let Some(target_raw) = target_raw_opt {
2216 let col_data = match store.read_col_all(label_id, col_id) {
2217 Ok(d) => d,
2218 Err(_) => return None,
2219 };
2220 let null_bitmap = store.read_null_bitmap_all(label_id, col_id).ok().flatten();
2221
2222 for (slot, &raw) in col_data.iter().enumerate().take(hwm as usize) {
2223 let is_present = match &null_bitmap {
2227 None => raw != 0,
2229 Some(bits) => bits.get(slot).copied().unwrap_or(false),
2231 };
2232 if !is_present {
2233 continue;
2234 }
2235 if raw != target_raw {
2236 continue;
2237 }
2238 return Some(slot as u64);
2239 }
2240 return None;
2241 }
2242 }
2243
2244 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
2246 for slot in 0..hwm {
2247 let node_id = NodeId(((label_id as u64) << 32) | slot);
2248 let Ok(raw_props) = store.get_node_raw_nullable(node_id, &col_ids) else {
2249 continue;
2250 };
2251 let stored: Vec<(u32, u64)> = raw_props
2252 .into_iter()
2253 .filter_map(|(c, opt)| opt.map(|v| (c, v)))
2254 .collect();
2255 if matches_prop_filter_static(&stored, props, params, store) {
2256 return Some(slot);
2257 }
2258 }
2259 None
2260}