plexus-engine 0.3.4

Engine integration traits for consuming Plexus plans
Documentation
use super::merge;
use crate::*;

impl InMemoryEngine {
    /// Looks up the rows stored for the op at index `input` in `outputs`.
    ///
    /// The returned reference borrows from `outputs` only, not from `self`.
    ///
    /// # Errors
    ///
    /// * [`ExecutionError::InvalidOpRef`] when `input` is past the end of `outputs`.
    /// * [`ExecutionError::MissingOpOutput`] when the slot exists but holds `None`.
    fn input_rows<'a>(
        &self,
        outputs: &'a [Option<RowSet>],
        input: u32,
    ) -> Result<&'a RowSet, ExecutionError> {
        match outputs.get(input as usize) {
            // Index is outside the slot table.
            None => Err(ExecutionError::InvalidOpRef(input)),
            // Slot exists but nothing has been stored in it.
            Some(None) => Err(ExecutionError::MissingOpOutput(input)),
            Some(Some(rows)) => Ok(rows),
        }
    }
}

impl MutationEngine for InMemoryEngine {
    type Error = ExecutionError;

    fn create_node(
        &mut self,
        labels: &[String],
        props: HashMap<String, Value>,
    ) -> Result<u64, Self::Error> {
        let next_id = self.graph.nodes.iter().map(|n| n.id).max().unwrap_or(0) + 1;
        self.graph.nodes.push(Node {
            id: next_id,
            labels: labels.iter().cloned().collect(),
            props,
        });
        Ok(next_id)
    }

    fn create_rel(
        &mut self,
        src: u64,
        dst: u64,
        rel_type: &str,
        props: HashMap<String, Value>,
    ) -> Result<u64, Self::Error> {
        if self.graph.node_by_id(src).is_none() {
            return Err(ExecutionError::UnknownNode(src));
        }
        if self.graph.node_by_id(dst).is_none() {
            return Err(ExecutionError::UnknownNode(dst));
        }
        let next_id = self.graph.rels.iter().map(|r| r.id).max().unwrap_or(0) + 1;
        self.graph.rels.push(Relationship {
            id: next_id,
            src,
            dst,
            typ: rel_type.to_string(),
            props,
        });
        Ok(next_id)
    }

    fn merge_pattern(
        &mut self,
        pattern: &Expr,
        on_create_props: &Expr,
        on_match_props: &Expr,
        schema: &[plexus_serde::ColDef],
        row: &Row,
    ) -> Result<(), Self::Error> {
        merge::execute_merge_pattern(self, pattern, on_create_props, on_match_props, schema, row)
    }

    fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error> {
        match target {
            Value::NodeRef(node_id) => {
                if self.graph.node_by_id(*node_id).is_none() {
                    return Err(ExecutionError::UnknownNode(*node_id));
                }
                let has_incident = self
                    .graph
                    .rels
                    .iter()
                    .any(|r| r.src == *node_id || r.dst == *node_id);
                if has_incident && !detach {
                    return Err(ExecutionError::DeleteRequiresDetach(*node_id));
                }
                if detach {
                    self.graph
                        .rels
                        .retain(|r| r.src != *node_id && r.dst != *node_id);
                }
                self.graph.nodes.retain(|n| n.id != *node_id);
                Ok(())
            }
            Value::RelRef(rel_id) => {
                if self.graph.rel_by_id(*rel_id).is_none() {
                    return Err(ExecutionError::UnknownRel(*rel_id));
                }
                self.graph.rels.retain(|r| r.id != *rel_id);
                Ok(())
            }
            Value::Null => Ok(()),
            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
        }
    }

    fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error> {
        match target {
            Value::NodeRef(node_id) => {
                let node = self
                    .graph
                    .node_by_id_mut(*node_id)
                    .ok_or(ExecutionError::UnknownNode(*node_id))?;
                node.props.insert(key.to_string(), value);
                Ok(())
            }
            Value::RelRef(rel_id) => {
                let rel = self
                    .graph
                    .rel_by_id_mut(*rel_id)
                    .ok_or(ExecutionError::UnknownRel(*rel_id))?;
                rel.props.insert(key.to_string(), value);
                Ok(())
            }
            Value::Null => Ok(()),
            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
        }
    }

    fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error> {
        match target {
            Value::NodeRef(node_id) => {
                let node = self
                    .graph
                    .node_by_id_mut(*node_id)
                    .ok_or(ExecutionError::UnknownNode(*node_id))?;
                node.props.remove(key);
                Ok(())
            }
            Value::RelRef(rel_id) => {
                let rel = self
                    .graph
                    .rel_by_id_mut(*rel_id)
                    .ok_or(ExecutionError::UnknownRel(*rel_id))?;
                rel.props.remove(key);
                Ok(())
            }
            Value::Null => Ok(()),
            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
        }
    }
}

impl PlanEngine for InMemoryEngine {
    type Error = ExecutionError;

    /// Runs every op of `plan` in order and returns the rows produced by
    /// `plan.root_op`.
    ///
    /// Each op's output is stored in a slot at the op's own index. Ops that
    /// take an input read it from the slot of the referenced op, so that op
    /// must appear earlier in `plan.ops`.
    ///
    /// # Errors
    ///
    /// * [`ExecutionError::MultiGraphUnsupported`] when the inspected ops name
    ///   more than one distinct non-blank `graph_ref`.
    /// * [`ExecutionError::InvalidOpRef`] / [`ExecutionError::MissingOpOutput`]
    ///   when an op references an input index that is out of range or has not
    ///   produced output yet.
    /// * [`ExecutionError::UnsupportedOp`] for `VectorScan` and `Rerank`.
    /// * [`ExecutionError::InvalidRootOp`] when `plan.root_op` is out of range.
    /// * Any error returned by the individual operator implementations.
    fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error> {
        // The reference engine does not support multi-graph queries.
        // Reject plans that reference more than one distinct graph_ref.
        // NOTE(review): only ScanNodes / Expand / OptionalExpand / ExpandVarLen
        // are inspected here; confirm whether ScanRels or SemiExpand can also
        // carry a graph_ref that should take part in this check.
        let mut seen_ref: Option<&str> = None;
        for op in &plan.ops {
            let graph_ref = match op {
                Op::ScanNodes { graph_ref, .. }
                | Op::Expand { graph_ref, .. }
                | Op::OptionalExpand { graph_ref, .. }
                | Op::ExpandVarLen { graph_ref, .. } => graph_ref.as_deref(),
                _ => None,
            };
            // Blank or whitespace-only references are ignored.
            if let Some(r) = graph_ref.map(str::trim).filter(|s| !s.is_empty()) {
                match seen_ref {
                    None => seen_ref = Some(r),
                    Some(prev) if prev != r => {
                        return Err(ExecutionError::MultiGraphUnsupported);
                    }
                    _ => {}
                }
            }
        }

        // One output slot per op, filled in as ops are executed in order.
        let mut outputs: Vec<Option<RowSet>> = vec![None; plan.ops.len()];

        for (idx, op) in plan.ops.iter().enumerate() {
            let rows = match op {
                Op::ScanNodes {
                    labels,
                    must_labels,
                    forbidden_labels,
                    ..
                } => self.execute_scan_nodes(labels, must_labels, forbidden_labels),
                Op::ScanRels {
                    types,
                    src_labels,
                    dst_labels,
                    ..
                } => self.execute_scan_rels(types, src_labels, dst_labels),
                Op::Expand {
                    input,
                    src_col,
                    types,
                    dir,
                    legal_src_labels,
                    legal_dst_labels,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_expand(
                        input_rows,
                        *src_col,
                        types,
                        *dir,
                        legal_src_labels,
                        legal_dst_labels,
                    )?
                }
                Op::OptionalExpand {
                    input,
                    src_col,
                    types,
                    dir,
                    legal_src_labels,
                    legal_dst_labels,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_optional_expand(
                        input_rows,
                        *src_col,
                        types,
                        *dir,
                        legal_src_labels,
                        legal_dst_labels,
                    )?
                }
                Op::SemiExpand {
                    input,
                    src_col,
                    types,
                    dir,
                    legal_src_labels,
                    legal_dst_labels,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_semi_expand(
                        input_rows,
                        *src_col,
                        types,
                        *dir,
                        legal_src_labels,
                        legal_dst_labels,
                    )?
                }
                Op::ExpandVarLen {
                    input,
                    src_col,
                    types,
                    dir,
                    min_hops,
                    max_hops,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_expand_var_len(
                        input_rows, *src_col, types, *dir, *min_hops, *max_hops,
                    )?
                }
                Op::Filter { input, predicate } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_filter_rows(input_rows, predicate)?
                }
                // Pass-through ops: the slot needs its own copy of the rows.
                Op::BlockMarker { input, .. } => self.input_rows(&outputs, *input)?.clone(),
                Op::Project { input, exprs, .. } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_project_rows(input_rows, exprs)?
                }
                Op::Aggregate {
                    input, keys, aggs, ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_aggregate_rows(input_rows, keys, aggs)?
                }
                Op::Sort { input, keys, dirs } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_sort_rows(input_rows, keys, dirs)?
                }
                Op::Limit { input, count, skip } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_limit_rows(input_rows, *count, *skip)
                }
                Op::Return { input } => self.input_rows(&outputs, *input)?.clone(),
                Op::Unwind {
                    input, list_expr, ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_unwind(input_rows, list_expr)?
                }
                Op::PathConstruct {
                    input, rel_cols, ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_path_construct(input_rows, rel_cols)?
                }
                Op::Union { lhs, rhs, all, .. } => {
                    let lhs_rows = self.input_rows(&outputs, *lhs)?;
                    let rhs_rows = self.input_rows(&outputs, *rhs)?;
                    self.execute_union_rows(lhs_rows, rhs_rows, *all)
                }
                // Mutation ops: `input_rows` returns a reference that borrows
                // from `outputs` only, so it does not conflict with whatever
                // borrow of `self` the helper takes and no copy of the input
                // row set is needed.
                Op::CreateNode {
                    input,
                    labels,
                    props,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_create_node_rows(input_rows, labels, props)?
                }
                Op::CreateRel {
                    input,
                    src_col,
                    dst_col,
                    rel_type,
                    props,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_create_rel_rows(input_rows, *src_col, *dst_col, rel_type, props)?
                }
                Op::Merge {
                    input,
                    pattern,
                    on_create_props,
                    on_match_props,
                    schema,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_merge_rows(
                        input_rows,
                        pattern,
                        on_create_props,
                        on_match_props,
                        schema,
                    )?
                }
                Op::Delete {
                    input,
                    target_col,
                    detach,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_delete_rows(input_rows, *target_col, *detach)?
                }
                Op::SetProperty {
                    input,
                    target_col,
                    key,
                    value_expr,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_set_property_rows(input_rows, *target_col, key, value_expr)?
                }
                Op::RemoveProperty {
                    input,
                    target_col,
                    key,
                    ..
                } => {
                    let input_rows = self.input_rows(&outputs, *input)?;
                    self.execute_remove_property_rows(input_rows, *target_col, key)?
                }
                Op::VectorScan { .. } => return Err(ExecutionError::UnsupportedOp("vector_scan")),
                Op::Rerank { .. } => return Err(ExecutionError::UnsupportedOp("rerank")),
                // A single row with no columns.
                Op::ConstRow => vec![vec![]],
            };
            outputs[idx] = Some(rows);
        }

        // `outputs` is dropped right after this, so move the root rows out of
        // their slot instead of cloning them. An out-of-range index and an
        // empty slot both report `InvalidRootOp`.
        let root_rows = outputs
            .get_mut(plan.root_op as usize)
            .and_then(|slot| slot.take())
            .ok_or(ExecutionError::InvalidRootOp(plan.root_op))?;

        Ok(QueryResult { rows: root_rows })
    }
}