// plexus-engine 0.3.4
//
// Engine integration traits for consuming Plexus plans
// Documentation
use super::eval::{as_bool, cmp_ordering};
use crate::*;

impl InMemoryEngine {
    /// Scans every node of the graph and emits one single-column row per match.
    ///
    /// A node is kept when all three label constraints hold:
    /// * `labels` is empty, or the node carries at least one of them;
    /// * the node carries every label in `must_labels`;
    /// * the node carries none of the labels in `forbidden_labels`.
    ///
    /// Each output row contains exactly one `Value::NodeRef` built from the
    /// node's id; rows follow the iteration order of `self.graph.nodes`.
    pub(crate) fn execute_scan_nodes(
        &self,
        labels: &[String],
        must_labels: &[String],
        forbidden_labels: &[String],
    ) -> RowSet {
        let mut rows = Vec::new();
        for node in self.graph.nodes.iter() {
            // "Any of" constraint: only enforced when a label set was supplied.
            if !labels.is_empty() && !labels.iter().any(|l| node.labels.contains(l)) {
                continue;
            }
            // "All of" constraint.
            if !must_labels.iter().all(|l| node.labels.contains(l)) {
                continue;
            }
            // "None of" constraint.
            if forbidden_labels.iter().any(|l| node.labels.contains(l)) {
                continue;
            }
            rows.push(vec![Value::NodeRef(node.id)]);
        }
        rows
    }

    /// Scans every relationship of the graph and emits one single-column row
    /// per match.
    ///
    /// A relationship is kept when:
    /// * `types` is empty, or its `typ` equals one of the entries;
    /// * both endpoints resolve through `self.graph.node_by_id` (a relationship
    ///   with an unresolvable endpoint is skipped, not reported as an error);
    /// * `src_labels` is empty or the source node carries at least one of them;
    /// * `dst_labels` is empty or the destination node carries at least one of
    ///   them.
    ///
    /// Each output row contains exactly one `Value::RelRef` built from the
    /// relationship's id; rows follow the iteration order of `self.graph.rels`.
    pub(crate) fn execute_scan_rels(
        &self,
        types: &[String],
        src_labels: &[String],
        dst_labels: &[String],
    ) -> RowSet {
        let mut rows = Vec::new();
        for rel in self.graph.rels.iter() {
            let type_matches = types.is_empty() || types.iter().any(|t| t == &rel.typ);
            if !type_matches {
                continue;
            }

            // Resolve the endpoints; dangling references drop the relationship.
            let src = match self.graph.node_by_id(rel.src) {
                Some(node) => node,
                None => continue,
            };
            let dst = match self.graph.node_by_id(rel.dst) {
                Some(node) => node,
                None => continue,
            };

            let src_matches =
                src_labels.is_empty() || src_labels.iter().any(|l| src.labels.contains(l));
            if src_matches
                && (dst_labels.is_empty() || dst_labels.iter().any(|l| dst.labels.contains(l)))
            {
                rows.push(vec![Value::RelRef(rel.id)]);
            }
        }
        rows
    }

    pub(crate) fn execute_unwind(
        &self,
        input: &[Row],
        list_expr: &Expr,
    ) -> Result<RowSet, ExecutionError> {
        let mut out = Vec::new();
        for row in input {
            let value = self.eval_expr(row, list_expr)?;
            match value {
                Value::List(items) => {
                    for item in items {
                        let mut next = row.clone();
                        next.push(item);
                        out.push(next);
                    }
                }
                Value::Null => {}
                scalar => {
                    let mut next = row.clone();
                    next.push(scalar);
                    out.push(next);
                }
            }
        }
        Ok(out)
    }

