use crate::*;
impl InMemoryEngine {
/// Expands each input row across the relationships touching its source node.
///
/// For every row the cell at `src_col` is read; rows whose cell is not a
/// `Value::NodeRef` produce no output. Every relationship in
/// `self.graph.rels` is then scanned in order, and a relationship yields one
/// output row (the input row followed by `Value::RelRef(rel.id)` and
/// `Value::NodeRef(neighbour)`) when all of the following hold:
///
/// * `types` is empty or contains the relationship's type;
/// * the relationship touches the source node in direction `dir`;
/// * `legal_src_labels` is empty or shares a label with the source node;
/// * `legal_dst_labels` is empty or shares a label with the neighbour node.
///
/// With `ExpandDir::Both` a self-loop on the source node is emitted once, not
/// once per endpoint, so an undirected expansion never duplicates a
/// relationship.
///
/// # Errors
///
/// * `ExecutionError::ColumnOutOfBounds` if a row has no cell at `src_col`.
/// * `ExecutionError::UnknownNode` if an endpoint of a relationship that
///   passed the type and direction filters cannot be resolved by `node_by_id`.
pub(crate) fn execute_expand(
    &self,
    input: &[Row],
    src_col: u32,
    types: &[String],
    dir: ExpandDir,
    legal_src_labels: &[String],
    legal_dst_labels: &[String],
) -> Result<RowSet, ExecutionError> {
    let mut out = Vec::new();
    for row in input {
        let src_cell = row
            .get(src_col as usize)
            .ok_or(ExecutionError::ColumnOutOfBounds {
                idx: src_col as usize,
                len: row.len(),
            })?;
        let Value::NodeRef(src_node_id) = src_cell else {
            continue;
        };
        let src_id = *src_node_id;
        for rel in &self.graph.rels {
            if !types.is_empty() && !types.iter().any(|t| t == &rel.typ) {
                continue;
            }
            // Choose the far endpoint at most once per relationship. Testing
            // both endpoints independently for `Both` would report a
            // self-loop twice.
            let far_end = match dir {
                ExpandDir::Out => (rel.src == src_id).then_some(rel.dst),
                ExpandDir::In => (rel.dst == src_id).then_some(rel.src),
                ExpandDir::Both => {
                    if rel.src == src_id {
                        Some(rel.dst)
                    } else if rel.dst == src_id {
                        Some(rel.src)
                    } else {
                        None
                    }
                }
            };
            let Some(to_id) = far_end else {
                continue;
            };
            // The near endpoint is always the row's source node; resolve it
            // first so a dangling source is reported before a dangling target.
            let src_node = self
                .graph
                .node_by_id(src_id)
                .ok_or(ExecutionError::UnknownNode(src_id))?;
            let dst_node = self
                .graph
                .node_by_id(to_id)
                .ok_or(ExecutionError::UnknownNode(to_id))?;
            if !legal_src_labels.is_empty()
                && !legal_src_labels.iter().any(|l| src_node.labels.contains(l))
            {
                continue;
            }
            if !legal_dst_labels.is_empty()
                && !legal_dst_labels.iter().any(|l| dst_node.labels.contains(l))
            {
                continue;
            }
            let mut next = row.clone();
            next.push(Value::RelRef(rel.id));
            next.push(Value::NodeRef(to_id));
            out.push(next);
        }
    }
    Ok(out)
}
/// Optional variant of the expand operator: every input row yields at least
/// one output row.
///
/// For each row the cell at `src_col` is read. If it is a `Value::NodeRef`,
/// every relationship in `self.graph.rels` is scanned in order and one output
/// row (the input row followed by `Value::RelRef(rel.id)` and
/// `Value::NodeRef(neighbour)`) is produced per relationship that
///
/// * has a type listed in `types` (or `types` is empty),
/// * touches the source node in direction `dir`, and
/// * satisfies `legal_src_labels` / `legal_dst_labels` (each is either empty
///   or shares a label with the source / neighbour node respectively).
///
/// If the cell is not a node reference, or no relationship qualifies, the
/// input row is emitted once with two trailing `Value::Null` cells.
///
/// With `ExpandDir::Both` a self-loop on the source node is emitted once, not
/// once per endpoint.
///
/// A relationship whose endpoints cannot be resolved by `node_by_id` is
/// skipped without an error.
/// NOTE(review): `execute_expand` reports `ExecutionError::UnknownNode` in the
/// same situation; confirm the lenient behaviour here is intentional.
///
/// # Errors
///
/// `ExecutionError::ColumnOutOfBounds` if a row has no cell at `src_col`.
pub(crate) fn execute_optional_expand(
    &self,
    input: &[Row],
    src_col: u32,
    types: &[String],
    dir: ExpandDir,
    legal_src_labels: &[String],
    legal_dst_labels: &[String],
) -> Result<RowSet, ExecutionError> {
    // Builds the "no match" row: the input row plus null relationship and
    // null node cells.
    let null_padded = |row: &Row| {
        let mut next = row.clone();
        next.push(Value::Null);
        next.push(Value::Null);
        next
    };

    let mut out = Vec::new();
    for row in input {
        let src_cell = row
            .get(src_col as usize)
            .ok_or(ExecutionError::ColumnOutOfBounds {
                idx: src_col as usize,
                len: row.len(),
            })?;
        let Value::NodeRef(src_node_id) = src_cell else {
            out.push(null_padded(row));
            continue;
        };
        let src_id = *src_node_id;
        let mut matched = false;
        for rel in &self.graph.rels {
            if !types.is_empty() && !types.iter().any(|t| t == &rel.typ) {
                continue;
            }
            // Choose the far endpoint at most once per relationship. Testing
            // both endpoints independently for `Both` would report a
            // self-loop twice.
            let far_end = match dir {
                ExpandDir::Out => (rel.src == src_id).then_some(rel.dst),
                ExpandDir::In => (rel.dst == src_id).then_some(rel.src),
                ExpandDir::Both => {
                    if rel.src == src_id {
                        Some(rel.dst)
                    } else if rel.dst == src_id {
                        Some(rel.src)
                    } else {
                        None
                    }
                }
            };
            let Some(to_id) = far_end else {
                continue;
            };
            // Unresolvable endpoints are treated as "no match" rather than
            // as an error.
            let Some(src_node) = self.graph.node_by_id(src_id) else {
                continue;
            };
            let Some(dst_node) = self.graph.node_by_id(to_id) else {
                continue;
            };
            if !legal_src_labels.is_empty()
                && !legal_src_labels.iter().any(|l| src_node.labels.contains(l))
            {
                continue;
            }
            if !legal_dst_labels.is_empty()
                && !legal_dst_labels.iter().any(|l| dst_node.labels.contains(l))
            {
                continue;
            }
            let mut next = row.clone();
            next.push(Value::RelRef(rel.id));
            next.push(Value::NodeRef(to_id));
            out.push(next);
            matched = true;
        }
        if !matched {
            out.push(null_padded(row));
        }
    }
    Ok(out)
}
/// Variable-length expansion: walks relationship paths depth-first from the
/// node in column `src_col` of each input row.
///
/// Rows whose `src_col` cell is not a `Value::NodeRef` produce no output. For
/// every node reached at a depth of at least `min_hops` (negative values are
/// treated as `0`, so the start node itself qualifies at depth 0), one output
/// row is produced: the input row followed by a `Value::List` of the
/// `Value::RelRef`s along the path and a `Value::NodeRef` for the reached
/// node. A path is not extended past `max_hops`; a negative `max_hops`
/// disables the upper bound. A relationship is never used twice within one
/// path, and only relationships whose type is in `types` (or any type when
/// `types` is empty) and that leave the current node in direction `dir` are
/// followed.
///
/// # Errors
///
/// `ExecutionError::ColumnOutOfBounds` if a row has no cell at `src_col`.
pub(crate) fn execute_expand_var_len(
    &self,
    input: &[Row],
    src_col: u32,
    types: &[String],
    dir: ExpandDir,
    min_hops: i32,
    max_hops: i32,
) -> Result<RowSet, ExecutionError> {
    /// State of one depth-first walk started from a single input row.
    struct Walk<'a> {
        graph: &'a Graph,
        base: &'a Row,
        types: &'a [String],
        dir: ExpandDir,
        lower: i32,
        upper: i32,
        sink: &'a mut RowSet,
        /// Relationship references along the path currently being explored.
        trail: Vec<Value>,
        /// Ids of the relationships on the current path.
        seen: HashSet<u64>,
    }

