plexus-engine 0.3.6

Engine integration traits for consuming Plexus plans
Documentation
use crate::*;

impl InMemoryEngine {
    pub(crate) fn execute_expand(
        &self,
        input: &[Row],
        src_col: u32,
        types: &[String],
        dir: ExpandDir,
        legal_src_labels: &[String],
        legal_dst_labels: &[String],
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::new();
        for row in input {
            let src_cell = row
                .get(src_col as usize)
                .ok_or(ExecutionError::ColumnOutOfBounds {
                    idx: src_col as usize,
                    len: row.len(),
                })?;
            let Value::NodeRef(src_node_id) = src_cell else {
                continue;
            };

            for rel in &self.graph.rels {
                if !types.is_empty() && !types.iter().any(|t| t == &rel.typ) {
                    continue;
                }

                let mut push_path = |from_id: u64, to_id: u64| -> Result<(), ExecutionError> {
                    let src_node = self
                        .graph
                        .node_by_id(from_id)
                        .ok_or(ExecutionError::UnknownNode(from_id))?;
                    let dst_node = self
                        .graph
                        .node_by_id(to_id)
                        .ok_or(ExecutionError::UnknownNode(to_id))?;
                    if !legal_src_labels.is_empty()
                        && !legal_src_labels.iter().any(|l| src_node.labels.contains(l))
                    {
                        return Ok(());
                    }
                    if !legal_dst_labels.is_empty()
                        && !legal_dst_labels.iter().any(|l| dst_node.labels.contains(l))
                    {
                        return Ok(());
                    }
                    let mut next = row.clone();
                    next.push(Value::RelRef(rel.id));
                    next.push(Value::NodeRef(to_id));
                    out.push(next);
                    Ok(())
                };

                match dir {
                    ExpandDir::Out => {
                        if rel.src == *src_node_id {
                            push_path(rel.src, rel.dst)?;
                        }
                    }
                    ExpandDir::In => {
                        if rel.dst == *src_node_id {
                            push_path(rel.dst, rel.src)?;
                        }
                    }
                    ExpandDir::Both => {
                        if rel.src == *src_node_id {
                            push_path(rel.src, rel.dst)?;
                        }
                        if rel.dst == *src_node_id {
                            push_path(rel.dst, rel.src)?;
                        }
                    }
                }
            }
        }
        Ok(out)
    }

    pub(crate) fn execute_optional_expand(
        &self,
        input: &[Row],
        src_col: u32,
        types: &[String],
        dir: ExpandDir,
        legal_src_labels: &[String],
        legal_dst_labels: &[String],
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::new();
        for row in input {
            let src_cell = row
                .get(src_col as usize)
                .ok_or(ExecutionError::ColumnOutOfBounds {
                    idx: src_col as usize,
                    len: row.len(),
                })?;
            let Value::NodeRef(src_node_id) = src_cell else {
                let mut next = row.clone();
                next.push(Value::Null);
                next.push(Value::Null);
                out.push(next);
                continue;
            };

            let mut matched = false;
            for rel in &self.graph.rels {
                if !types.is_empty() && !types.iter().any(|t| t == &rel.typ) {
                    continue;
                }

                let mut push_match = |to_id: u64| {
                    let Some(src_node) = self.graph.node_by_id(*src_node_id) else {
                        return;
                    };
                    let Some(dst_node) = self.graph.node_by_id(to_id) else {
                        return;
                    };
                    if !legal_src_labels.is_empty()
                        && !legal_src_labels.iter().any(|l| src_node.labels.contains(l))
                    {
                        return;
                    }
                    if !legal_dst_labels.is_empty()
                        && !legal_dst_labels.iter().any(|l| dst_node.labels.contains(l))
                    {
                        return;
                    }
                    let mut next = row.clone();
                    next.push(Value::RelRef(rel.id));
                    next.push(Value::NodeRef(to_id));
                    out.push(next);
                    matched = true;
                };

                match dir {
                    ExpandDir::Out => {
                        if rel.src == *src_node_id {
                            push_match(rel.dst);
                        }
                    }
                    ExpandDir::In => {
                        if rel.dst == *src_node_id {
                            push_match(rel.src);
                        }
                    }
                    ExpandDir::Both => {
                        if rel.src == *src_node_id {
                            push_match(rel.dst);
                        }
                        if rel.dst == *src_node_id {
                            push_match(rel.src);
                        }
                    }
                }
            }

            if !matched {
                let mut next = row.clone();
                next.push(Value::Null);
                next.push(Value::Null);
                out.push(next);
            }
        }
        Ok(out)
    }

