plexus-engine 0.3.4

Engine integration traits for consuming Plexus plans
Documentation
use crate::*;

impl InMemoryEngine {
    pub(super) fn eval_props_payload(
        &self,
        row: &Row,
        props: &Expr,
    ) -> Result<HashMap<String, Value>, ExecutionError> {
        match self.eval_expr(row, props)? {
            Value::Null => Ok(HashMap::new()),
            Value::Map(entries) => Ok(entries.into_iter().collect()),
            _ => Err(ExecutionError::ExpectedMapPayload),
        }
    }

    pub(super) fn execute_create_node_rows(
        &mut self,
        input_rows: &[Row],
        labels: &[String],
        props: &Expr,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        for row in input_rows {
            let node_props = self.eval_props_payload(row, props)?;
            let node_id = self.create_node(labels, node_props)?;
            let mut next = row.clone();
            next.push(Value::NodeRef(node_id));
            out.push(next);
        }
        Ok(out)
    }

    pub(super) fn execute_create_rel_rows(
        &mut self,
        input_rows: &[Row],
        src_col: i32,
        dst_col: i32,
        rel_type: &str,
        props: &Expr,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        for row in input_rows {
            let src_idx = src_col as usize;
            let src_id = match row.get(src_idx) {
                Some(Value::NodeRef(id)) => *id,
                Some(_) => return Err(ExecutionError::ExpectedNodeRef { idx: src_idx }),
                None => {
                    return Err(ExecutionError::ColumnOutOfBounds {
                        idx: src_idx,
                        len: row.len(),
                    });
                }
            };
            let dst_idx = dst_col as usize;
            let dst_id = match row.get(dst_idx) {
                Some(Value::NodeRef(id)) => *id,
                Some(_) => return Err(ExecutionError::ExpectedNodeRef { idx: dst_idx }),
                None => {
                    return Err(ExecutionError::ColumnOutOfBounds {
                        idx: dst_idx,
                        len: row.len(),
                    });
                }
            };
            let rel_props = self.eval_props_payload(row, props)?;
            let rel_id = self.create_rel(src_id, dst_id, rel_type, rel_props)?;
            let mut next = row.clone();
            next.push(Value::RelRef(rel_id));
            out.push(next);
        }
        Ok(out)
    }

    pub(super) fn execute_merge_rows(
        &mut self,
        input_rows: &[Row],
        pattern: &Expr,
        on_create_props: &Expr,
        on_match_props: &Expr,
        schema: &[plexus_serde::ColDef],
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        for row in input_rows {
            self.merge_pattern(pattern, on_create_props, on_match_props, schema, row)?;
            out.push(row.clone());
        }
        Ok(out)
    }

    pub(super) fn execute_delete_rows(
        &mut self,
        input_rows: &[Row],
        target_col: i32,
        detach: bool,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        let idx = target_col as usize;
        for row in input_rows {
            let target = row.get(idx).ok_or(ExecutionError::ColumnOutOfBounds {
                idx,
                len: row.len(),
            })?;
            self.delete_entity(target, detach)?;
            out.push(row.clone());
        }
        Ok(out)
    }

    pub(super) fn execute_set_property_rows(
        &mut self,
        input_rows: &[Row],
        target_col: i32,
        key: &str,
        value_expr: &Expr,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        let idx = target_col as usize;
        for row in input_rows {
            let target = row.get(idx).ok_or(ExecutionError::ColumnOutOfBounds {
                idx,
                len: row.len(),
            })?;
            let value = self.eval_expr(row, value_expr)?;
            self.set_property(target, key, value)?;
            out.push(row.clone());
        }
        Ok(out)
    }

    pub(super) fn execute_remove_property_rows(
        &mut self,
        input_rows: &[Row],
        target_col: i32,
        key: &str,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::with_capacity(input_rows.len());
        let idx = target_col as usize;
        for row in input_rows {
            let target = row.get(idx).ok_or(ExecutionError::ColumnOutOfBounds {
                idx,
                len: row.len(),
            })?;
            self.remove_property(target, key)?;
            out.push(row.clone());
        }
        Ok(out)
    }
}