    impl Walk<'_> {
        /// Emits `node` if it is deep enough, then recurses into its
        /// neighbours.
        fn visit(&mut self, node: u64, hops: i32) {
            if hops >= self.lower {
                let mut row = self.base.clone();
                row.push(Value::List(self.trail.clone()));
                row.push(Value::NodeRef(node));
                self.sink.push(row);
            }
            // A negative upper bound means "unbounded".
            let bounded = self.upper >= 0;
            if bounded && hops >= self.upper {
                return;
            }
            // Copy the shared reference so the scan does not hold a borrow
            // of `self` across the recursive call.
            let graph = self.graph;
            for rel in &graph.rels {
                let type_ok = self.types.is_empty() || self.types.iter().any(|t| t == &rel.typ);
                if !type_ok || self.seen.contains(&rel.id) {
                    continue;
                }
                let step = match self.dir {
                    ExpandDir::Out => (rel.src == node).then_some(rel.dst),
                    ExpandDir::In => (rel.dst == node).then_some(rel.src),
                    ExpandDir::Both => {
                        if rel.src == node {
                            Some(rel.dst)
                        } else if rel.dst == node {
                            Some(rel.src)
                        } else {
                            None
                        }
                    }
                };
                if let Some(neighbour) = step {
                    // Mark the relationship for the duration of the
                    // sub-walk, then undo so sibling paths may reuse it.
                    self.seen.insert(rel.id);
                    self.trail.push(Value::RelRef(rel.id));
                    self.visit(neighbour, hops + 1);
                    self.trail.pop();
                    self.seen.remove(&rel.id);
                }
            }
        }
    }

    let column = src_col as usize;
    let lower = min_hops.max(0);
    let mut out = Vec::new();
    for row in input {
        let Some(cell) = row.get(column) else {
            return Err(ExecutionError::ColumnOutOfBounds {
                idx: column,
                len: row.len(),
            });
        };
        if let Value::NodeRef(start) = cell {
            let mut walk = Walk {
                graph: &self.graph,
                base: row,
                types,
                dir,
                lower,
                upper: max_hops,
                sink: &mut out,
                trail: Vec::new(),
                seen: HashSet::new(),
            };
            walk.visit(*start, 0);
        }
    }
    Ok(out)
}
/// Semi-expand: keeps an input row, unchanged, when at least one qualifying
/// relationship exists for the node in column `src_col`.
///
/// Rows whose `src_col` cell is not a `Value::NodeRef` are dropped. For the
/// remaining rows, `self.graph.rels` is scanned in order until a relationship
/// is found that
///
/// * has a type listed in `types` (or `types` is empty),
/// * touches the source node in direction `dir`, and
/// * satisfies `legal_src_labels` / `legal_dst_labels` (each is either empty
///   or shares a label with the source / neighbour node respectively).
///
/// The scan stops at the first such relationship and the row is copied to the
/// output; no columns are appended.
///
/// # Errors
///
/// * `ExecutionError::ColumnOutOfBounds` if a row has no cell at `src_col`.
/// * `ExecutionError::UnknownNode` if an endpoint of a relationship examined
///   before the first match cannot be resolved by `node_by_id`.
pub(crate) fn execute_semi_expand(
    &self,
    input: &[Row],
    src_col: u32,
    types: &[String],
    dir: ExpandDir,
    legal_src_labels: &[String],
    legal_dst_labels: &[String],
) -> Result<RowSet, ExecutionError> {
    let column = src_col as usize;
    let mut kept = Vec::new();
    for row in input {
        let Some(cell) = row.get(column) else {
            return Err(ExecutionError::ColumnOutOfBounds {
                idx: column,
                len: row.len(),
            });
        };
        let Value::NodeRef(anchor_ref) = cell else {
            continue;
        };
        let anchor = *anchor_ref;

        let mut exists = false;
        for rel in &self.graph.rels {
            let type_ok = types.is_empty() || types.iter().any(|t| t == &rel.typ);
            if !type_ok {
                continue;
            }
            // Far endpoint of `rel` as seen from the anchor, if `rel`
            // touches the anchor in the requested direction.
            let far_end = match dir {
                ExpandDir::Out => (rel.src == anchor).then_some(rel.dst),
                ExpandDir::In => (rel.dst == anchor).then_some(rel.src),
                ExpandDir::Both => {
                    if rel.src == anchor {
                        Some(rel.dst)
                    } else if rel.dst == anchor {
                        Some(rel.src)
                    } else {
                        None
                    }
                }
            };
            let Some(other) = far_end else {
                continue;
            };
            // Resolve the near endpoint before the far one so that a
            // dangling anchor is the error reported first.
            let near_node = self
                .graph
                .node_by_id(anchor)
                .ok_or(ExecutionError::UnknownNode(anchor))?;
            let far_node = self
                .graph
                .node_by_id(other)
                .ok_or(ExecutionError::UnknownNode(other))?;
            let src_ok = legal_src_labels.is_empty()
                || legal_src_labels.iter().any(|l| near_node.labels.contains(l));
            let dst_ok = legal_dst_labels.is_empty()
                || legal_dst_labels.iter().any(|l| far_node.labels.contains(l));
            if src_ok && dst_ok {
                // One witness is enough; stop scanning.
                exists = true;
                break;
            }
        }
        if exists {
            kept.push(row.clone());
        }
    }
    Ok(kept)
}
}