    /// Appends a path column to every input row.
    ///
    /// For each row, the cells at the column indices in `rel_cols` are cloned,
    /// in the given order, into a `Value::List`, which is pushed as a new last
    /// column of a copy of the row. (`rel_cols` presumably names relationship
    /// columns; the code itself accepts any cell.)
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::ColumnOutOfBounds` for the first index in
    /// `rel_cols` that is not present in a row.
    pub(crate) fn execute_path_construct(
        &self,
        input: &[Row],
        rel_cols: &[u32],
    ) -> Result<RowSet, ExecutionError> {
        let mut rows = Vec::with_capacity(input.len());
        for row in input {
            let path = rel_cols
                .iter()
                .map(|&col| {
                    let idx = col as usize;
                    row.get(idx)
                        .cloned()
                        .ok_or(ExecutionError::ColumnOutOfBounds {
                            idx,
                            len: row.len(),
                        })
                })
                .collect::<Result<Vec<_>, _>>()?;

            let mut extended = row.clone();
            extended.push(Value::List(path));
            rows.push(extended);
        }
        Ok(rows)
    }

    /// Keeps the input rows for which `predicate` holds.
    ///
    /// The predicate is evaluated per row with `eval_expr`; a row is cloned
    /// into the output when `as_bool` of the result is `true`. Input order is
    /// preserved.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `eval_expr`.
    pub(crate) fn execute_filter_rows(
        &self,
        input_rows: &[Row],
        predicate: &Expr,
    ) -> Result<RowSet, ExecutionError> {
        let mut kept = Vec::new();
        for candidate in input_rows {
            let verdict = self.eval_expr(candidate, predicate)?;
            if !as_bool(&verdict) {
                continue;
            }
            kept.push(candidate.clone());
        }
        Ok(kept)
    }

    /// Replaces every input row by the values of `exprs` evaluated against it.
    ///
    /// Output row `i` has one cell per expression, in the order of `exprs`;
    /// the original columns are not carried over unless an expression
    /// reproduces them.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `eval_expr`.
    pub(crate) fn execute_project_rows(
        &self,
        input_rows: &[Row],
        exprs: &[Expr],
    ) -> Result<RowSet, ExecutionError> {
        let mut projected = Vec::with_capacity(input_rows.len());
        for row in input_rows {
            let cells = exprs
                .iter()
                .map(|expr| self.eval_expr(row, expr))
                .collect::<Result<Vec<_>, _>>()?;
            projected.push(cells);
        }
        Ok(projected)
    }

    /// Groups the input rows by the columns in `keys` and evaluates `aggs`
    /// once per group.
    ///
    /// Groups are kept in first-seen order and are matched by comparing the
    /// cloned key cells with `==`. Each output row is the group's key cells
    /// followed by one `eval_agg` result per entry of `aggs`.
    ///
    /// Group lookup is a linear scan over the groups seen so far.
    ///
    /// NOTE(review): an empty `input_rows` yields an empty result even when
    /// `keys` is empty; confirm whether callers expect a single row for a
    /// global aggregate over no input.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::ColumnOutOfBounds` when a key index is not
    /// present in a row, and propagates any error returned by `eval_agg`.
    pub(crate) fn execute_aggregate_rows(
        &self,
        input_rows: &[Row],
        keys: &[u32],
        aggs: &[Expr],
    ) -> Result<RowSet, ExecutionError> {
        let mut buckets: Vec<(Vec<Value>, Vec<Row>)> = Vec::new();
        for row in input_rows {
            // Extract this row's grouping key.
            let mut key = Vec::with_capacity(keys.len());
            for &col in keys {
                let idx = col as usize;
                match row.get(idx) {
                    Some(cell) => key.push(cell.clone()),
                    None => {
                        return Err(ExecutionError::ColumnOutOfBounds {
                            idx,
                            len: row.len(),
                        })
                    }
                }
            }

            // Append to the matching bucket, or open a new one.
            match buckets.iter().position(|(existing, _)| *existing == key) {
                Some(slot) => buckets[slot].1.push(row.clone()),
                None => buckets.push((key, vec![row.clone()])),
            }
        }

        let mut result = Vec::new();
        for (mut out_row, members) in buckets {
            for agg in aggs {
                out_row.push(self.eval_agg(&members, agg)?);
            }
            result.push(out_row);
        }
        Ok(result)
    }

    /// Returns a stably sorted copy of `input_rows`.
    ///
    /// Rows are compared column by column in the order of `keys`; the first
    /// column whose cells do not compare equal decides, with the ordering
    /// reversed for `SortDir::Desc`. Rows equal on every key keep their input
    /// order.
    ///
    /// A key index that is missing from a row is read as `Value::Null` rather
    /// than reported as an error. Pairs for which `cmp_ordering` yields no
    /// ordering are treated as equal.
    ///
    /// NOTE(review): treating incomparable pairs as equal may not form a total
    /// order; confirm against `cmp_ordering` that this cannot upset `sort_by`.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::SortArityMismatch` when `keys` and `dirs`
    /// differ in length.
    pub(crate) fn execute_sort_rows(
        &self,
        input_rows: &[Row],
        keys: &[u32],
        dirs: &[SortDir],
    ) -> Result<RowSet, ExecutionError> {
        if keys.len() != dirs.len() {
            return Err(ExecutionError::SortArityMismatch {
                keys: keys.len(),
                dirs: dirs.len(),
            });
        }

        let mut sorted = input_rows.to_vec();
        sorted.sort_by(|lhs, rhs| {
            keys.iter()
                .zip(dirs)
                .map(|(col, dir)| {
                    let idx = *col as usize;
                    let left = lhs.get(idx).unwrap_or(&Value::Null);
                    let right = rhs.get(idx).unwrap_or(&Value::Null);
                    let natural = cmp_ordering(left, right).unwrap_or(Ordering::Equal);
                    match dir {
                        SortDir::Asc => natural,
                        SortDir::Desc => natural.reverse(),
                    }
                })
                // The adapter is lazy, so later keys are only compared when
                // all earlier ones tied.
                .find(|ord| *ord != Ordering::Equal)
                .unwrap_or(Ordering::Equal)
        });
        Ok(sorted)
    }

    /// Applies SKIP / LIMIT to `input_rows` and returns the selected rows.
    ///
    /// * `skip` — number of leading rows to drop; negative values are treated
    ///   as `0`.
    /// * `count` — maximum number of rows to return after skipping; a negative
    ///   value means "no limit".
    ///
    /// Values too large for `usize` saturate at `usize::MAX` instead of being
    /// truncated, so an oversized `skip` yields no rows and an oversized
    /// `count` yields all remaining rows on every target.
    pub(crate) fn execute_limit_rows(&self, input_rows: &[Row], count: i64, skip: i64) -> RowSet {
        // `i64 as usize` silently wraps where usize is narrower than 64 bits
        // (e.g. wasm32); use a checked conversion and saturate instead. The
        // trait is fully qualified so no extra import is required.
        let to_len = |v: i64| -> usize {
            <usize as std::convert::TryFrom<i64>>::try_from(v.max(0)).unwrap_or(usize::MAX)
        };

        let remaining = input_rows.iter().skip(to_len(skip));
        if count >= 0 {
            remaining.take(to_len(count)).cloned().collect()
        } else {
            remaining.cloned().collect()
        }
    }

    pub(crate) fn execute_union_rows(
        &self,
        lhs_rows: &[Row],
        rhs_rows: &[Row],
        all: bool,
    ) -> RowSet {
        if all {
            let mut out = lhs_rows.to_vec();
            out.extend(rhs_rows.to_vec());
            return out;
        }
        let mut out = lhs_rows.to_vec();
        for row in rhs_rows {
            if !out.contains(row) {
                out.push(row.clone());
            }
        }
        out
    }
}