    pub(crate) fn execute_expand_var_len(
        &self,
        input: &[Row],
        src_col: u32,
        types: &[String],
        dir: ExpandDir,
        min_hops: i32,
        max_hops: i32,
    ) -> Result<RowSet, ExecutionError> {
        struct DfsCtx<'a> {
            graph: &'a Graph,
            row: &'a Row,
            types: &'a [String],
            dir: ExpandDir,
            min_hops: i32,
            max_hops: i32,
            out: &'a mut RowSet,
        }

        fn dfs(
            ctx: &mut DfsCtx<'_>,
            current: u64,
            depth: i32,
            rel_path: &mut Vec<Value>,
            used_rels: &mut HashSet<u64>,
        ) {
            if depth >= ctx.min_hops {
                let mut next = ctx.row.clone();
                next.push(Value::List(rel_path.clone()));
                next.push(Value::NodeRef(current));
                ctx.out.push(next);
            }

            if ctx.max_hops >= 0 && depth >= ctx.max_hops {
                return;
            }

            for rel in &ctx.graph.rels {
                if !ctx.types.is_empty() && !ctx.types.iter().any(|t| t == &rel.typ) {
                    continue;
                }
                if used_rels.contains(&rel.id) {
                    continue;
                }

                let candidate = match ctx.dir {
                    ExpandDir::Out if rel.src == current => Some(rel.dst),
                    ExpandDir::In if rel.dst == current => Some(rel.src),
                    ExpandDir::Both if rel.src == current => Some(rel.dst),
                    ExpandDir::Both if rel.dst == current => Some(rel.src),
                    _ => None,
                };

                let Some(next_node) = candidate else {
                    continue;
                };

                used_rels.insert(rel.id);
                rel_path.push(Value::RelRef(rel.id));
                dfs(ctx, next_node, depth + 1, rel_path, used_rels);
                rel_path.pop();
                used_rels.remove(&rel.id);
            }
        }

        let min_hops = min_hops.max(0);
        let mut out = Vec::new();
        for row in input {
            let src_cell = row
                .get(src_col as usize)
                .ok_or(ExecutionError::ColumnOutOfBounds {
                    idx: src_col as usize,
                    len: row.len(),
                })?;
            let Value::NodeRef(src_node_id) = src_cell else {
                continue;
            };

            let mut rel_path = Vec::new();
            let mut used_rels = HashSet::new();
            let mut ctx = DfsCtx {
                graph: &self.graph,
                row,
                types,
                dir,
                min_hops,
                max_hops,
                out: &mut out,
            };
            dfs(&mut ctx, *src_node_id, 0, &mut rel_path, &mut used_rels);
        }
        Ok(out)
    }

    pub(crate) fn execute_semi_expand(
        &self,
        input: &[Row],
        src_col: u32,
        types: &[String],
        dir: ExpandDir,
        legal_src_labels: &[String],
        legal_dst_labels: &[String],
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::new();
        for row in input {
            let src_cell = row
                .get(src_col as usize)
                .ok_or(ExecutionError::ColumnOutOfBounds {
                    idx: src_col as usize,
                    len: row.len(),
                })?;
            let Value::NodeRef(src_node_id) = src_cell else {
                continue;
            };

            let mut matched = false;
            'rels: for rel in &self.graph.rels {
                if !types.is_empty() && !types.iter().any(|t| t == &rel.typ) {
                    continue;
                }
                let candidate = match dir {
                    ExpandDir::Out if rel.src == *src_node_id => Some((rel.src, rel.dst)),
                    ExpandDir::In if rel.dst == *src_node_id => Some((rel.dst, rel.src)),
                    ExpandDir::Both if rel.src == *src_node_id => Some((rel.src, rel.dst)),
                    ExpandDir::Both if rel.dst == *src_node_id => Some((rel.dst, rel.src)),
                    _ => None,
                };
                let Some((from_id, to_id)) = candidate else {
                    continue;
                };

                let src_node = self
                    .graph
                    .node_by_id(from_id)
                    .ok_or(ExecutionError::UnknownNode(from_id))?;
                let dst_node = self
                    .graph
                    .node_by_id(to_id)
                    .ok_or(ExecutionError::UnknownNode(to_id))?;
                if !legal_src_labels.is_empty()
                    && !legal_src_labels.iter().any(|l| src_node.labels.contains(l))
                {
                    continue;
                }
                if !legal_dst_labels.is_empty()
                    && !legal_dst_labels.iter().any(|l| dst_node.labels.contains(l))
                {
                    continue;
                }
                matched = true;
                break 'rels;
            }

            if matched {
                out.push(row.clone());
            }
        }
        Ok(out)
    }
}