use std::sync::Arc;
use sparrowdb_common::NodeId;
use sparrowdb_storage::edge_store::EdgeStore;
use super::*;
use crate::chunk::{DataChunk, COL_ID_DST_SLOT, COL_ID_SLOT, COL_ID_SRC_SLOT};
use crate::pipeline::{
BfsArena, ChunkPredicate, GetNeighbors, PipelineOperator, ReadNodeProps, ScanByLabel,
SlotIntersect,
};
/// Which specialized chunked execution plan a MATCH statement maps to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChunkedPlan {
    /// Single-label node scan, no relationships.
    Scan,
    /// One directed relationship hop between two labeled nodes.
    OneHop,
    /// Two consecutive outgoing hops over the same relationship table.
    TwoHop,
    /// `(a)-[:R]->(x)<-[:R]-(b)`: intersect the neighbor sets of two bound nodes.
    MutualNeighbors,
}

impl std::fmt::Display for ChunkedPlan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The display form is the bare variant name (used in trace output).
        let name = match self {
            ChunkedPlan::Scan => "Scan",
            ChunkedPlan::OneHop => "OneHop",
            ChunkedPlan::TwoHop => "TwoHop",
            ChunkedPlan::MutualNeighbors => "MutualNeighbors",
        };
        f.write_str(name)
    }
}
impl Engine {
/// True when `m` is a simple single-node, single-label MATCH that the
/// chunked scan pipeline can execute: no relationships, no aggregates,
/// no ORDER BY / SKIP / LIMIT / DISTINCT, no inline props, and no bare
/// node-variable in the RETURN list.
pub(crate) fn can_use_chunked_pipeline(&self, m: &MatchStatement) -> bool {
    if !self.use_chunked_pipeline {
        return false;
    }
    // Shape: exactly one path pattern consisting of a single node.
    if m.pattern.len() != 1 || !m.pattern[0].rels.is_empty() {
        return false;
    }
    // Clause features the chunked scan executor does not implement.
    if has_aggregate_in_return(&m.return_clause.items)
        || !m.order_by.is_empty()
        || m.skip.is_some()
        || m.limit.is_some()
        || m.distinct
    {
        return false;
    }
    let node = &m.pattern[0].nodes[0];
    // Inline `{key: value}` filters are handled by the row engine instead.
    if !node.props.is_empty() {
        return false;
    }
    // RETURN of a whole node variable (e.g. `RETURN n`) needs the row engine.
    let returns_bare_var = m
        .return_clause
        .items
        .iter()
        .any(|item| matches!(&item.expr, Expr::Var(_)));
    if returns_bare_var {
        return false;
    }
    // A label is required so the scan has a concrete slot range.
    !node.labels.is_empty()
}
/// Returns true when `m` is a directed single-hop pattern
/// `(a:L1)-[:R]->(b:L2)` (or its incoming mirror) that
/// `execute_one_hop_chunked` can handle.
///
/// Requires: single pattern with two singly-labeled nodes and one directed
/// relationship, no variable-length hops, no aggregates / DISTINCT /
/// ORDER BY / SKIP, no inline node props, no use of a named relationship
/// variable in RETURN or WHERE, a WHERE (if any) the chunked predicate
/// compiler supports, and exactly one matching relationship table.
pub(crate) fn can_use_one_hop_chunked(&self, m: &MatchStatement) -> bool {
    use sparrowdb_cypher::ast::EdgeDir;
    if !self.use_chunked_pipeline {
        return false;
    }
    if m.pattern.len() != 1 {
        return false;
    }
    let pat = &m.pattern[0];
    // Shape: exactly two nodes joined by one relationship.
    if pat.rels.len() != 1 || pat.nodes.len() != 2 {
        return false;
    }
    // Both endpoints need exactly one label to resolve a unique table.
    if pat.nodes[0].labels.len() != 1 || pat.nodes[1].labels.len() != 1 {
        return false;
    }
    // Undirected patterns are not supported by the hop executor.
    let dir = &pat.rels[0].dir;
    if *dir != EdgeDir::Outgoing && *dir != EdgeDir::Incoming {
        return false;
    }
    if has_aggregate_in_return(&m.return_clause.items) {
        return false;
    }
    if m.distinct {
        return false;
    }
    if !m.order_by.is_empty() {
        return false;
    }
    // BUGFIX: execute_one_hop_chunked honors LIMIT but never applies SKIP,
    // so accepting a SKIP query would silently drop the offset. Reject it
    // here, mirroring can_use_chunked_pipeline.
    if m.skip.is_some() {
        return false;
    }
    // Variable-length relationships need the general path engine.
    if pat.rels[0].min_hops.is_some() {
        return false;
    }
    // A named relationship must not be referenced in RETURN or WHERE:
    // the chunked pipeline never materializes edge values.
    let rel_var = &pat.rels[0].var;
    if !rel_var.is_empty() {
        let ref_in_return = m.return_clause.items.iter().any(|item| {
            column_name_for_item(item)
                .split_once('.')
                .is_some_and(|(v, _)| v == rel_var.as_str())
        });
        if ref_in_return {
            return false;
        }
        if let Some(ref wexpr) = m.where_clause {
            if expr_references_var(wexpr, rel_var.as_str()) {
                return false;
            }
        }
    }
    if let Some(ref wexpr) = m.where_clause {
        if !is_simple_where_for_chunked(wexpr) {
            return false;
        }
    }
    if pat.nodes.iter().any(|n| !n.props.is_empty()) {
        return false;
    }
    // BUGFIX: check the table in the same physical orientation that
    // execute_one_hop_chunked will use. For an Incoming pattern the stored
    // edge runs nodes[1] -> nodes[0]; the previous code always checked
    // nodes[0] -> nodes[1], so an Incoming query could pass this guard and
    // then fail with InvalidArgument at execution (or be needlessly
    // rejected when the correctly-oriented table exists).
    let (src_label, dst_label) = if *dir == EdgeDir::Incoming {
        (
            pat.nodes[1].labels.first().cloned().unwrap_or_default(),
            pat.nodes[0].labels.first().cloned().unwrap_or_default(),
        )
    } else {
        (
            pat.nodes[0].labels.first().cloned().unwrap_or_default(),
            pat.nodes[1].labels.first().cloned().unwrap_or_default(),
        )
    };
    let rel_type = pat.rels[0].rel_type.clone();
    // Exactly one relationship table may match, otherwise expansion
    // would be ambiguous.
    let n_tables = self
        .snapshot
        .catalog
        .list_rel_tables_with_ids()
        .into_iter()
        .filter(|(_, sid, did, rt)| {
            let type_ok = rel_type.is_empty() || rt == &rel_type;
            let src_ok = self
                .snapshot
                .catalog
                .get_label(&src_label)
                .ok()
                .flatten()
                .map(|id| id as u32 == *sid as u32)
                .unwrap_or(false);
            let dst_ok = self
                .snapshot
                .catalog
                .get_label(&dst_label)
                .ok()
                .flatten()
                .map(|id| id as u32 == *did as u32)
                .unwrap_or(false);
            type_ok && src_ok && dst_ok
        })
        .count();
    n_tables == 1
}
/// Executes a single directed 1-hop MATCH (`(a)-[:R]->(b)` or the incoming
/// mirror) through the chunked pipeline:
/// label scan -> source prop read/filter -> CSR + delta neighbor
/// expansion -> destination prop read/filter -> per-row WHERE -> projection.
///
/// Returns an empty result for unknown labels; errors only when no
/// relationship table matches the resolved label pair / rel type.
pub(crate) fn execute_one_hop_chunked(
    &self,
    m: &MatchStatement,
    column_names: &[String],
) -> Result<QueryResult> {
    use sparrowdb_cypher::ast::EdgeDir;
    let pat = &m.pattern[0];
    let rel_pat = &pat.rels[0];
    let dir = &rel_pat.dir;
    // Traversal always follows the stored edge direction; for an Incoming
    // pattern swap the endpoints and remember it so WHERE evaluation and
    // projection can map results back to query order.
    let (src_node_pat, dst_node_pat, swapped) = if *dir == EdgeDir::Incoming {
        (&pat.nodes[1], &pat.nodes[0], true)
    } else {
        (&pat.nodes[0], &pat.nodes[1], false)
    };
    let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
    let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
    let rel_type = rel_pat.rel_type.clone();
    // Unknown label => no data: empty result, not an error.
    let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // Resolve the relationship table for this (src, dst, type) triple.
    let (catalog_rel_id, _) = self
        .snapshot
        .catalog
        .list_rel_tables_with_ids()
        .into_iter()
        .find(|(_, sid, did, rt)| {
            let type_ok = rel_type.is_empty() || rt == &rel_type;
            let src_ok = *sid as u32 == src_label_id;
            let dst_ok = *did as u32 == dst_label_id;
            type_ok && src_ok && dst_ok
        })
        .map(|(cid, sid, did, rt)| (cid as u32, (sid, did, rt)))
        .ok_or_else(|| {
            sparrowdb_common::Error::InvalidArgument(
                "no matching relationship table found".into(),
            )
        })?;
    // High-water mark bounds the source label scan.
    let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
    tracing::debug!(
        engine = "chunked",
        src_label = %src_label,
        dst_label = %dst_label,
        rel_type = %rel_type,
        hwm_src,
        "executing via chunked pipeline (1-hop)"
    );
    let src_var = src_node_pat.var.as_str();
    let dst_var = dst_node_pat.var.as_str();
    // Query-order variable names (undo the physical swap) for column /
    // predicate resolution.
    let (query_src_var, query_dst_var) = if swapped {
        (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
    } else {
        (src_var, dst_var)
    };
    // Property columns each endpoint needs for RETURN plus WHERE.
    let mut col_ids_src = collect_col_ids_for_var(query_src_var, column_names, src_label_id);
    let mut col_ids_dst = collect_col_ids_for_var(query_dst_var, column_names, dst_label_id);
    if let Some(ref wexpr) = m.where_clause {
        collect_col_ids_from_expr_for_var(wexpr, query_src_var, &mut col_ids_src);
        collect_col_ids_from_expr_for_var(wexpr, query_dst_var, &mut col_ids_dst);
    }
    // Inline-prop columns too. NOTE(review): can_use_one_hop_chunked
    // rejects inline props, so these loops look defensive — confirm.
    for p in &src_node_pat.props {
        let cid = col_id_of(&p.key);
        if !col_ids_src.contains(&cid) {
            col_ids_src.push(cid);
        }
    }
    for p in &dst_node_pat.props {
        let cid = col_id_of(&p.key);
        if !col_ids_dst.contains(&cid) {
            col_ids_dst.push(cid);
        }
    }
    // Un-compacted edge mutations; merged with the CSR during expansion.
    // Open/read failures degrade to "no delta" rather than erroring.
    let delta_records = {
        let edge_store = EdgeStore::open(
            &self.snapshot.db_root,
            sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
        );
        edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
    };
    // Missing CSR => empty adjacency (the delta may still supply edges).
    let csr = self
        .snapshot
        .csrs
        .get(&catalog_rel_id)
        .cloned()
        .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
    // Sizing hint for neighbor-expansion buffers; 8 when no stats exist.
    let avg_degree_hint = self
        .snapshot
        .rel_degree_stats()
        .get(&catalog_rel_id)
        .map(|s| s.mean().ceil() as usize)
        .unwrap_or(8);
    // Pre-compiled per-endpoint chunk predicates; None when the WHERE has
    // no compilable conjunct for that variable. The full WHERE still runs
    // per row below, so these are a vectorized pre-filter.
    let src_pred_opt = m
        .where_clause
        .as_ref()
        .and_then(|wexpr| try_compile_predicate(wexpr, query_src_var, &col_ids_src));
    let dst_pred_opt = m
        .where_clause
        .as_ref()
        .and_then(|wexpr| try_compile_predicate(wexpr, query_dst_var, &col_ids_dst));
    let store_arc = Arc::new(NodeStore::open(self.snapshot.store.root_path())?);
    let limit = m.limit.map(|l| l as usize);
    let mut rows: Vec<Vec<Value>> = Vec::new();
    let mut scan = ScanByLabel::new(hwm_src);
    'outer: while let Some(scan_chunk) = scan.next_chunk()? {
        // Attach source properties when any are needed.
        let src_chunk = if !col_ids_src.is_empty() {
            let mut rnp = ReadNodeProps::new(
                SingleChunkSource::new(scan_chunk),
                Arc::clone(&store_arc),
                src_label_id,
                crate::chunk::COL_ID_SLOT,
                col_ids_src.clone(),
            );
            match rnp.next_chunk()? {
                Some(c) => c,
                None => continue,
            }
        } else {
            scan_chunk
        };
        // Vectorized source-side filter; skip the chunk if nothing survives.
        let src_chunk = if let Some(ref pred) = src_pred_opt {
            let pred = pred.clone();
            let keep: Vec<bool> = {
                (0..src_chunk.len())
                    .map(|i| pred.eval(&src_chunk, i))
                    .collect()
            };
            let mut c = src_chunk;
            c.filter_sel(|i| keep[i]);
            if c.live_len() == 0 {
                continue;
            }
            c
        } else {
            src_chunk
        };
        // Expand the surviving sources into (src, dst) hop pairs.
        let mut gn = GetNeighbors::new(
            SingleChunkSource::new(src_chunk.clone()),
            csr.clone(),
            &delta_records,
            src_label_id,
            avg_degree_hint,
        );
        while let Some(hop_chunk) = gn.next_chunk()? {
            // Attach destination properties when any are needed.
            let dst_chunk = if !col_ids_dst.is_empty() {
                let mut rnp = ReadNodeProps::new(
                    SingleChunkSource::new(hop_chunk),
                    Arc::clone(&store_arc),
                    dst_label_id,
                    COL_ID_DST_SLOT,
                    col_ids_dst.clone(),
                );
                match rnp.next_chunk()? {
                    Some(c) => c,
                    None => continue,
                }
            } else {
                hop_chunk
            };
            // Vectorized destination-side filter.
            let dst_chunk = if let Some(ref pred) = dst_pred_opt {
                let pred = pred.clone();
                let keep: Vec<bool> = (0..dst_chunk.len())
                    .map(|i| pred.eval(&dst_chunk, i))
                    .collect();
                let mut c = dst_chunk;
                c.filter_sel(|i| keep[i]);
                if c.live_len() == 0 {
                    continue;
                }
                c
            } else {
                dst_chunk
            };
            let src_slot_col = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
            let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
            let hop_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
            for row_idx in dst_chunk.live_rows() {
                let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
                let hop_src_slot = hop_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
                // NodeId layout: label id in the high 32 bits, slot below.
                let src_node = NodeId(((src_label_id as u64) << 32) | hop_src_slot);
                let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
                if self.is_node_tombstoned(src_node) || self.is_node_tombstoned(dst_node) {
                    continue;
                }
                // Recover the source node's props: linear scan over the
                // source chunk (O(chunk len) per output row), falling back
                // to a direct store read when the slot is not in the chunk.
                let src_props = if let Some(sc) = src_slot_col {
                    let src_row = (0..sc.data.len()).find(|&i| sc.data[i] == hop_src_slot);
                    if let Some(src_ri) = src_row {
                        build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
                    } else {
                        let nullable = self
                            .snapshot
                            .store
                            .get_node_raw_nullable(src_node, &col_ids_src)?;
                        nullable
                            .into_iter()
                            .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
                            .collect()
                    }
                } else {
                    vec![]
                };
                let dst_props = build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
                // Residual WHERE: evaluated per row with variables mapped
                // back to query order (swap undone for Incoming patterns).
                if let Some(ref where_expr) = m.where_clause {
                    let (actual_src_var, actual_dst_var) = if swapped {
                        (dst_node_pat.var.as_str(), src_node_pat.var.as_str())
                    } else {
                        (src_node_pat.var.as_str(), dst_node_pat.var.as_str())
                    };
                    let (actual_src_props, actual_dst_props) = if swapped {
                        (&dst_props, &src_props)
                    } else {
                        (&src_props, &dst_props)
                    };
                    let mut row_vals = build_row_vals(
                        actual_src_props,
                        actual_src_var,
                        &col_ids_src,
                        &self.snapshot.store,
                    );
                    row_vals.extend(build_row_vals(
                        actual_dst_props,
                        actual_dst_var,
                        &col_ids_dst,
                        &self.snapshot.store,
                    ));
                    row_vals.extend(self.dollar_params());
                    if !self.eval_where_graph(where_expr, &row_vals) {
                        continue;
                    }
                }
                // Project in query order (swap undone again).
                let (proj_src_props, proj_dst_props) = if swapped {
                    (&dst_props as &[(u32, u64)], &src_props as &[(u32, u64)])
                } else {
                    (&src_props as &[(u32, u64)], &dst_props as &[(u32, u64)])
                };
                let (proj_src_var, proj_dst_var, proj_src_label, proj_dst_label) = if swapped {
                    (
                        dst_node_pat.var.as_str(),
                        src_node_pat.var.as_str(),
                        dst_label.as_str(),
                        src_label.as_str(),
                    )
                } else {
                    (
                        src_node_pat.var.as_str(),
                        dst_node_pat.var.as_str(),
                        src_label.as_str(),
                        dst_label.as_str(),
                    )
                };
                let row = project_hop_row(
                    proj_src_props,
                    proj_dst_props,
                    column_names,
                    proj_src_var,
                    proj_dst_var,
                    None,
                    Some((proj_src_var, proj_src_label)),
                    Some((proj_dst_var, proj_dst_label)),
                    &self.snapshot.store,
                    None,
                );
                rows.push(row);
                // LIMIT short-circuits the whole scan. NOTE(review): m.skip
                // is never applied here — confirm SKIP queries cannot reach
                // this plan (or are handled upstream).
                if let Some(lim) = limit {
                    if rows.len() >= lim {
                        break 'outer;
                    }
                }
            }
        }
    }
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows,
    })
}
/// Picks the most specific chunked plan able to execute `m`, if any.
/// Guards are consulted from most to least specialized so that a pattern
/// matching several plans gets the narrowest one.
pub fn try_plan_chunked_match(&self, m: &MatchStatement) -> Option<ChunkedPlan> {
    if self.can_use_mutual_neighbors_chunked(m) {
        Some(ChunkedPlan::MutualNeighbors)
    } else if self.can_use_two_hop_chunked(m) {
        Some(ChunkedPlan::TwoHop)
    } else if self.can_use_one_hop_chunked(m) {
        Some(ChunkedPlan::OneHop)
    } else if self.can_use_chunked_pipeline(m) {
        Some(ChunkedPlan::Scan)
    } else {
        None
    }
}
/// Returns true when `m` is the mutual-neighbors pattern
/// `(a:L)-[:R]->(x:L)<-[:R]-(b:L)` over one shared label, with `a` and `b`
/// each bound to a single node (either via `id(var) = $param`-only WHERE
/// conjuncts or exactly one inline prop each), so
/// `execute_mutual_neighbors_chunked` can run it.
pub(crate) fn can_use_mutual_neighbors_chunked(&self, m: &MatchStatement) -> bool {
    use sparrowdb_cypher::ast::EdgeDir;
    if !self.use_chunked_pipeline {
        return false;
    }
    if m.pattern.len() != 1 {
        return false;
    }
    let pat = &m.pattern[0];
    // Shape: three nodes, two rels forming (a)-[out]->(x)<-[in]-(b).
    if pat.rels.len() != 2 || pat.nodes.len() != 3 {
        return false;
    }
    if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Incoming {
        return false;
    }
    // No variable-length hops.
    if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
        return false;
    }
    // Both hops must use the same relationship type.
    if pat.rels[0].rel_type != pat.rels[1].rel_type {
        return false;
    }
    // All three nodes carry exactly one, identical label.
    if pat.nodes[0].labels.len() != 1
        || pat.nodes[1].labels.len() != 1
        || pat.nodes[2].labels.len() != 1
    {
        return false;
    }
    if pat.nodes[0].labels[0] != pat.nodes[1].labels[0]
        || pat.nodes[1].labels[0] != pat.nodes[2].labels[0]
    {
        return false;
    }
    if has_aggregate_in_return(&m.return_clause.items) {
        return false;
    }
    if m.distinct {
        return false;
    }
    if !m.order_by.is_empty() {
        return false;
    }
    // BUGFIX: the executor honors LIMIT but never applies SKIP, so
    // accepting a SKIP query would silently drop the offset. Reject it
    // here, mirroring can_use_chunked_pipeline.
    if m.skip.is_some() {
        return false;
    }
    // Named relationship variables must not be referenced in RETURN or
    // WHERE: the chunked pipeline never materializes edge values.
    for rel in &pat.rels {
        if !rel.var.is_empty() {
            let ref_in_return = m.return_clause.items.iter().any(|item| {
                column_name_for_item(item)
                    .split_once('.')
                    .is_some_and(|(v, _)| v == rel.var.as_str())
            });
            if ref_in_return {
                return false;
            }
            if let Some(ref wexpr) = m.where_clause {
                if expr_references_var(wexpr, rel.var.as_str()) {
                    return false;
                }
            }
        }
    }
    // a and b must each be bound to one node: either via one inline prop
    // each (no WHERE), or a WHERE made only of id-param conjuncts.
    let a_var = pat.nodes[0].var.as_str();
    let b_var = pat.nodes[2].var.as_str();
    match m.where_clause.as_ref() {
        None => {
            let a_bound = pat.nodes[0].props.len() == 1;
            let b_bound = pat.nodes[2].props.len() == 1;
            if !a_bound || !b_bound {
                return false;
            }
        }
        Some(wexpr) => {
            if !where_is_only_id_param_conjuncts(wexpr, a_var, b_var) {
                return false;
            }
        }
    }
    // Finally, a self-loop relationship table (label -> label) must exist.
    let label = pat.nodes[0].labels[0].clone();
    let rel_type = &pat.rels[0].rel_type;
    let catalog = &self.snapshot.catalog;
    let tables = catalog.list_rel_tables_with_ids();
    let label_id = match catalog.get_label(&label).ok().flatten() {
        Some(id) => id as u32,
        None => return false,
    };
    tables.iter().any(|(_, sid, did, rt)| {
        let type_ok = rel_type.is_empty() || rt == rel_type;
        let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
        type_ok && endpoint_ok
    })
}
/// Executes `MATCH (a)-[:R]->(x)<-[:R]-(b)` where `a` and `b` resolve to
/// single nodes: expands the neighbors of both anchors, intersects the two
/// destination-slot streams to find the shared `x` nodes, then filters and
/// projects each surviving `x`.
pub(crate) fn execute_mutual_neighbors_chunked(
    &self,
    m: &MatchStatement,
    column_names: &[String],
) -> Result<QueryResult> {
    let pat = &m.pattern[0];
    let a_node_pat = &pat.nodes[0];
    let x_node_pat = &pat.nodes[1];
    let b_node_pat = &pat.nodes[2];
    // All three nodes share one label (enforced by the can_use guard).
    let label = a_node_pat.labels[0].clone();
    let rel_type = pat.rels[0].rel_type.clone();
    // Unknown label => no data: empty result, not an error.
    let label_id = match self.snapshot.catalog.get_label(&label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // Self-loop relationship table (label -> label) for this rel type.
    let catalog_rel_id = self
        .snapshot
        .catalog
        .list_rel_tables_with_ids()
        .into_iter()
        .find(|(_, sid, did, rt)| {
            let type_ok = rel_type.is_empty() || rt == &rel_type;
            let endpoint_ok = *sid as u32 == label_id && *did as u32 == label_id;
            type_ok && endpoint_ok
        })
        .map(|(cid, _, _, _)| cid as u32)
        .ok_or_else(|| {
            sparrowdb_common::Error::InvalidArgument(
                "no matching relationship table for mutual-neighbors".into(),
            )
        })?;
    let a_var = a_node_pat.var.as_str();
    let b_var = b_node_pat.var.as_str();
    // Resolve the two anchor slots: from id-param WHERE conjuncts when a
    // WHERE exists, otherwise by looking up each node's inline prop.
    let (a_slot_opt, b_slot_opt) = if m.where_clause.is_some() {
        (
            extract_id_param_slot(m.where_clause.as_ref(), a_var, &self.params, label_id),
            extract_id_param_slot(m.where_clause.as_ref(), b_var, &self.params, label_id),
        )
    } else {
        let hwm = self.snapshot.store.hwm_for_label(label_id).unwrap_or(0);
        let dollar_params = self.dollar_params();
        let prop_idx = self.prop_index.borrow();
        (
            find_slot_by_props(
                &self.snapshot.store,
                label_id,
                hwm,
                &a_node_pat.props,
                &dollar_params,
                &prop_idx,
            ),
            find_slot_by_props(
                &self.snapshot.store,
                label_id,
                hwm,
                &b_node_pat.props,
                &dollar_params,
                &prop_idx,
            ),
        )
    };
    // Both anchors must resolve; otherwise the match is empty.
    let (a_slot, b_slot) = match (a_slot_opt, b_slot_opt) {
        (Some(a), Some(b)) => (a, b),
        _ => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // Identical anchors produce no mutual-neighbor result.
    if a_slot == b_slot {
        return Ok(QueryResult {
            columns: column_names.to_vec(),
            rows: vec![],
        });
    }
    tracing::debug!(
        engine = "chunked",
        plan = %ChunkedPlan::MutualNeighbors,
        label = %label,
        rel_type = %rel_type,
        a_slot,
        b_slot,
        "executing via chunked pipeline"
    );
    // Missing CSR => empty adjacency (the delta may still supply edges).
    let csr = self
        .snapshot
        .csrs
        .get(&catalog_rel_id)
        .cloned()
        .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
    // Un-compacted edge mutations; open/read failures degrade to "no delta".
    let delta_records = {
        let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
            &self.snapshot.db_root,
            sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
        );
        edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
    };
    // Two single-slot pipelines: neighbors(a) and neighbors(b).
    let a_scan = ScanByLabel::from_slots(vec![a_slot]);
    let a_neighbors = GetNeighbors::new(a_scan, csr.clone(), &delta_records, label_id, 8);
    let b_scan = ScanByLabel::from_slots(vec![b_slot]);
    let b_neighbors = GetNeighbors::new(b_scan, csr, &delta_records, label_id, 8);
    let a_proj = DstSlotProjector::new(a_neighbors);
    let b_proj = DstSlotProjector::new(b_neighbors);
    // Intersect the two destination-slot streams; the operator may spill
    // past this byte threshold.
    let spill_threshold = 64 * 1024;
    let mut intersect =
        SlotIntersect::new(a_proj, b_proj, COL_ID_SLOT, COL_ID_SLOT, spill_threshold);
    let mut common_slots: Vec<u64> = Vec::new();
    while let Some(chunk) = intersect.next_chunk()? {
        if let Some(col) = chunk.find_column(COL_ID_SLOT) {
            for row_idx in chunk.live_rows() {
                common_slots.push(col.data[row_idx]);
            }
        }
    }
    // Property columns `x` needs for RETURN / WHERE / inline props.
    let x_var = x_node_pat.var.as_str();
    let mut col_ids_x = collect_col_ids_for_var(x_var, column_names, label_id);
    if let Some(ref wexpr) = m.where_clause {
        collect_col_ids_from_expr_for_var(wexpr, x_var, &mut col_ids_x);
    }
    for p in &x_node_pat.props {
        let cid = col_id_of(&p.key);
        if !col_ids_x.contains(&cid) {
            col_ids_x.push(cid);
        }
    }
    let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
        self.snapshot.store.root_path(),
    )?);
    let limit = m.limit.map(|l| l as usize);
    let mut rows: Vec<Vec<Value>> = Vec::new();
    'outer: for x_slot in common_slots {
        // NodeId layout: label id in the high 32 bits, slot below.
        let x_node_id = NodeId(((label_id as u64) << 32) | x_slot);
        if self.is_node_tombstoned(x_node_id) {
            continue;
        }
        // Fetch x's properties from the store (nullable per column).
        let x_props: Vec<(u32, u64)> = if !col_ids_x.is_empty() {
            let nullable = store_arc.batch_read_node_props_nullable(
                label_id,
                &[x_slot as u32],
                &col_ids_x,
            )?;
            if nullable.is_empty() {
                vec![]
            } else {
                col_ids_x
                    .iter()
                    .enumerate()
                    .filter_map(|(i, &cid)| nullable[0][i].map(|v| (cid, v)))
                    .collect()
            }
        } else {
            vec![]
        };
        // Residual WHERE with the anchors exposed as node refs so id-param
        // conjuncts still evaluate true.
        if let Some(ref where_expr) = m.where_clause {
            let mut row_vals =
                build_row_vals(&x_props, x_var, &col_ids_x, &self.snapshot.store);
            if !a_var.is_empty() {
                let a_node_id = NodeId(((label_id as u64) << 32) | a_slot);
                row_vals.insert(a_var.to_string(), Value::NodeRef(a_node_id));
            }
            if !b_var.is_empty() {
                let b_node_id = NodeId(((label_id as u64) << 32) | b_slot);
                row_vals.insert(b_var.to_string(), Value::NodeRef(b_node_id));
            }
            row_vals.extend(self.dollar_params());
            if !self.eval_where_graph(where_expr, &row_vals) {
                continue;
            }
        }
        let row = project_row(
            &x_props,
            column_names,
            &col_ids_x,
            x_var,
            &label,
            &self.snapshot.store,
            Some(x_node_id),
        );
        rows.push(row);
        // NOTE(review): only LIMIT is applied; m.skip would be silently
        // ignored here — confirm SKIP queries cannot reach this plan.
        if let Some(lim) = limit {
            if rows.len() >= lim {
                break 'outer;
            }
        }
    }
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows,
    })
}
/// Returns true when `m` is a two-hop pattern `(a)-[:R]->(m)-[:R]->(c)`
/// that `execute_two_hop_chunked` can run: both hops outgoing, same
/// relationship type, fixed-length, no aggregates / DISTINCT / ORDER BY /
/// SKIP, no inline props, no referenced relationship variables, and both
/// hops resolving to the same single relationship table.
pub(crate) fn can_use_two_hop_chunked(&self, m: &MatchStatement) -> bool {
    use sparrowdb_cypher::ast::EdgeDir;
    if !self.use_chunked_pipeline {
        return false;
    }
    if m.pattern.len() != 1 {
        return false;
    }
    let pat = &m.pattern[0];
    // Shape: three nodes joined by two relationships.
    if pat.rels.len() != 2 || pat.nodes.len() != 3 {
        return false;
    }
    if pat.rels[0].dir != EdgeDir::Outgoing || pat.rels[1].dir != EdgeDir::Outgoing {
        return false;
    }
    // No variable-length hops.
    if pat.rels[0].min_hops.is_some() || pat.rels[1].min_hops.is_some() {
        return false;
    }
    if has_aggregate_in_return(&m.return_clause.items) {
        return false;
    }
    if m.distinct {
        return false;
    }
    if !m.order_by.is_empty() {
        return false;
    }
    // BUGFIX: execute_two_hop_chunked honors LIMIT but never applies SKIP,
    // so accepting a SKIP query would silently drop the offset. Reject it
    // here, mirroring can_use_chunked_pipeline.
    if m.skip.is_some() {
        return false;
    }
    // Named relationship variables must not be referenced in RETURN or
    // WHERE: the chunked pipeline never materializes edge values.
    for rel in &pat.rels {
        if !rel.var.is_empty() {
            let ref_in_return = m.return_clause.items.iter().any(|item| {
                column_name_for_item(item)
                    .split_once('.')
                    .is_some_and(|(v, _)| v == rel.var.as_str())
            });
            if ref_in_return {
                return false;
            }
            if let Some(ref wexpr) = m.where_clause {
                if expr_references_var(wexpr, rel.var.as_str()) {
                    return false;
                }
            }
        }
    }
    // A WHERE (if any) must be compilable by the chunked predicate path.
    if let Some(ref wexpr) = m.where_clause {
        if !is_simple_where_for_chunked(wexpr) {
            return false;
        }
    }
    if pat.nodes.iter().any(|n| !n.props.is_empty()) {
        return false;
    }
    let src_label = pat.nodes[0].labels.first().cloned().unwrap_or_default();
    let mid_label = pat.nodes[1].labels.first().cloned().unwrap_or_default();
    let dst_label = pat.nodes[2].labels.first().cloned().unwrap_or_default();
    let rel_type1 = &pat.rels[0].rel_type;
    let rel_type2 = &pat.rels[1].rel_type;
    if rel_type1 != rel_type2 {
        return false;
    }
    let catalog = &self.snapshot.catalog;
    let tables = catalog.list_rel_tables_with_ids();
    // Hop 1 (src -> mid) must match exactly one table.
    let hop1_matches: Vec<_> = tables
        .iter()
        .filter(|(_, sid, did, rt)| {
            let type_ok = rel_type1.is_empty() || rt == rel_type1;
            let src_ok = catalog
                .get_label(&src_label)
                .ok()
                .flatten()
                .map(|id| id as u32 == *sid as u32)
                .unwrap_or(false);
            let mid_ok = catalog
                .get_label(&mid_label)
                .ok()
                .flatten()
                .map(|id| id as u32 == *did as u32)
                .unwrap_or(false);
            type_ok && src_ok && mid_ok
        })
        .collect();
    if hop1_matches.len() != 1 {
        return false;
    }
    // Hop 2 (mid -> dst) must resolve to the SAME table, since the
    // executor reuses a single CSR for both expansions.
    let hop2_id = tables.iter().find(|(_, sid, did, rt)| {
        let type_ok = rel_type2.is_empty() || rt == rel_type2;
        let mid_ok = catalog
            .get_label(&mid_label)
            .ok()
            .flatten()
            .map(|id| id as u32 == *sid as u32)
            .unwrap_or(false);
        let dst_ok = catalog
            .get_label(&dst_label)
            .ok()
            .flatten()
            .map(|id| id as u32 == *did as u32)
            .unwrap_or(false);
        type_ok && mid_ok && dst_ok
    });
    match (hop1_matches.first(), hop2_id) {
        (Some((id1, _, _, _)), Some((id2, _, _, _))) => id1 == id2,
        _ => false,
    }
}
/// Executes `MATCH (a)-[:R]->(m)-[:R]->(c)` through the chunked pipeline:
/// scan sources, expand hop 1, dedupe mid slots per hop-1 chunk, expand
/// hop 2 from the deduped mids, then join (src, mid) pairs against
/// (mid, dst) rows and project three-variable result rows.
///
/// Enforces `self.memory_limit_bytes` against a rough estimate of
/// accumulated row bytes plus frontier-arena usage.
pub(crate) fn execute_two_hop_chunked(
    &self,
    m: &MatchStatement,
    column_names: &[String],
) -> Result<QueryResult> {
    use sparrowdb_common::Error as DbError;
    let pat = &m.pattern[0];
    let src_node_pat = &pat.nodes[0];
    let mid_node_pat = &pat.nodes[1];
    let dst_node_pat = &pat.nodes[2];
    let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
    let mid_label = mid_node_pat.labels.first().cloned().unwrap_or_default();
    let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
    // Both hops share one rel type (enforced by can_use_two_hop_chunked).
    let rel_type = pat.rels[0].rel_type.clone();
    // Unknown label => no data: empty result, not an error.
    let src_label_id = match self.snapshot.catalog.get_label(&src_label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // An unlabeled mid node inherits the source's label id.
    let mid_label_id = if mid_label.is_empty() {
        src_label_id
    } else {
        match self.snapshot.catalog.get_label(&mid_label)? {
            Some(id) => id as u32,
            None => {
                return Ok(QueryResult {
                    columns: column_names.to_vec(),
                    rows: vec![],
                });
            }
        }
    };
    let dst_label_id = match self.snapshot.catalog.get_label(&dst_label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // Resolve the hop-1 table; the can_use guard has already verified that
    // hop 2 resolves to the same table, so one CSR serves both expansions.
    let catalog_rel_id = self
        .snapshot
        .catalog
        .list_rel_tables_with_ids()
        .into_iter()
        .find(|(_, sid, did, rt)| {
            let type_ok = rel_type.is_empty() || rt == &rel_type;
            let src_ok = *sid as u32 == src_label_id;
            let mid_ok = *did as u32 == mid_label_id;
            type_ok && src_ok && mid_ok
        })
        .map(|(cid, _, _, _)| cid as u32)
        .ok_or_else(|| {
            sparrowdb_common::Error::InvalidArgument(
                "no matching relationship table found for 2-hop".into(),
            )
        })?;
    let hwm_src = self.snapshot.store.hwm_for_label(src_label_id).unwrap_or(0);
    let hwm_dst = self.snapshot.store.hwm_for_label(dst_label_id).unwrap_or(0);
    tracing::debug!(
        engine = "chunked",
        src_label = %src_label,
        mid_label = %mid_label,
        dst_label = %dst_label,
        rel_type = %rel_type,
        hwm_src,
        hwm_dst,
        "executing via chunked pipeline (2-hop)"
    );
    let src_var = src_node_pat.var.as_str();
    let mid_var = mid_node_pat.var.as_str();
    let dst_var = dst_node_pat.var.as_str();
    // Property columns each variable needs for RETURN plus WHERE.
    let mut col_ids_src = collect_col_ids_for_var(src_var, column_names, src_label_id);
    let mut col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
    let mut col_ids_mid: Vec<u32> = vec![];
    if let Some(ref wexpr) = m.where_clause {
        collect_col_ids_from_expr_for_var(wexpr, src_var, &mut col_ids_src);
        collect_col_ids_from_expr_for_var(wexpr, dst_var, &mut col_ids_dst);
        collect_col_ids_from_expr_for_var(wexpr, mid_var, &mut col_ids_mid);
    }
    // Inline-prop columns too. NOTE(review): can_use_two_hop_chunked
    // rejects inline props, so these loops look defensive — confirm.
    for p in &src_node_pat.props {
        let cid = sparrowdb_common::col_id_of(&p.key);
        if !col_ids_src.contains(&cid) {
            col_ids_src.push(cid);
        }
    }
    for p in &mid_node_pat.props {
        let cid = sparrowdb_common::col_id_of(&p.key);
        if !col_ids_mid.contains(&cid) {
            col_ids_mid.push(cid);
        }
    }
    for p in &dst_node_pat.props {
        let cid = sparrowdb_common::col_id_of(&p.key);
        if !col_ids_dst.contains(&cid) {
            col_ids_dst.push(cid);
        }
    }
    // Mid columns referenced in the RETURN list are needed as well.
    if !mid_var.is_empty() {
        let mid_return_ids = collect_col_ids_for_var(mid_var, column_names, mid_label_id);
        for cid in mid_return_ids {
            if !col_ids_mid.contains(&cid) {
                col_ids_mid.push(cid);
            }
        }
    }
    // Un-compacted edge mutations; open/read failures degrade to "no delta".
    let delta_records = {
        let edge_store = sparrowdb_storage::edge_store::EdgeStore::open(
            &self.snapshot.db_root,
            sparrowdb_storage::edge_store::RelTableId(catalog_rel_id),
        );
        edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
    };
    // Missing CSR => empty adjacency (the delta may still supply edges).
    let csr = self
        .snapshot
        .csrs
        .get(&catalog_rel_id)
        .cloned()
        .unwrap_or_else(|| sparrowdb_storage::csr::CsrForward::build(0, &[]));
    // Sizing hint for neighbor-expansion buffers; 8 when no stats exist.
    let avg_degree_hint = self
        .snapshot
        .rel_degree_stats()
        .get(&catalog_rel_id)
        .map(|s| s.mean().ceil() as usize)
        .unwrap_or(8);
    // Pre-compiled per-variable chunk predicates (vectorized pre-filters);
    // the residual WHERE still runs per joined row below.
    let src_pred_opt = m
        .where_clause
        .as_ref()
        .and_then(|wexpr| try_compile_predicate(wexpr, src_var, &col_ids_src));
    let mid_pred_opt = m
        .where_clause
        .as_ref()
        .and_then(|wexpr| try_compile_predicate(wexpr, mid_var, &col_ids_mid));
    let dst_pred_opt = m
        .where_clause
        .as_ref()
        .and_then(|wexpr| try_compile_predicate(wexpr, dst_var, &col_ids_dst));
    let store_arc = Arc::new(sparrowdb_storage::node_store::NodeStore::open(
        self.snapshot.store.root_path(),
    )?);
    let limit = m.limit.map(|l| l as usize);
    let memory_limit = self.memory_limit_bytes;
    let mut rows: Vec<Vec<Value>> = Vec::new();
    // Arena used to dedupe mid slots within each hop-1 chunk (cleared per
    // chunk, so dedup does not span chunks).
    let node_capacity = (hwm_src.max(hwm_dst) as usize).max(64);
    let mut frontier = BfsArena::new(
        avg_degree_hint * (crate::chunk::CHUNK_CAPACITY / 2),
        node_capacity,
    );
    // Rough per-row byte estimate used by the memory-limit checks.
    let row_size_estimate = column_names.len().max(1) * 16;
    let mut scan = ScanByLabel::new(hwm_src);
    'outer: while let Some(scan_chunk) = scan.next_chunk()? {
        // Attach source properties when any are needed.
        let src_chunk = if !col_ids_src.is_empty() {
            let mut rnp = ReadNodeProps::new(
                SingleChunkSource::new(scan_chunk),
                Arc::clone(&store_arc),
                src_label_id,
                crate::chunk::COL_ID_SLOT,
                col_ids_src.clone(),
            );
            match rnp.next_chunk()? {
                Some(c) => c,
                None => continue,
            }
        } else {
            scan_chunk
        };
        // Vectorized source-side filter.
        let src_chunk = if let Some(ref pred) = src_pred_opt {
            let pred = pred.clone();
            let keep: Vec<bool> = (0..src_chunk.len())
                .map(|i| pred.eval(&src_chunk, i))
                .collect();
            let mut c = src_chunk;
            c.filter_sel(|i| keep[i]);
            if c.live_len() == 0 {
                continue;
            }
            c
        } else {
            src_chunk
        };
        // Hop 1: expand surviving sources into (src, mid) pairs.
        let mut gn1 = GetNeighbors::new(
            SingleChunkSource::new(src_chunk.clone()),
            csr.clone(),
            &delta_records,
            src_label_id,
            avg_degree_hint,
        );
        while let Some(hop1_chunk) = gn1.next_chunk()? {
            frontier.clear();
            // Memory guard: estimated row bytes plus arena usage.
            let accum_bytes = rows.len() * row_size_estimate + frontier.bytes_used();
            if accum_bytes > memory_limit {
                return Err(DbError::QueryMemoryExceeded);
            }
            // Attach mid properties when any are needed.
            let mid_chunk = if !col_ids_mid.is_empty() {
                let mut rnp = ReadNodeProps::new(
                    SingleChunkSource::new(hop1_chunk),
                    Arc::clone(&store_arc),
                    mid_label_id,
                    COL_ID_DST_SLOT,
                    col_ids_mid.clone(),
                );
                match rnp.next_chunk()? {
                    Some(c) => c,
                    None => continue,
                }
            } else {
                hop1_chunk
            };
            // Vectorized mid-side filter.
            let mid_chunk = if let Some(ref pred) = mid_pred_opt {
                let pred = pred.clone();
                let keep: Vec<bool> = (0..mid_chunk.len())
                    .map(|i| pred.eval(&mid_chunk, i))
                    .collect();
                let mut c = mid_chunk;
                c.filter_sel(|i| keep[i]);
                if c.live_len() == 0 {
                    continue;
                }
                c
            } else {
                mid_chunk
            };
            // Materialize the surviving (src, mid) pairs for the later join.
            let mid_slot_col = mid_chunk.find_column(COL_ID_DST_SLOT);
            let hop1_src_col = mid_chunk.find_column(COL_ID_SRC_SLOT);
            let live_pairs: Vec<(u64, u64)> = mid_chunk
                .live_rows()
                .map(|row_idx| {
                    let mid_slot = mid_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
                    let src_slot = hop1_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
                    (src_slot, mid_slot)
                })
                .collect();
            // Dedupe mid slots via the arena so hop 2 expands each mid once.
            for &(_, mid_slot) in &live_pairs {
                if frontier.visit(mid_slot) {
                    frontier.current_mut().push(mid_slot);
                }
            }
            // Build a synthetic slot chunk from the deduped mids.
            let mid_slots_chunk = {
                let data: Vec<u64> = frontier.current().to_vec();
                let col =
                    crate::chunk::ColumnVector::from_data(crate::chunk::COL_ID_SLOT, data);
                DataChunk::from_columns(vec![col])
            };
            // Hop 2: expand mids into (mid, dst) pairs.
            let mut gn2 = GetNeighbors::new(
                SingleChunkSource::new(mid_slots_chunk),
                csr.clone(),
                &delta_records,
                mid_label_id,
                avg_degree_hint,
            );
            while let Some(hop2_chunk) = gn2.next_chunk()? {
                // Attach destination properties when any are needed.
                let dst_chunk = if !col_ids_dst.is_empty() {
                    let mut rnp = ReadNodeProps::new(
                        SingleChunkSource::new(hop2_chunk),
                        Arc::clone(&store_arc),
                        dst_label_id,
                        COL_ID_DST_SLOT,
                        col_ids_dst.clone(),
                    );
                    match rnp.next_chunk()? {
                        Some(c) => c,
                        None => continue,
                    }
                } else {
                    hop2_chunk
                };
                // Vectorized destination-side filter.
                let dst_chunk = if let Some(ref pred) = dst_pred_opt {
                    let pred = pred.clone();
                    let keep: Vec<bool> = (0..dst_chunk.len())
                        .map(|i| pred.eval(&dst_chunk, i))
                        .collect();
                    let mut c = dst_chunk;
                    c.filter_sel(|i| keep[i]);
                    if c.live_len() == 0 {
                        continue;
                    }
                    c
                } else {
                    dst_chunk
                };
                let hop2_src_col = dst_chunk.find_column(COL_ID_SRC_SLOT);
                let dst_slot_col = dst_chunk.find_column(COL_ID_DST_SLOT);
                // slot -> row-index maps for recovering src / mid props.
                let src_slot_col_in_scan = src_chunk.find_column(crate::chunk::COL_ID_SLOT);
                let src_index: std::collections::HashMap<u64, usize> = src_slot_col_in_scan
                    .map(|sc| (0..sc.data.len()).map(|i| (sc.data[i], i)).collect())
                    .unwrap_or_default();
                let mid_index: std::collections::HashMap<u64, usize> = {
                    let mid_slot_col_in_mid = mid_chunk.find_column(COL_ID_DST_SLOT);
                    mid_slot_col_in_mid
                        .map(|mc| (0..mc.data.len()).map(|i| (mc.data[i], i)).collect())
                        .unwrap_or_default()
                };
                for row_idx in dst_chunk.live_rows() {
                    let dst_slot = dst_slot_col.map(|c| c.data[row_idx]).unwrap_or(0);
                    let via_mid_slot = hop2_src_col.map(|c| c.data[row_idx]).unwrap_or(0);
                    // Join each hop-2 row back to every (src, mid) pair that
                    // reached this mid (linear scan over live_pairs).
                    for &(src_slot, mid_slot) in &live_pairs {
                        if mid_slot != via_mid_slot {
                            continue;
                        }
                        // NodeId layout: label id in high 32 bits, slot below.
                        let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
                        let mid_node = NodeId(((mid_label_id as u64) << 32) | mid_slot);
                        let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
                        if self.is_node_tombstoned(src_node)
                            || self.is_node_tombstoned(mid_node)
                            || self.is_node_tombstoned(dst_node)
                        {
                            continue;
                        }
                        // Recover source props from the chunk; fall back to a
                        // direct store read when the slot is not present.
                        let src_props = if src_slot_col_in_scan.is_some() {
                            if let Some(&src_ri) = src_index.get(&src_slot) {
                                build_props_from_chunk(&src_chunk, src_ri, &col_ids_src)
                            } else {
                                let nullable = self
                                    .snapshot
                                    .store
                                    .get_node_raw_nullable(src_node, &col_ids_src)?;
                                nullable
                                    .into_iter()
                                    .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
                                    .collect()
                            }
                        } else {
                            vec![]
                        };
                        // Same recovery strategy for mid props.
                        let mid_props: Vec<(u32, u64)> = if !col_ids_mid.is_empty() {
                            if let Some(&mid_ri) = mid_index.get(&mid_slot) {
                                build_props_from_chunk(&mid_chunk, mid_ri, &col_ids_mid)
                            } else {
                                let nullable = self
                                    .snapshot
                                    .store
                                    .get_node_raw_nullable(mid_node, &col_ids_mid)?;
                                nullable
                                    .into_iter()
                                    .filter_map(|(cid, opt)| opt.map(|v| (cid, v)))
                                    .collect()
                            }
                        } else {
                            vec![]
                        };
                        let dst_props =
                            build_props_from_chunk(&dst_chunk, row_idx, &col_ids_dst);
                        // Residual WHERE over all three variables.
                        if let Some(ref where_expr) = m.where_clause {
                            let mut row_vals = build_row_vals(
                                &src_props,
                                src_var,
                                &col_ids_src,
                                &self.snapshot.store,
                            );
                            row_vals.extend(build_row_vals(
                                &mid_props,
                                mid_var,
                                &col_ids_mid,
                                &self.snapshot.store,
                            ));
                            row_vals.extend(build_row_vals(
                                &dst_props,
                                dst_var,
                                &col_ids_dst,
                                &self.snapshot.store,
                            ));
                            row_vals.extend(self.dollar_params());
                            if !self.eval_where_graph(where_expr, &row_vals) {
                                continue;
                            }
                        }
                        // NOTE(review): dst_var is not passed here —
                        // project_three_var_row presumably resolves the third
                        // variable's columns from column_names; confirm.
                        let row = project_three_var_row(
                            &src_props,
                            &mid_props,
                            &dst_props,
                            column_names,
                            src_var,
                            mid_var,
                            &self.snapshot.store,
                        );
                        rows.push(row);
                        // Memory guard on accumulated rows.
                        if rows.len() * row_size_estimate > memory_limit {
                            return Err(DbError::QueryMemoryExceeded);
                        }
                        // LIMIT short-circuits everything. NOTE(review):
                        // m.skip is never applied — confirm SKIP queries
                        // cannot reach this plan.
                        if let Some(lim) = limit {
                            if rows.len() >= lim {
                                break 'outer;
                            }
                        }
                    }
                }
            }
        }
    }
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows,
    })
}
/// Executes a single-node label-scan MATCH via the chunked pipeline: scan
/// the label's slots, read each row's needed properties, apply inline
/// `{key: value}` filters and the WHERE clause, then project.
///
/// Intended for statements accepted by `can_use_chunked_pipeline` (which
/// rejects ORDER BY / SKIP / LIMIT / DISTINCT, so no limiting happens
/// here) — confirm against call sites.
pub(crate) fn execute_scan_chunked(
    &self,
    m: &MatchStatement,
    column_names: &[String],
) -> Result<QueryResult> {
    use crate::pipeline::PipelineOperator;
    let pat = &m.pattern[0];
    let node = &pat.nodes[0];
    let label = node.labels.first().cloned().unwrap_or_default();
    // Unknown label => no data: empty result, not an error.
    let label_id = match self.snapshot.catalog.get_label(&label)? {
        Some(id) => id as u32,
        None => {
            return Ok(QueryResult {
                columns: column_names.to_vec(),
                rows: vec![],
            });
        }
    };
    // High-water mark bounds the slot scan. (Unlike the hop executors,
    // a storage error propagates here instead of defaulting to 0.)
    let hwm = self.snapshot.store.hwm_for_label(label_id)?;
    tracing::debug!(label = %label, hwm = hwm, "chunked pipeline: label scan");
    // All property columns needed across RETURN, WHERE and inline props.
    let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
    if let Some(ref wexpr) = m.where_clause {
        collect_col_ids_from_expr(wexpr, &mut all_col_ids);
    }
    for p in &node.props {
        let cid = col_id_of(&p.key);
        if !all_col_ids.contains(&cid) {
            all_col_ids.push(cid);
        }
    }
    let var_name = node.var.as_str();
    let mut rows: Vec<Vec<Value>> = Vec::new();
    let mut scan = ScanByLabel::new(hwm);
    while let Some(chunk) = scan.next_chunk()? {
        for row_idx in chunk.live_rows() {
            // Column 0 of a scan chunk is the node slot (used below to
            // assemble the NodeId: label id in the high 32 bits).
            let slot = chunk.column(0).data[row_idx];
            let node_id = NodeId(((label_id as u64) << 32) | slot);
            if self.is_node_tombstoned(node_id) {
                continue;
            }
            // Row-at-a-time property read; NULL columns are dropped.
            let nullable_props = self
                .snapshot
                .store
                .get_node_raw_nullable(node_id, &all_col_ids)?;
            let props: Vec<(u32, u64)> = nullable_props
                .iter()
                .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
                .collect();
            // Inline `{key: value}` pattern filters.
            if !self.matches_prop_filter(&props, &node.props) {
                continue;
            }
            if let Some(ref where_expr) = m.where_clause {
                let mut row_vals =
                    build_row_vals(&props, var_name, &all_col_ids, &self.snapshot.store);
                // Expose the node's label list for label-test predicates.
                if !var_name.is_empty() && !label.is_empty() {
                    row_vals.insert(
                        format!("{}.__labels__", var_name),
                        Value::List(vec![Value::String(label.clone())]),
                    );
                }
                // Expose the node itself for id()/node-ref predicates.
                if !var_name.is_empty() {
                    row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
                }
                row_vals.extend(self.dollar_params());
                if !self.eval_where_graph(where_expr, &row_vals) {
                    continue;
                }
            }
            let row = project_row(
                &props,
                column_names,
                &all_col_ids,
                var_name,
                &label,
                &self.snapshot.store,
                Some(node_id),
            );
            rows.push(row);
        }
    }
    Ok(QueryResult {
        columns: column_names.to_vec(),
        rows,
    })
}
}
/// A pipeline source that yields exactly one pre-built `DataChunk` and is
/// exhausted afterwards. Used to feed a fixed chunk into downstream operators.
struct SingleChunkSource {
// The chunk to emit; `take`n on the first `next_chunk` call, leaving `None`.
chunk: Option<DataChunk>,
}
impl SingleChunkSource {
fn new(chunk: DataChunk) -> Self {
SingleChunkSource { chunk: Some(chunk) }
}
}
impl PipelineOperator for SingleChunkSource {
    /// Emits the stored chunk on the first call and `None` on every call after.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        let emitted = self.chunk.take();
        Ok(emitted)
    }
}
fn column_name_for_item(item: &ReturnItem) -> String {
if let Some(ref alias) = item.alias {
return alias.clone();
}
match &item.expr {
Expr::PropAccess { var, prop } => format!("{}.{}", var, prop),
Expr::Var(v) => v.clone(),
_ => String::new(),
}
}
/// Reports whether `expr` mentions `var_name` via a property access,
/// descending recursively through boolean and binary operators. Atoms other
/// than property accesses (variables, literals, calls) do not count as a
/// reference here.
fn expr_references_var(expr: &Expr, var_name: &str) -> bool {
    match expr {
        Expr::PropAccess { var, .. } => var.as_str() == var_name,
        Expr::BinOp { left, right, .. } => {
            expr_references_var(left, var_name) || expr_references_var(right, var_name)
        }
        Expr::And(lhs, rhs) | Expr::Or(lhs, rhs) => {
            expr_references_var(lhs, var_name) || expr_references_var(rhs, var_name)
        }
        Expr::Not(child) | Expr::IsNull(child) | Expr::IsNotNull(child) => {
            expr_references_var(child, var_name)
        }
        _ => false,
    }
}
/// Returns true when a WHERE expression uses only constructs the chunked
/// pipeline can evaluate: comparisons, boolean combinators, NOT, null checks,
/// and plain property/variable/literal atoms. String-matching operators
/// (CONTAINS / STARTS WITH / ENDS WITH), subqueries, and function calls force
/// the row-at-a-time fallback.
fn is_simple_where_for_chunked(expr: &Expr) -> bool {
    match expr {
        Expr::BinOp { left, op, right } => {
            !matches!(
                op,
                BinOpKind::Contains | BinOpKind::StartsWith | BinOpKind::EndsWith
            ) && is_simple_where_for_chunked(left)
                && is_simple_where_for_chunked(right)
        }
        Expr::And(lhs, rhs) | Expr::Or(lhs, rhs) => {
            is_simple_where_for_chunked(lhs) && is_simple_where_for_chunked(rhs)
        }
        Expr::Not(child) => is_simple_where_for_chunked(child),
        Expr::ExistsSubquery(_) | Expr::NotExists(_) | Expr::FnCall { .. } => false,
        // Null checks, atoms, and any remaining expression kinds are treated
        // as simple, matching the original catch-all behavior.
        _ => true,
    }
}
/// Attempts to compile a WHERE expression into a `ChunkPredicate` evaluable
/// directly against chunk columns for variable `var_name`.
///
/// Supported shapes: `<var>.<prop> <cmp> <literal>` (either operand order),
/// `IS NULL` / `IS NOT NULL` on a property of `var_name`, and AND-chains
/// where both sides compile. Anything else — OR, other variables,
/// non-encodable literals — yields `None`, signalling the caller to fall back
/// to row-at-a-time evaluation.
fn try_compile_predicate(expr: &Expr, var_name: &str, _col_ids: &[u32]) -> Option<ChunkPredicate> {
match expr {
Expr::BinOp { left, op, right } => {
// Normalize so the property access is on the left. Remember whether
// we swapped: comparison operators must then be mirrored below.
let (prop_expr, lit_expr, swapped) = if matches!(right.as_ref(), Expr::Literal(_)) {
(left.as_ref(), right.as_ref(), false)
} else if matches!(left.as_ref(), Expr::Literal(_)) {
(right.as_ref(), left.as_ref(), true)
} else {
return None;
};
let (v, key) = match prop_expr {
Expr::PropAccess { var, prop } => (var.as_str(), prop.as_str()),
_ => return None,
};
// Only predicates over the scanned variable can be pushed down.
if v != var_name {
return None;
}
let col_id = col_id_of(key);
// The literal must have a raw u64 encoding (ints / bools); strings,
// floats, nulls, and params cannot be compared at the chunk level.
let rhs_raw = match lit_expr {
Expr::Literal(lit) => literal_to_raw_u64(lit)?,
_ => return None,
};
// `5 < n.age` is `n.age > 5`: mirror ordering operators when the
// operands were swapped. Eq/Neq are symmetric and pass through.
let effective_op = if swapped {
match op {
BinOpKind::Lt => BinOpKind::Gt,
BinOpKind::Le => BinOpKind::Ge,
BinOpKind::Gt => BinOpKind::Lt,
BinOpKind::Ge => BinOpKind::Le,
other => other.clone(),
}
} else {
op.clone()
};
match effective_op {
BinOpKind::Eq => Some(ChunkPredicate::Eq { col_id, rhs_raw }),
BinOpKind::Neq => Some(ChunkPredicate::Ne { col_id, rhs_raw }),
BinOpKind::Gt => Some(ChunkPredicate::Gt { col_id, rhs_raw }),
BinOpKind::Ge => Some(ChunkPredicate::Ge { col_id, rhs_raw }),
BinOpKind::Lt => Some(ChunkPredicate::Lt { col_id, rhs_raw }),
BinOpKind::Le => Some(ChunkPredicate::Le { col_id, rhs_raw }),
// CONTAINS / STARTS WITH / ENDS WITH have no chunk equivalent.
_ => None,
}
}
Expr::IsNull(inner) => {
if let Expr::PropAccess { var, prop } = inner.as_ref() {
if var.as_str() == var_name {
return Some(ChunkPredicate::IsNull {
col_id: col_id_of(prop),
});
}
}
None
}
Expr::IsNotNull(inner) => {
if let Expr::PropAccess { var, prop } = inner.as_ref() {
if var.as_str() == var_name {
return Some(ChunkPredicate::IsNotNull {
col_id: col_id_of(prop),
});
}
}
None
}
Expr::And(a, b) => {
// Both conjuncts must compile: a partially-compiled AND would drop
// a filter and produce wrong results.
let ca = try_compile_predicate(a, var_name, _col_ids);
let cb = try_compile_predicate(b, var_name, _col_ids);
match (ca, cb) {
(Some(pa), Some(pb)) => Some(ChunkPredicate::And(vec![pa, pb])),
_ => None,
}
}
_ => None,
}
}
/// Encodes a literal into the raw u64 storage representation used for
/// chunk-level comparisons. Integers map via the store's `Int64` encoding and
/// booleans become 0/1 integers; strings, floats, nulls, and parameters have
/// no fixed raw form and return `None`.
fn literal_to_raw_u64(lit: &Literal) -> Option<u64> {
    use sparrowdb_storage::node_store::Value as StoreValue;
    match lit {
        Literal::Int(n) => Some(StoreValue::Int64(*n).to_u64()),
        Literal::Bool(flag) => {
            let as_int = i64::from(*flag);
            Some(StoreValue::Int64(as_int).to_u64())
        }
        Literal::String(_) | Literal::Float(_) | Literal::Null | Literal::Param(_) => None,
    }
}
/// Gathers the non-null `(col_id, raw value)` pairs for one row of a chunk,
/// skipping columns that are absent from the chunk or null at `row_idx`.
fn build_props_from_chunk(chunk: &DataChunk, row_idx: usize, col_ids: &[u32]) -> Vec<(u32, u64)> {
    let mut props = Vec::with_capacity(col_ids.len());
    for &cid in col_ids {
        if let Some(col) = chunk.find_column(cid) {
            if !col.nulls.is_null(row_idx) {
                props.push((cid, col.data[row_idx]));
            }
        }
    }
    props
}
/// Pipeline operator that projects each incoming chunk's destination-slot
/// column into the generic slot column, discarding all other columns.
struct DstSlotProjector<C: PipelineOperator> {
// Upstream operator whose chunks carry a `COL_ID_DST_SLOT` column.
child: C,
}
impl<C: PipelineOperator> DstSlotProjector<C> {
    /// Wraps `child`, whose output chunks will have their destination-slot
    /// column re-emitted as the plain slot column.
    fn new(child: C) -> Self {
        Self { child }
    }
}
impl<C: PipelineOperator> PipelineOperator for DstSlotProjector<C> {
    /// Pulls chunks from the child until one yields at least one live
    /// destination slot, then emits those slots as a single `COL_ID_SLOT`
    /// column. Chunks that are empty, lack a dst-slot column, or have no live
    /// rows are skipped. Returns `None` once the child is exhausted.
    fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
        use crate::chunk::ColumnVector;
        while let Some(chunk) = self.child.next_chunk()? {
            if chunk.is_empty() {
                continue;
            }
            let Some(dst_col) = chunk.find_column(COL_ID_DST_SLOT) else {
                continue;
            };
            let slots: Vec<u64> = chunk.live_rows().map(|i| dst_col.data[i]).collect();
            if slots.is_empty() {
                continue;
            }
            let projected = ColumnVector::from_data(crate::chunk::COL_ID_SLOT, slots);
            return Ok(Some(DataChunk::from_columns(vec![projected])));
        }
        Ok(None)
    }
}
/// True when `expr` is exactly a call `id(<var_name>)` — the function name is
/// matched case-insensitively and the single argument must be that variable.
fn is_id_call(expr: &Expr, var_name: &str) -> bool {
    let Expr::FnCall { name, args } = expr else {
        return false;
    };
    if !name.eq_ignore_ascii_case("id") {
        return false;
    }
    match args.as_slice() {
        [Expr::Var(v)] => v.as_str() == var_name,
        _ => false,
    }
}
/// True when `expr` is a `$param` literal.
fn is_param_literal(expr: &Expr) -> bool {
    matches!(expr, Expr::Literal(Literal::Param(_)))
}
/// Checks that a WHERE clause is purely a conjunction of `id(var) = $param`
/// equalities (in either operand order) over the two pattern variables, with
/// no other predicate kinds mixed in.
fn where_is_only_id_param_conjuncts(expr: &Expr, a_var: &str, b_var: &str) -> bool {
    match expr {
        Expr::And(lhs, rhs) => {
            where_is_only_id_param_conjuncts(lhs, a_var, b_var)
                && where_is_only_id_param_conjuncts(rhs, a_var, b_var)
        }
        Expr::BinOp {
            left,
            op: BinOpKind::Eq,
            right,
        } => {
            // Either operand may be the id() call, as long as the other side
            // is a parameter literal.
            let id_on = |e: &Expr| is_id_call(e, a_var) || is_id_call(e, b_var);
            (id_on(left.as_ref()) && is_param_literal(right))
                || (is_param_literal(left) && id_on(right.as_ref()))
        }
        _ => false,
    }
}
/// Resolves a `WHERE id(var) = $param` clause to a storage slot: finds the
/// parameter name, accepts an integer or node-reference parameter value, and
/// verifies that the encoded label matches `expected_label_id`. Returns
/// `None` on any missing piece or label mismatch.
fn extract_id_param_slot(
    where_clause: Option<&Expr>,
    var_name: &str,
    params: &std::collections::HashMap<String, crate::types::Value>,
    expected_label_id: u32,
) -> Option<u64> {
    let param_name = find_id_param_name(where_clause?, var_name)?;
    let raw_node_id: u64 = match params.get(&param_name)? {
        crate::types::Value::Int64(n) => *n as u64,
        crate::types::Value::NodeRef(nid) => nid.0,
        _ => return None,
    };
    let (label_id, slot) = super::node_id_parts(raw_node_id);
    (label_id == expected_label_id).then_some(slot)
}
/// Searches `expr` for an `id(var_name) = $param` equality (either operand
/// order) and returns the parameter's name, recursing into AND chains and
/// into the operands of other binary expressions.
fn find_id_param_name(expr: &Expr, var_name: &str) -> Option<String> {
    match expr {
        Expr::BinOp { left, op, right } => {
            if *op == BinOpKind::Eq {
                let param_of = |e: &Expr| match e {
                    Expr::Literal(Literal::Param(p)) => Some(p.clone()),
                    _ => None,
                };
                if is_id_call(left, var_name) {
                    if let Some(p) = param_of(right.as_ref()) {
                        return Some(p);
                    }
                }
                if is_id_call(right, var_name) {
                    if let Some(p) = param_of(left.as_ref()) {
                        return Some(p);
                    }
                }
            }
            // Not a direct match: keep looking inside both operands.
            find_id_param_name(left, var_name).or_else(|| find_id_param_name(right, var_name))
        }
        Expr::And(lhs, rhs) => {
            find_id_param_name(lhs, var_name).or_else(|| find_id_param_name(rhs, var_name))
        }
        _ => None,
    }
}
/// Finds the first slot below `hwm` for `label_id` whose stored properties
/// satisfy every entry of `props`, or `None` when nothing matches (or when
/// `props` is empty / the label has no rows).
///
/// Resolution order:
/// 1. Property index lookup, when the filter is indexable — first hit wins.
/// 2. Single-property fast path: a whole-column read compared raw-to-raw,
///    only for int literals and short (<= 7 byte) string literals.
/// 3. Generic fallback: per-slot property fetch checked with
///    `matches_prop_filter_static` (which also resolves `$param` values).
fn find_slot_by_props(
store: &NodeStore,
label_id: u32,
hwm: u64,
props: &[sparrowdb_cypher::ast::PropEntry],
params: &std::collections::HashMap<String, crate::types::Value>,
prop_index: &PropertyIndex,
) -> Option<u64> {
if props.is_empty() || hwm == 0 {
return None;
}
if let Some(slots) = try_index_lookup_for_props(props, label_id, prop_index) {
return slots.into_iter().next().map(|s| s as u64);
}
if props.len() == 1 {
let filter = &props[0];
let col_id = prop_name_to_col_id(&filter.key);
// Only literals with a fixed raw encoding qualify for the column-scan
// fast path; strings over 7 bytes presumably exceed the inline
// encoding (TODO confirm against StoreValue::Bytes::to_u64).
let target_raw_opt: Option<u64> = match &filter.value {
Expr::Literal(Literal::Int(n)) => Some(StoreValue::Int64(*n).to_u64()),
Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
Some(StoreValue::Bytes(s.as_bytes().to_vec()).to_u64())
}
_ => None,
};
if let Some(target_raw) = target_raw_opt {
let col_data = match store.read_col_all(label_id, col_id) {
Ok(d) => d,
Err(_) => return None,
};
let null_bitmap = store.read_null_bitmap_all(label_id, col_id).ok().flatten();
for (slot, &raw) in col_data.iter().enumerate().take(hwm as usize) {
// Without a null bitmap, raw == 0 is treated as "value absent".
// NOTE(review): assumes 0 is never a legitimate stored value in
// bitmap-less columns — confirm against the storage format.
let is_present = match &null_bitmap {
None => raw != 0,
Some(bits) => bits.get(slot).copied().unwrap_or(false),
};
if !is_present {
continue;
}
if raw != target_raw {
continue;
}
return Some(slot as u64);
}
// Fast path is exhaustive for this filter shape: no fallback scan.
return None;
}
}
// Generic fallback: fetch each candidate node's raw properties and apply
// the full filter logic (handles $params and multi-property filters).
let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
for slot in 0..hwm {
let node_id = NodeId(((label_id as u64) << 32) | slot);
let Ok(raw_props) = store.get_node_raw_nullable(node_id, &col_ids) else {
continue;
};
let stored: Vec<(u32, u64)> = raw_props
.into_iter()
.filter_map(|(c, opt)| opt.map(|v| (c, v)))
.collect();
if matches_prop_filter_static(&stored, props, params, store) {
return Some(slot);
}
}
None
}