1use std::sync::Arc;
27
28use sparrowdb_common::NodeId;
29use sparrowdb_storage::edge_store::EdgeStore;
30
31use super::*;
32use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
33use crate::pipeline::{
34 BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
35 SlotIntersect,
36};
37
/// Shape of chunked-pipeline plan chosen by [`Engine::try_plan_chunked_match`]
/// for a `MATCH` statement.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChunkedPlan {
    /// Single-pattern label scan with no relationships: `MATCH (n:Label) ...`.
    Scan,
    /// One directed hop: `(a:A)-[:R]->(b:B)` or `(a:A)<-[:R]-(b:B)`.
    OneHop,
    /// Two outgoing hops: `(a)-[:R]->(m)-[:R]->(b)`.
    TwoHop,
    /// Common neighbours of two nodes: `(a)-[:R]->(x)<-[:R]-(b)`.
    MutualNeighbors,
}

impl std::fmt::Display for ChunkedPlan {
    /// Writes the bare variant name (e.g. `OneHop`); used as a tracing field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Scan => "Scan",
            Self::OneHop => "OneHop",
            Self::TwoHop => "TwoHop",
            Self::MutualNeighbors => "MutualNeighbors",
        };
        f.write_str(name)
    }
}
71
72impl Engine {
73 pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
82 if !self.use_chunked_pipeline {
83 return false;
84 }
85 if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
86 return false;
87 }
88 if has_aggregate_in_return(&m.return_clause.items) {
89 return false;
90 }
91 if !m.order_by.is_empty() || m.skip.is_some() || m.limit.is_some() {
92 return false;
93 }
94 !m.pattern[0].nodes[0].labels.is_empty()
95 }
96
97 pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
110 use sparrowdb_cypher::ast::EdgeDir;
111
112 if !self.use_chunked_pipeline {
113 return false;
114 }
115 if m.pattern.len() != 1 {
117 return false;
118 }
119 let pat = &m.pattern[0];
120 if pat.rels.len() != 1 || pat.nodes.len() != 2 {
121 return false;
122 }
123 if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
125 return false;
126 }
127 let dir = &pat.rels[0].dir;
129 if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
130 return false;
131 }
132 if has_aggregate_in_return(&m.return_clause.items) {
134 return false;
135 }
136 if m.distinct {
138 return false;
139 }
140 if !m.order_by.is_empty() {
142 return false;
143 }
144 if pat.rels[0].min_hops.is_some() {
146 return false;
147 }
148 let rel_var = &pat.rels[0].var;
153 if !rel_var.is_empty() {
154 let ref_in_return = m.return_clause.items.iter().any(|item| {
155 column_name_for_item(item)
156 .split_once('.')
157 .is_some_and(|(v, _)| v == rel_var.as_str())
158 });
159 if ref_in_return {
160 return false;
161 }
162 if let Some(ref wexpr) = m.where_clause {
164 if expr_references_var(wexpr, rel_var.as_str()) {
165 return false;
166 }
167 }
168 }
169 if let Some(ref wexpr) = m.where_clause {
171 if !is_simple_where_for_chunked(wexpr) {
172 return false;
173 }
174 }
175 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
177 let dst_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
178 let rel_type = pat.rels[0].rel_type.clone();
179 let n_tables = self
180 .snapshot
181 .catalog
182 .list_rel_tables_with_ids()
183 .into_iter()
184 .filter(|(_, sid, did, rt)| {
185 let type_ok = rel_type.is_empty() || rt == &rel_type;
186 let src_ok = self
187 .snapshot
188 .catalog
189 .get_label(&src_label)
190 .ok()
191 .flatten()
192 .map(|id| id as u32 == *sid as u32)
193 .unwrap_or(false);
194 let dst_ok = self
195 .snapshot
196 .catalog
197 .get_label(&dst_label)
198 .ok()
199 .flatten()
200 .map(|id| id as u32 == *did as u32)
201 .unwrap_or(false);
202 type_ok && src_ok && dst_ok
203 })
204 .count();
205 n_tables == 1
206 }
207
208 pub(crate) fn execute_one_hop_chunked(
224 &self,
225 m: &MatchStatement,
226 column_names: &[String],
227 ) -> Result<QueryResult> {
228 use sparrowdb_cypher::ast::EdgeDir;
229
230 let pat = &m.pattern[0];
231 let rel_pat = &pat.rels[0];
232 let dir = &rel_pat.dir;
233
234 let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
237 (&pat.nodes[1], &pat.nodes[0], true)
238 } else {
239 (&pat.nodes[0], &pat.nodes[1], false)
240 };
241
242 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
243 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
244 let rel_type = rel_pat.rel_type.clone();
245
246 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
248 Some(id) => id as u32,
249 None => {
250 return Ok(QueryResult {
251 columns: column_names.to_vec(),
252 rows: vec![],
253 });
254 }
255 };
256 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
257 Some(id) => id as u32,
258 None => {
259 return Ok(QueryResult {
260 columns: column_names.to_vec(),
261 rows: vec![],
262 });
263 }
264 };
265
266 let (catalog_rel_id, _) = self
268 .snapshot
269 .catalog
270 .list_rel_tables_with_ids()
271 .into_iter()
272 .find(|(_, sid, did, rt)| {
273 let type_ok = rel_type.is_empty() || rt == &rel_type;
274 let src_ok = *sid as u32 == src_label_id;
275 let dst_ok = *did as u32 == dst_label_id;
276 type_ok && src_ok && dst_ok
277 })
278 .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
279 .ok_or_else(|| {
280 sparrowdb_common::Error::InvalidArgument(
281 "no matching relationship table found".into(),
282 )
283 })?;
284
285 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
286 tracing::debug!(
287 engine = "chunked",
288 src_label = %src_label,
289 dst_label = %dst_label,
290 rel_type = %rel_type,
291 hwm_src,
292 "executing via chunked pipeline (1-hop)"
293 );
294
295 let src_var = src_node_pat.var.as_str();
298 let dst_var = dst_node_pat.var.as_str();
299
300 let (query_src_var, query_dst_var) = if swapped {
303 (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
304 } else {
305 (src_var, dst_var)
306 };
307
308 let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
309 let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
310
311 if let Some(ref wexpr) = m.where_clause {
313 collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
314 collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
315 }
316 for p in &src_node_pat.props {
318 let cid = col_id_of(&p.key);
319 if !col_ids_src.contains(&cid) {
320 col_ids_src.push(cid);
321 }
322 }
323 for p in &dst_node_pat.props {
324 let cid = col_id_of(&p.key);
325 if !col_ids_dst.contains(&cid) {
326 col_ids_dst.push(cid);
327 }
328 }
329
330 let delta_records = {
332 let edge_store = EdgeStore::open(
333 &self.snapshot.db_root,
334 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
335 );
336 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
337 };
338
339 let csr = self
341 .snapshot
342 .csrs
343 .get(&catalog_rel_id)
344 .cloned()
345 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
346
347 let avg_degree_hint = self
349 .snapshot
350 .rel_degree_stats()
351 .get(&catalog_rel_id)
352 .map(|s| s.mean().ceil() as usize)
353 .unwrap_or(8);
354
355 let src_pred_opt = m
357 .where_clause
358 .as_ref()
359 .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
360 let dst_pred_opt = m
361 .where_clause
362 .as_ref()
363 .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
364
365 let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
366
367 let limit = m.limit.map(|l| l as usize);
373 let mut rows: Vec<Vec<Value>> = Vec::new();
374
375 let mut scan = ScanByLabel::new(hwm_src);
381
382 'outer: while let Some(scan_chunk) = scan.next_chunk()? {
383 let src_chunk = if !col_ids_src.is_empty() {
387 let mut rnp = ReadNodeProps::new(
388 SingleChunkSource::new(scan_chunk),
389 Arc::clone(&store_arc),
390 src_label_id,
391 crate::chunk::COL_ID_SLOT,
392 col_ids_src.clone(),
393 );
394 match rnp.next_chunk()? {
395 Some(c) => c,
396 None => continue,
397 }
398 } else {
399 scan_chunk
400 };
401
402 let src_chunk = if let Some(ref pred) = src_pred_opt {
404 let pred = pred.clone();
405 let keep: Vec<bool> = {
406 (0..src_chunk.len())
407 .map(|i| pred.eval(&src_chunk, i))
408 .collect()
409 };
410 let mut c = src_chunk;
411 c.filter_sel(|i| keep[i]);
412 if c.live_len() == 0 {
413 continue;
414 }
415 c
416 } else {
417 src_chunk
418 };
419
420 let mut gn = GetNeighbors::new(
422 SingleChunkSource::new(src_chunk.clone()),
423 csr.clone(),
424 &delta_records,
425 src_label_id,
426 avg_degree_hint,
427 );
428
429 while let Some(hop_chunk) = gn.next_chunk()? {
430 let dst_chunk = if !col_ids_dst.is_empty() {
434 let mut rnp = ReadNodeProps::new(
435 SingleChunkSource::new(hop_chunk),
436 Arc::clone(&store_arc),
437 dst_label_id,
438 COL_ID_DST_SLOT,
439 col_ids_dst.clone(),
440 );
441 match rnp.next_chunk()? {
442 Some(c) => c,
443 None => continue,
444 }
445 } else {
446 hop_chunk
447 };
448
449 let dst_chunk = if let Some(ref pred) = dst_pred_opt {
451 let pred = pred.clone();
452 let keep: Vec<bool> = (0..dst_chunk.len())
453 .map(|i| pred.eval(&dst_chunk, i))
454 .collect();
455 let mut c = dst_chunk;
456 c.filter_sel(|i| keep[i]);
457 if c.live_len() == 0 {
458 continue;
459 }
460 c
461 } else {
462 dst_chunk
463 };
464
465 let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
467 let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
468 let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
469
470 for row_idx in dst_chunk.live_rows() {
471 let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
472 let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
473
474 let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
476 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
477 if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
478 continue;
479 }
480
481 let src_props = if let Some(sc) = src_slot_col {
486 let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
488 if let Some(src_ri) = src_row {
489 build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
490 } else {
491 let nullable = self
493 .snapshot
494 .store
495 .get_node_raw_nullable(src_node, &col_ids_src)?;
496 nullable
497 .into_iter()
498 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
499 .collect()
500 }
501 } else {
502 vec![]
503 };
504
505 let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
506
507 if let Some(ref where_expr) = m.where_clause {
510 let (actual_src_var, actual_dst_var) = if swapped {
512 (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
513 } else {
514 (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
515 };
516 let (actual_src_props, actual_dst_props) = if swapped {
517 (&dst_props, &src_props)
518 } else {
519 (&src_props, &dst_props)
520 };
521 let mut row_vals = build_row_vals(
522 actual_src_props,
523 actual_src_var,
524 &col_ids_src,
525 &self.snapshot.store,
526 );
527 row_vals.extend(build_row_vals(
528 actual_dst_props,
529 actual_dst_var,
530 &col_ids_dst,
531 &self.snapshot.store,
532 ));
533 row_vals.extend(self.dollar_params());
534 if !self.eval_where_graph(where_expr, &row_vals) {
535 continue;
536 }
537 }
538
539 let (proj_src_props, proj_dst_props) = if swapped {
541 (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
542 } else {
543 (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
544 };
545 let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
546 (
547 dst_node_pat.var.as_str(),
548 src_node_pat.var.as_str(),
549 dst_label.as_str(),
550 src_label.as_str(),
551 )
552 } else {
553 (
554 src_node_pat.var.as_str(),
555 dst_node_pat.var.as_str(),
556 src_label.as_str(),
557 dst_label.as_str(),
558 )
559 };
560
561 let row = project_hop_row(
562 proj_src_props,
563 proj_dst_props,
564 column_names,
565 proj_src_var,
566 proj_dst_var,
567 None, Some((proj_src_var, proj_src_label)),
569 Some((proj_dst_var, proj_dst_label)),
570 &self.snapshot.store,
571 None, );
573 rows.push(row);
574
575 if let Some(lim) = limit {
577 if rows.len() >= lim {
578 break 'outer;
579 }
580 }
581 }
582 }
583 }
584
585 Ok(QueryResult {
586 columns: column_names.to_vec(),
587 rows,
588 })
589 }
590
591 pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
602 if self.can_use_mutual_neighbors_chunked(m) {
604 return Some(ChunkedPlan::MutualNeighbors);
605 }
606 if self.can_use_two_hop_chunked(m) {
607 return Some(ChunkedPlan::TwoHop);
608 }
609 if self.can_use_one_hop_chunked(m) {
610 return Some(ChunkedPlan::OneHop);
611 }
612 if self.can_use_chunked_pipeline(m) {
613 return Some(ChunkedPlan::Scan);
614 }
615 None
616 }
617
618 pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
641 use sparrowdb_cypher::ast::EdgeDir;
642
643 if !self.use_chunked_pipeline {
644 return false;
645 }
646 if m.pattern.len() != 1 {
648 return false;
649 }
650 let pat = &m.pattern[0];
651 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
652 return false;
653 }
654 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
656 return false;
657 }
658 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
660 return false;
661 }
662 if pat.rels[0].rel_type != pat.rels[1].rel_type {
664 return false;
665 }
666 if pat.nodes[0].labels.len() != 1
668 || pat.nodes[1].labels.len() != 1
669 || pat.nodes[2].labels.len() != 1
670 {
671 return false;
672 }
673 if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
674 || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
675 {
676 return false;
677 }
678 if has_aggregate_in_return(&m.return_clause.items) {
680 return false;
681 }
682 if m.distinct {
684 return false;
685 }
686 if !m.order_by.is_empty() {
688 return false;
689 }
690 for rel in &pat.rels {
692 if !rel.var.is_empty() {
693 let ref_in_return = m.return_clause.items.iter().any(|item| {
694 column_name_for_item(item)
695 .split_once('.')
696 .is_some_and(|(v, _)| v == rel.var.as_str())
697 });
698 if ref_in_return {
699 return false;
700 }
701 if let Some(ref wexpr) = m.where_clause {
702 if expr_references_var(wexpr, rel.var.as_str()) {
703 return false;
704 }
705 }
706 }
707 }
708 let a_var = pat.nodes[0].var.as_str();
713 let b_var = pat.nodes[2].var.as_str();
714 match m.where_clause.as_ref() {
715 None => return false,
716 Some(wexpr) => {
717 if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
718 return false;
719 }
720 }
721 }
722 let label = pat.nodes[0].labels[0].clone();
724 let rel_type = &pat.rels[0].rel_type;
725 let catalog = &self.snapshot.catalog;
726 let tables = catalog.list_rel_tables_with_ids();
727 let label_id_opt = catalog.get_label(&label).ok().flatten();
728 let label_id = match label_id_opt {
729 Some(id) => id as u32,
730 None => return false,
731 };
732 let has_table = tables.iter().any(|(_, sid, did, rt)| {
733 let type_ok = rel_type.is_empty() || rt == rel_type;
734 let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
735 type_ok && endpoint_ok
736 });
737 has_table
738 }
739
740 pub(crate) fn execute_mutual_neighbors_chunked(
752 &self,
753 m: &MatchStatement,
754 column_names: &[String],
755 ) -> Result<QueryResult> {
756 let pat = &m.pattern[0];
757 let a_node_pat = &pat.nodes[0];
758 let x_node_pat = &pat.nodes[1];
759 let b_node_pat = &pat.nodes[2];
760
761 let label = a_node_pat.labels[0].clone();
762 let rel_type = pat.rels[0].rel_type.clone();
763
764 let label_id = match self.snapshot.catalog.get_label(&label)? {
765 Some(id) => id as u32,
766 None => {
767 return Ok(QueryResult {
768 columns: column_names.to_vec(),
769 rows: vec![],
770 });
771 }
772 };
773
774 let catalog_rel_id = self
776 .snapshot
777 .catalog
778 .list_rel_tables_with_ids()
779 .into_iter()
780 .find(|(_, sid, did, rt)| {
781 let type_ok = rel_type.is_empty() || rt == &rel_type;
782 let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
783 type_ok && endpoint_ok
784 })
785 .map(|(cid, _, _, _)| cid as u32)
786 .ok_or_else(|| {
787 sparrowdb_common::Error::InvalidArgument(
788 "no matching relationship table for mutual-neighbors".into(),
789 )
790 })?;
791
792 let a_var = a_node_pat.var.as_str();
794 let b_var = b_node_pat.var.as_str();
795 let a_slot = extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id);
796 let b_slot = extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id);
797
798 let (a_slot, b_slot) = match (a_slot, b_slot) {
799 (Some(a), Some(b)) => (a, b),
800 _ => {
801 return Ok(QueryResult {
803 columns: column_names.to_vec(),
804 rows: vec![],
805 });
806 }
807 };
808
809 if a_slot == b_slot {
813 return Ok(QueryResult {
814 columns: column_names.to_vec(),
815 rows: vec![],
816 });
817 }
818
819 tracing::debug!(
820 engine = "chunked",
821 plan = %ChunkedPlan::MutualNeighbors,
822 label = %label,
823 rel_type = %rel_type,
824 a_slot,
825 b_slot,
826 "executing via chunked pipeline"
827 );
828
829 let csr = self
830 .snapshot
831 .csrs
832 .get(&catalog_rel_id)
833 .cloned()
834 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
835
836 let delta_records = {
837 let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
838 &self.snapshot.db_root,
839 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
840 );
841 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
842 };
843
844 let a_scan = ScanByLabel::from_slots(vec![a_slot]);
846 let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
847
848 let b_scan = ScanByLabel::from_slots(vec![b_slot]);
849 let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
850
851 let a_proj = DstSlotProjector::new(a_neighbors);
854 let b_proj = DstSlotProjector::new(b_neighbors);
855
856 let spill_threshold = 64 * 1024; let mut intersect =
859 SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
860
861 let mut common_slots: Vec<u64> = Vec::new();
863 while let Some(chunk) = intersect.next_chunk()? {
864 if let Some(col) = chunk.find_column(COL_ID_SLOT) {
865 for row_idx in chunk.live_rows() {
866 common_slots.push(col.data[row_idx]);
867 }
868 }
869 }
870
871 let x_var = x_node_pat.var.as_str();
873 let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
874 if let Some(ref wexpr) = m.where_clause {
875 collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
876 }
877 for p in &x_node_pat.props {
878 let cid = col_id_of(&p.key);
879 if !col_ids_x.contains(&cid) {
880 col_ids_x.push(cid);
881 }
882 }
883
884 let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
885 self.snapshot.store.root_path(),
886 )?);
887
888 let limit = m.limit.map(|l| l as usize);
889 let mut rows: Vec<Vec<Value>> = Vec::new();
890
891 'outer: for x_slot in common_slots {
892 let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
893
894 if self.is_node_tombstoned(x_node_id) {
896 continue;
897 }
898
899 let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
901 let nullable = store_arc.batch_read_node_props_nullable(
902 label_id,
903 &[x_slot as u32],
904 &col_ids_x,
905 )?;
906 if nullable.is_empty() {
907 vec![]
908 } else {
909 col_ids_x
910 .iter()
911 .enumerate()
912 .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
913 .collect()
914 }
915 } else {
916 vec![]
917 };
918
919 if let Some(ref where_expr) = m.where_clause {
921 let mut row_vals =
922 build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
923 if !a_var.is_empty() {
925 let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
926 row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
927 }
928 if !b_var.is_empty() {
929 let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
930 row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
931 }
932 row_vals.extend(self.dollar_params());
933 if !self.eval_where_graph(where_expr, &row_vals) {
934 continue;
935 }
936 }
937
938 let row = project_row(
940 &x_props,
941 column_names,
942 &col_ids_x,
943 x_var,
944 &label,
945 &self.snapshot.store,
946 );
947 rows.push(row);
948
949 if let Some(lim) = limit {
950 if rows.len() >= lim {
951 break 'outer;
952 }
953 }
954 }
955
956 Ok(QueryResult {
957 columns: column_names.to_vec(),
958 rows,
959 })
960 }
961
962 pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
974 use sparrowdb_cypher::ast::EdgeDir;
975
976 if !self.use_chunked_pipeline {
977 return false;
978 }
979 if m.pattern.len() != 1 {
981 return false;
982 }
983 let pat = &m.pattern[0];
984 if pat.rels.len() != 2 || pat.nodes.len() != 3 {
985 return false;
986 }
987 if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
989 return false;
990 }
991 if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
993 return false;
994 }
995 if has_aggregate_in_return(&m.return_clause.items) {
997 return false;
998 }
999 if m.distinct {
1001 return false;
1002 }
1003 if !m.order_by.is_empty() {
1005 return false;
1006 }
1007 for rel in &pat.rels {
1009 if !rel.var.is_empty() {
1010 let ref_in_return = m.return_clause.items.iter().any(|item| {
1011 column_name_for_item(item)
1012 .split_once('.')
1013 .is_some_and(|(v, _)| v == rel.var.as_str())
1014 });
1015 if ref_in_return {
1016 return false;
1017 }
1018 if let Some(ref wexpr) = m.where_clause {
1019 if expr_references_var(wexpr, rel.var.as_str()) {
1020 return false;
1021 }
1022 }
1023 }
1024 }
1025 if let Some(ref wexpr) = m.where_clause {
1027 if !is_simple_where_for_chunked(wexpr) {
1028 return false;
1029 }
1030 }
1031 let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
1033 let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
1034 let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
1035 let rel_type1 = &pat.rels[0].rel_type;
1036 let rel_type2 = &pat.rels[1].rel_type;
1037
1038 if rel_type1 != rel_type2 {
1042 return false;
1043 }
1044
1045 let catalog = &self.snapshot.catalog;
1047 let tables = catalog.list_rel_tables_with_ids();
1048
1049 let hop1_matches: Vec<_> = tables
1050 .iter()
1051 .filter(|(_, sid, did, rt)| {
1052 let type_ok = rel_type1.is_empty() || rt == rel_type1;
1053 let src_ok = catalog
1054 .get_label(&src_label)
1055 .ok()
1056 .flatten()
1057 .map(|id| id as u32 == *sid as u32)
1058 .unwrap_or(false);
1059 let mid_ok = catalog
1060 .get_label(&mid_label)
1061 .ok()
1062 .flatten()
1063 .map(|id| id as u32 == *did as u32)
1064 .unwrap_or(false);
1065 type_ok && src_ok && mid_ok
1066 })
1067 .collect();
1068
1069 let n_tables = hop1_matches.len();
1071 if n_tables != 1 {
1072 return false;
1073 }
1074
1075 let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
1076 let type_ok = rel_type2.is_empty() || rt == rel_type2;
1077 let mid_ok = catalog
1078 .get_label(&mid_label)
1079 .ok()
1080 .flatten()
1081 .map(|id| id as u32 == *sid as u32)
1082 .unwrap_or(false);
1083 let dst_ok = catalog
1084 .get_label(&dst_label)
1085 .ok()
1086 .flatten()
1087 .map(|id| id as u32 == *did as u32)
1088 .unwrap_or(false);
1089 type_ok && mid_ok && dst_ok
1090 });
1091
1092 match (hop1_matches.first(), hop2_id) {
1094 (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
1095 _ => false,
1096 }
1097 }
1098
1099 pub(crate) fn execute_two_hop_chunked(
1121 &self,
1122 m: &MatchStatement,
1123 column_names: &[String],
1124 ) -> Result<QueryResult> {
1125 use sparrowdb_common::Error as DbError;
1126
1127 let pat = &m.pattern[0];
1128 let src_node_pat = &pat.nodes[0];
1129 let mid_node_pat = &pat.nodes[1];
1130 let dst_node_pat = &pat.nodes[2];
1131
1132 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
1133 let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
1134 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
1135 let rel_type = pat.rels[0].rel_type.clone();
1136
1137 let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
1139 Some(id) => id as u32,
1140 None => {
1141 return Ok(QueryResult {
1142 columns: column_names.to_vec(),
1143 rows: vec![],
1144 });
1145 }
1146 };
1147 let mid_label_id = if mid_label.is_empty() {
1148 src_label_id
1149 } else {
1150 match self.snapshot.catalog.get_label(&mid_label)? {
1151 Some(id) => id as u32,
1152 None => {
1153 return Ok(QueryResult {
1154 columns: column_names.to_vec(),
1155 rows: vec![],
1156 });
1157 }
1158 }
1159 };
1160 let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
1161 Some(id) => id as u32,
1162 None => {
1163 return Ok(QueryResult {
1164 columns: column_names.to_vec(),
1165 rows: vec![],
1166 });
1167 }
1168 };
1169
1170 let catalog_rel_id = self
1172 .snapshot
1173 .catalog
1174 .list_rel_tables_with_ids()
1175 .into_iter()
1176 .find(|(_, sid, did, rt)| {
1177 let type_ok = rel_type.is_empty() || rt == &rel_type;
1178 let src_ok = *sid as u32 == src_label_id;
1179 let mid_ok = *did as u32 == mid_label_id;
1180 type_ok && src_ok && mid_ok
1181 })
1182 .map(|(cid, _, _, _)| cid as u32)
1183 .ok_or_else(|| {
1184 sparrowdb_common::Error::InvalidArgument(
1185 "no matching relationship table found for 2-hop".into(),
1186 )
1187 })?;
1188
1189 let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
1190 tracing::debug!(
1191 engine = "chunked",
1192 src_label = %src_label,
1193 mid_label = %mid_label,
1194 dst_label = %dst_label,
1195 rel_type = %rel_type,
1196 hwm_src,
1197 "executing via chunked pipeline (2-hop)"
1198 );
1199
1200 let src_var = src_node_pat.var.as_str();
1202 let mid_var = mid_node_pat.var.as_str();
1203 let dst_var = dst_node_pat.var.as_str();
1204
1205 let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
1208 let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
1209
1210 let mut col_ids_mid: Vec<u32> = vec![];
1212
1213 if let Some(ref wexpr) = m.where_clause {
1214 collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
1215 collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
1216 collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
1217 }
1218 for p in &src_node_pat.props {
1220 let cid = sparrowdb_common::col_id_of(&p.key);
1221 if !col_ids_src.contains(&cid) {
1222 col_ids_src.push(cid);
1223 }
1224 }
1225 for p in &mid_node_pat.props {
1226 let cid = sparrowdb_common::col_id_of(&p.key);
1227 if !col_ids_mid.contains(&cid) {
1228 col_ids_mid.push(cid);
1229 }
1230 }
1231 for p in &dst_node_pat.props {
1232 let cid = sparrowdb_common::col_id_of(&p.key);
1233 if !col_ids_dst.contains(&cid) {
1234 col_ids_dst.push(cid);
1235 }
1236 }
1237 if !mid_var.is_empty() {
1239 let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
1240 for cid in mid_return_ids {
1241 if !col_ids_mid.contains(&cid) {
1242 col_ids_mid.push(cid);
1243 }
1244 }
1245 }
1246
1247 let delta_records = {
1249 let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
1250 &self.snapshot.db_root,
1251 sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
1252 );
1253 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
1254 };
1255
1256 let csr = self
1258 .snapshot
1259 .csrs
1260 .get(&catalog_rel_id)
1261 .cloned()
1262 .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
1263
1264 let avg_degree_hint = self
1265 .snapshot
1266 .rel_degree_stats()
1267 .get(&catalog_rel_id)
1268 .map(|s| s.mean().ceil() as usize)
1269 .unwrap_or(8);
1270
1271 let src_pred_opt = m
1273 .where_clause
1274 .as_ref()
1275 .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
1276 let mid_pred_opt = m
1277 .where_clause
1278 .as_ref()
1279 .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
1280 let dst_pred_opt = m
1281 .where_clause
1282 .as_ref()
1283 .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
1284
1285 let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
1286 self.snapshot.store.root_path(),
1287 )?);
1288
1289 let limit = m.limit.map(|l| l as usize);
1290 let memory_limit = self.memory_limit_bytes;
1291 let mut rows: Vec<Vec<Value>> = Vec::new();
1292
1293 let mut frontier = BfsArena::new(avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2));
1301
1302 let row_size_estimate = column_names.len().max(1) * 16;
1306
1307 let mut scan = ScanByLabel::new(hwm_src);
1308
1309 'outer: while let Some(scan_chunk) = scan.next_chunk()? {
1310 let src_chunk = if !col_ids_src.is_empty() {
1312 let mut rnp = ReadNodeProps::new(
1313 SingleChunkSource::new(scan_chunk),
1314 Arc::clone(&store_arc),
1315 src_label_id,
1316 crate::chunk::COL_ID_SLOT,
1317 col_ids_src.clone(),
1318 );
1319 match rnp.next_chunk()? {
1320 Some(c) => c,
1321 None => continue,
1322 }
1323 } else {
1324 scan_chunk
1325 };
1326
1327 let src_chunk = if let Some(ref pred) = src_pred_opt {
1329 let pred = pred.clone();
1330 let keep: Vec<bool> = (0..src_chunk.len())
1331 .map(|i| pred.eval(&src_chunk, i))
1332 .collect();
1333 let mut c = src_chunk;
1334 c.filter_sel(|i| keep[i]);
1335 if c.live_len() == 0 {
1336 continue;
1337 }
1338 c
1339 } else {
1340 src_chunk
1341 };
1342
1343 let mut gn1 = GetNeighbors::new(
1345 SingleChunkSource::new(src_chunk.clone()),
1346 csr.clone(),
1347 &delta_records,
1348 src_label_id,
1349 avg_degree_hint,
1350 );
1351
1352 while let Some(hop1_chunk) = gn1.next_chunk()? {
1354 frontier.clear();
1359
1360 let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
1364 if accum_bytes > memory_limit {
1365 return Err(DbError::QueryMemoryExceeded);
1366 }
1367
1368 let mid_chunk = if !col_ids_mid.is_empty() {
1370 let mut rnp = ReadNodeProps::new(
1371 SingleChunkSource::new(hop1_chunk),
1372 Arc::clone(&store_arc),
1373 mid_label_id,
1374 COL_ID_DST_SLOT,
1375 col_ids_mid.clone(),
1376 );
1377 match rnp.next_chunk()? {
1378 Some(c) => c,
1379 None => continue,
1380 }
1381 } else {
1382 hop1_chunk
1383 };
1384
1385 let mid_chunk = if let Some(ref pred) = mid_pred_opt {
1387 let pred = pred.clone();
1388 let keep: Vec<bool> = (0..mid_chunk.len())
1389 .map(|i| pred.eval(&mid_chunk, i))
1390 .collect();
1391 let mut c = mid_chunk;
1392 c.filter_sel(|i| keep[i]);
1393 if c.live_len() == 0 {
1394 continue;
1395 }
1396 c
1397 } else {
1398 mid_chunk
1399 };
1400
1401 let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
1403 let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
1404
1405 let live_pairs: Vec<(u64, u64)> = mid_chunk
1407 .live_rows()
1408 .map(|row_idx| {
1409 let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1410 let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1411 (src_slot, mid_slot)
1412 })
1413 .collect();
1414
1415 for &(_, mid_slot) in &live_pairs {
1426 if frontier.visit(mid_slot) {
1427 frontier.current_mut().push(mid_slot);
1428 }
1429 }
1430
1431 let mid_slots_chunk = {
1434 let data: Vec<u64> = frontier.current().to_vec();
1435 let col =
1436 crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
1437 DataChunk::from_columns(vec![col])
1438 };
1439
1440 let mut gn2 = GetNeighbors::new(
1441 SingleChunkSource::new(mid_slots_chunk),
1442 csr.clone(),
1443 &delta_records,
1444 mid_label_id,
1445 avg_degree_hint,
1446 );
1447
1448 while let Some(hop2_chunk) = gn2.next_chunk()? {
1449 let dst_chunk = if !col_ids_dst.is_empty() {
1453 let mut rnp = ReadNodeProps::new(
1454 SingleChunkSource::new(hop2_chunk),
1455 Arc::clone(&store_arc),
1456 dst_label_id,
1457 COL_ID_DST_SLOT,
1458 col_ids_dst.clone(),
1459 );
1460 match rnp.next_chunk()? {
1461 Some(c) => c,
1462 None => continue,
1463 }
1464 } else {
1465 hop2_chunk
1466 };
1467
1468 let dst_chunk = if let Some(ref pred) = dst_pred_opt {
1470 let pred = pred.clone();
1471 let keep: Vec<bool> = (0..dst_chunk.len())
1472 .map(|i| pred.eval(&dst_chunk, i))
1473 .collect();
1474 let mut c = dst_chunk;
1475 c.filter_sel(|i| keep[i]);
1476 if c.live_len() == 0 {
1477 continue;
1478 }
1479 c
1480 } else {
1481 dst_chunk
1482 };
1483
1484 let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT); let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
1490
1491 let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
1492
1493 let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
1496 .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
1497 .unwrap_or_default();
1498
1499 let mid_index: std::collections::HashMap<u64, usize> = {
1500 let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
1501 mid_slot_col_in_mid
1502 .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
1503 .unwrap_or_default()
1504 };
1505
1506 for row_idx in dst_chunk.live_rows() {
1507 let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
1508 let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
1509
1510 for &(src_slot, mid_slot) in &live_pairs {
1512 if mid_slot != via_mid_slot {
1513 continue;
1514 }
1515
1516 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
1519 let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
1520 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1521
1522 if self.is_node_tombstoned(src_node)
1524 || self.is_node_tombstoned(mid_node)
1525 || self.is_node_tombstoned(dst_node)
1526 {
1527 continue;
1528 }
1529
1530 let src_props = if src_slot_col_in_scan.is_some() {
1532 if let Some(&src_ri) = src_index.get(&src_slot) {
1533 build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
1534 } else {
1535 let nullable = self
1536 .snapshot
1537 .store
1538 .get_node_raw_nullable(src_node, &col_ids_src)?;
1539 nullable
1540 .into_iter()
1541 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1542 .collect()
1543 }
1544 } else {
1545 vec![]
1546 };
1547
1548 let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
1550 if let Some(&mid_ri) = mid_index.get(&mid_slot) {
1551 build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
1552 } else {
1553 let nullable = self
1554 .snapshot
1555 .store
1556 .get_node_raw_nullable(mid_node, &col_ids_mid)?;
1557 nullable
1558 .into_iter()
1559 .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
1560 .collect()
1561 }
1562 } else {
1563 vec![]
1564 };
1565
1566 let dst_props =
1568 build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
1569
1570 if let Some(ref where_expr) = m.where_clause {
1572 let mut row_vals = build_row_vals(
1573 &src_props,
1574 src_var,
1575 &col_ids_src,
1576 &self.snapshot.store,
1577 );
1578 row_vals.extend(build_row_vals(
1579 &mid_props,
1580 mid_var,
1581 &col_ids_mid,
1582 &self.snapshot.store,
1583 ));
1584 row_vals.extend(build_row_vals(
1585 &dst_props,
1586 dst_var,
1587 &col_ids_dst,
1588 &self.snapshot.store,
1589 ));
1590 row_vals.extend(self.dollar_params());
1591 if !self.eval_where_graph(where_expr, &row_vals) {
1592 continue;
1593 }
1594 }
1595
1596 let row = project_three_var_row(
1598 &src_props,
1599 &mid_props,
1600 &dst_props,
1601 column_names,
1602 src_var,
1603 mid_var,
1604 &self.snapshot.store,
1605 );
1606 rows.push(row);
1607
1608 if rows.len() * row_size_estimate > memory_limit {
1610 return Err(DbError::QueryMemoryExceeded);
1611 }
1612
1613 if let Some(lim) = limit {
1615 if rows.len() >= lim {
1616 break 'outer;
1617 }
1618 }
1619 }
1620 }
1621 }
1622 }
1623 }
1624
1625 Ok(QueryResult {
1626 columns: column_names.to_vec(),
1627 rows,
1628 })
1629 }
1630
1631 pub(crate) fn execute_scan_chunked(
1641 &self,
1642 m: &MatchStatement,
1643 column_names: &[String],
1644 ) -> Result<QueryResult> {
1645 use crate::pipeline::PipelineOperator;
1646
1647 let pat = &m.pattern[0];
1648 let node = &pat.nodes[0];
1649 let label = node.labels.first().cloned().unwrap_or_default();
1650
1651 let label_id = match self.snapshot.catalog.get_label(&label)? {
1653 Some(id) => id as u32,
1654 None => {
1655 return Ok(QueryResult {
1656 columns: column_names.to_vec(),
1657 rows: vec![],
1658 });
1659 }
1660 };
1661
1662 let hwm = self.snapshot.store.hwm_for_label(label_id)?;
1663 tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
1664
1665 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
1667 if let Some(ref wexpr) = m.where_clause {
1668 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
1669 }
1670 for p in &node.props {
1671 let cid = col_id_of(&p.key);
1672 if !all_col_ids.contains(&cid) {
1673 all_col_ids.push(cid);
1674 }
1675 }
1676
1677 let var_name = node.var.as_str();
1678 let mut rows: Vec<Vec<Value>> = Vec::new();
1679
1680 let mut scan = ScanByLabel::new(hwm);
1687
1688 while let Some(chunk) = scan.next_chunk()? {
1689 for row_idx in chunk.live_rows() {
1691 let slot = chunk.column(0).data[row_idx];
1692 let node_id = NodeId(((label_id as u64) << 32) | slot);
1693
1694 if self.is_node_tombstoned(node_id) {
1696 continue;
1697 }
1698
1699 let nullable_props = self
1701 .snapshot
1702 .store
1703 .get_node_raw_nullable(node_id, &all_col_ids)?;
1704 let props: Vec<(u32, u64)> = nullable_props
1705 .iter()
1706 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
1707 .collect();
1708
1709 if !self.matches_prop_filter(&props, &node.props) {
1711 continue;
1712 }
1713
1714 if let Some(ref where_expr) = m.where_clause {
1716 let mut row_vals =
1717 build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
1718 if !var_name.is_empty() && !label.is_empty() {
1719 row_vals.insert(
1720 format!("{}.__labels__", var_name),
1721 Value::List(vec![Value::String(label.clone())]),
1722 );
1723 }
1724 if !var_name.is_empty() {
1725 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1726 }
1727 row_vals.extend(self.dollar_params());
1728 if !self.eval_where_graph(where_expr, &row_vals) {
1729 continue;
1730 }
1731 }
1732
1733 let row = project_row(
1735 &props,
1736 column_names,
1737 &all_col_ids,
1738 var_name,
1739 &label,
1740 &self.snapshot.store,
1741 );
1742 rows.push(row);
1743 }
1744 }
1745
1746 Ok(QueryResult {
1747 columns: column_names.to_vec(),
1748 rows,
1749 })
1750 }
1751}
1752
1753struct SingleChunkSource {
1760 chunk: Option<DataChunk>,
1761}
1762
1763impl SingleChunkSource {
1764 fn new(chunk: DataChunk) -> Self {
1765 SingleChunkSource { chunk: Some(chunk) }
1766 }
1767}
1768
1769impl PipelineOperator for SingleChunkSource {
1770 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1771 Ok(self.chunk.take())
1772 }
1773}
1774
1775fn column_name_for_item(item: &ReturnItem) -> String {
1779 if let Some(ref alias) = item.alias {
1780 return alias.clone();
1781 }
1782 match &item.expr {
1784 Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
1785 Expr::Var(v) => v.clone(),
1786 _ => String::new(),
1787 }
1788}
1789
1790fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
1804 match expr {
1805 Expr::PropAccess { var, .. } => var.as_str() == var_name,
1806 Expr::BinOp { left, right, .. } => {
1807 expr_references_var(left, var_name) || expr_references_var(right, var_name)
1808 }
1809 Expr::And(a, b) | Expr::Or(a, b) => {
1810 expr_references_var(a, var_name) || expr_references_var(b, var_name)
1811 }
1812 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1813 expr_references_var(inner, var_name)
1814 }
1815 _ => false,
1816 }
1817}
1818
1819fn is_simple_where_for_chunked(expr: &Expr) -> bool {
1820 match expr {
1821 Expr::BinOp { left, op, right } => {
1822 match op {
1823 BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith => false,
1825 _ => is_simple_where_for_chunked(left) && is_simple_where_for_chunked(right),
1826 }
1827 }
1828 Expr::And(a, b) | Expr::Or(a, b) => {
1829 is_simple_where_for_chunked(a) && is_simple_where_for_chunked(b)
1830 }
1831 Expr::Not(inner) => is_simple_where_for_chunked(inner),
1832 Expr::IsNull(_) | Expr::IsNotNull(_) => true,
1833 Expr::PropAccess { .. } | Expr::Var(_) | Expr::Literal(_) => true,
1834 Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
1836 _ => true,
1837 }
1838}
1839
1840fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
1846 match expr {
1847 Expr::BinOp { left, op, right } => {
1848 let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
1850 (left.as_ref(), right.as_ref(), false)
1851 } else if matches!(left.as_ref(), Expr::Literal(_)) {
1852 (right.as_ref(), left.as_ref(), true)
1853 } else {
1854 return None;
1855 };
1856
1857 let (v, key) = match prop_expr {
1858 Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
1859 _ => return None,
1860 };
1861 if v != var_name {
1862 return None;
1863 }
1864 let col_id = col_id_of(key);
1865
1866 let rhs_raw = match lit_expr {
1867 Expr::Literal(lit) => literal_to_raw_u64(lit)?,
1868 _ => return None,
1869 };
1870
1871 let effective_op = if swapped {
1873 match op {
1874 BinOpKind::Lt => BinOpKind::Gt,
1875 BinOpKind::Le => BinOpKind::Ge,
1876 BinOpKind::Gt => BinOpKind::Lt,
1877 BinOpKind::Ge => BinOpKind::Le,
1878 other => other.clone(),
1879 }
1880 } else {
1881 op.clone()
1882 };
1883
1884 match effective_op {
1885 BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
1886 BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
1887 BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
1888 BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
1889 BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
1890 BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
1891 _ => None,
1892 }
1893 }
1894 Expr::IsNull(inner) => {
1895 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1896 if var.as_str() == var_name {
1897 return Some(ChunkPredicate::IsNull {
1898 col_id: col_id_of(prop),
1899 });
1900 }
1901 }
1902 None
1903 }
1904 Expr::IsNotNull(inner) => {
1905 if let Expr::PropAccess { var, prop } = inner.as_ref() {
1906 if var.as_str() == var_name {
1907 return Some(ChunkPredicate::IsNotNull {
1908 col_id: col_id_of(prop),
1909 });
1910 }
1911 }
1912 None
1913 }
1914 Expr::And(a, b) => {
1915 let ca = try_compile_predicate(a, var_name, _col_ids);
1916 let cb = try_compile_predicate(b, var_name, _col_ids);
1917 match (ca, cb) {
1918 (Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
1919 _ => None,
1920 }
1921 }
1922 _ => None,
1923 }
1924}
1925
1926fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
1931 use sparrowdb_storage::node_store::Value as StoreValue;
1932 match lit {
1933 Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
1934 Literal::Bool(b) => Some(StoreValue::Int64(if *b { 1 } else { 0 }).to_u64()),
1935 Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
1937 }
1938}
1939
1940fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
1945 col_ids
1946 .iter()
1947 .filter_map(|&cid| {
1948 let col = chunk.find_column(cid)?;
1949 if col.nulls.is_null(row_idx) {
1950 None
1951 } else {
1952 Some((cid, col.data[row_idx]))
1953 }
1954 })
1955 .collect()
1956}
1957
1958struct DstSlotProjector<C: PipelineOperator> {
1967 child: C,
1968}
1969
1970impl<C: PipelineOperator> DstSlotProjector<C> {
1971 fn new(child: C) -> Self {
1972 DstSlotProjector { child }
1973 }
1974}
1975
1976impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
1977 fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
1978 use crate::chunk::ColumnVector;
1979
1980 loop {
1981 let chunk = match self.child.next_chunk()? {
1982 Some(c) => c,
1983 None => return Ok(None),
1984 };
1985
1986 if chunk.is_empty() {
1987 continue;
1988 }
1989
1990 let dst_col = match chunk.find_column(COL_ID_DST_SLOT) {
1992 Some(c) => c,
1993 None => continue,
1994 };
1995
1996 let data: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
1997 if data.is_empty() {
1998 continue;
1999 }
2000 let col = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
2001 return Ok(Some(DataChunk::from_columns(vec![col])));
2002 }
2003 }
2004}
2005
2006fn is_id_call(expr: &Expr, var_name: &str) -> bool {
2010 match expr {
2011 Expr::FnCall { name, args } => {
2012 name.eq_ignore_ascii_case("id")
2013 && args.len() == 1
2014 && matches!(&args[0], Expr::Var(v) if v.as_str() == var_name)
2015 }
2016 _ => false,
2017 }
2018}
2019
2020fn is_param_literal(expr: &Expr) -> bool {
2022 matches!(expr, Expr::Literal(Literal::Param(_)))
2023}
2024
2025fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
2032 match expr {
2033 Expr::And(left, right) => {
2034 where_is_only_id_param_conjuncts(left, a_var, b_var)
2035 && where_is_only_id_param_conjuncts(right, a_var, b_var)
2036 }
2037 Expr::BinOp {
2038 left,
2039 op: BinOpKind::Eq,
2040 right,
2041 } => {
2042 (is_id_call(left, a_var) || is_id_call(left, b_var)) && is_param_literal(right)
2044 || is_param_literal(left) && (is_id_call(right, a_var) || is_id_call(right, b_var))
2045 }
2046 _ => false,
2047 }
2048}
2049
2050fn extract_id_param_slot(
2057 where_clause: Option<&Expr>,
2058 var_name: &str,
2059 params: &std::collections::HashMap<String, crate::types::Value>,
2060 expected_label_id: u32,
2061) -> Option<u64> {
2062 let wexpr = where_clause?;
2063 let param_name = find_id_param_name(wexpr, var_name)?;
2064 let val = params.get(¶m_name)?;
2065
2066 let raw_node_id: u64 = match val {
2068 crate::types::Value::Int64(n) => *n as u64,
2069 crate::types::Value::NodeRef(nid) => nid.0,
2070 _ => return None,
2071 };
2072
2073 let (label_id, slot) = super::node_id_parts(raw_node_id);
2074 if label_id != expected_label_id {
2075 return None;
2076 }
2077 Some(slot)
2078}
2079
2080fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
2082 match expr {
2083 Expr::BinOp { left, op, right } => {
2084 if *op == BinOpKind::Eq {
2085 if is_id_call(left, var_name) {
2086 if let Expr::Literal(Literal::Param(p)) = right.as_ref() {
2087 return Some(p.clone());
2088 }
2089 }
2090 if is_id_call(right, var_name) {
2091 if let Expr::Literal(Literal::Param(p)) = left.as_ref() {
2092 return Some(p.clone());
2093 }
2094 }
2095 }
2096 find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
2097 }
2098 Expr::And(a, b) => {
2099 find_id_param_name(a, var_name).or_else(|| find_id_param_name(b, var_name))
2100 }
2101 _ => None,
2102 }
2103}