use super::eval::{as_bool, cmp_ordering};
use crate::*;
impl InMemoryEngine {
/// Scans every node in the graph and emits one single-column row
/// (`Value::NodeRef`) per node that passes all three label filters.
///
/// * `labels` - when non-empty, the node must carry at least one of these.
/// * `must_labels` - the node must carry every one of these.
/// * `forbidden_labels` - the node must carry none of these.
pub(crate) fn execute_scan_nodes(
    &self,
    labels: &[String],
    must_labels: &[String],
    forbidden_labels: &[String],
) -> RowSet {
    let mut rows = Vec::new();
    for node in self.graph.nodes.iter() {
        // An empty `labels` list means "no any-of restriction".
        if !labels.is_empty() && !labels.iter().any(|l| node.labels.contains(l)) {
            continue;
        }
        // Reject as soon as one required label is missing.
        if must_labels.iter().any(|l| !node.labels.contains(l)) {
            continue;
        }
        // Reject as soon as one forbidden label is present.
        if forbidden_labels.iter().any(|l| node.labels.contains(l)) {
            continue;
        }
        rows.push(vec![Value::NodeRef(node.id)]);
    }
    rows
}
/// Scans every relationship in the graph and emits one single-column row
/// (`Value::RelRef`) per relationship that passes the type and endpoint
/// label filters.
///
/// * `types` - when non-empty, the relationship type must equal one of these.
/// * `src_labels` - when non-empty, the source node must carry at least one.
/// * `dst_labels` - when non-empty, the destination node must carry at least one.
///
/// Relationships whose source or destination node cannot be resolved via
/// `node_by_id` are skipped.
pub(crate) fn execute_scan_rels(
    &self,
    types: &[String],
    src_labels: &[String],
    dst_labels: &[String],
) -> RowSet {
    self.graph
        .rels
        .iter()
        .filter_map(|rel| {
            let type_ok = types.is_empty() || types.iter().any(|t| t == &rel.typ);
            if !type_ok {
                return None;
            }
            // A missing endpoint drops the relationship from the scan.
            let src = self.graph.node_by_id(rel.src)?;
            let dst = self.graph.node_by_id(rel.dst)?;
            let src_ok =
                src_labels.is_empty() || src_labels.iter().any(|l| src.labels.contains(l));
            let dst_ok =
                dst_labels.is_empty() || dst_labels.iter().any(|l| dst.labels.contains(l));
            (src_ok && dst_ok).then(|| vec![Value::RelRef(rel.id)])
        })
        .collect()
}
pub(crate) fn execute_unwind(
&self,
input: &[Row],
list_expr: &Expr,
) -> Result<RowSet, ExecutionError> {
let mut out = Vec::new();
for row in input {
let value = self.eval_expr(row, list_expr)?;
match value {
Value::List(items) => {
for item in items {
let mut next = row.clone();
next.push(item);
out.push(next);
}
}
Value::Null => {}
scalar => {
let mut next = row.clone();
next.push(scalar);
out.push(next);
}
}
}
Ok(out)
}
/// Appends a path column to every input row.
///
/// For each row, the values found at the column indices in `rel_cols` are
/// cloned, in the order given, into a `Value::List` that is pushed onto a
/// copy of the row.
///
/// # Errors
///
/// Returns `ExecutionError::ColumnOutOfBounds` when an index in `rel_cols`
/// is not a valid column of the row being processed.
pub(crate) fn execute_path_construct(
    &self,
    input: &[Row],
    rel_cols: &[u32],
) -> Result<RowSet, ExecutionError> {
    input
        .iter()
        .map(|row| -> Result<Row, ExecutionError> {
            let path = rel_cols
                .iter()
                .map(|&col| {
                    let idx = col as usize;
                    row.get(idx)
                        .cloned()
                        .ok_or(ExecutionError::ColumnOutOfBounds {
                            idx,
                            len: row.len(),
                        })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let mut extended = row.clone();
            extended.push(Value::List(path));
            Ok(extended)
        })
        .collect()
}
/// Keeps (as clones, in input order) the rows for which `predicate`
/// evaluates to a value that `as_bool` reports as true.
///
/// # Errors
///
/// Propagates any error returned by `eval_expr`; no partial result is
/// returned in that case.
pub(crate) fn execute_filter_rows(
    &self,
    input_rows: &[Row],
    predicate: &Expr,
) -> Result<RowSet, ExecutionError> {
    let mut kept = Vec::new();
    for candidate in input_rows {
        let verdict = self.eval_expr(candidate, predicate)?;
        if !as_bool(&verdict) {
            continue;
        }
        kept.push(candidate.clone());
    }
    Ok(kept)
}
/// Builds one output row per input row by evaluating every expression in
/// `exprs`, in order, against that input row.
///
/// # Errors
///
/// Propagates the first error returned by `eval_expr`.
pub(crate) fn execute_project_rows(
    &self,
    input_rows: &[Row],
    exprs: &[Expr],
) -> Result<RowSet, ExecutionError> {
    let mut projected = Vec::with_capacity(input_rows.len());
    for source in input_rows {
        // Collecting into `Result` stops at the first failing expression.
        let cells = exprs
            .iter()
            .map(|expr| self.eval_expr(source, expr))
            .collect::<Result<Vec<_>, _>>()?;
        projected.push(cells);
    }
    Ok(projected)
}
/// Groups `input_rows` by the values found at the `keys` columns and emits
/// one row per group: the key values followed by one value per aggregate
/// expression in `aggs`, each computed by `eval_agg` over the group's rows.
///
/// Groups appear in the order in which their key was first seen. Grouping
/// relies on `Value` equality and a linear search over the existing groups.
///
/// # Errors
///
/// Returns `ExecutionError::ColumnOutOfBounds` when a key index is not a
/// valid column of a row, and propagates any error from `eval_agg`.
pub(crate) fn execute_aggregate_rows(
    &self,
    input_rows: &[Row],
    keys: &[u32],
    aggs: &[Expr],
) -> Result<RowSet, ExecutionError> {
    // NOTE(review): empty `input_rows` produces zero output rows even when
    // `keys` is empty -- confirm callers do not expect a single
    // global-aggregate row in that case.
    let mut groups: Vec<(Vec<Value>, Vec<Row>)> = Vec::new();
    for row in input_rows {
        let mut key_vals: Vec<Value> = Vec::with_capacity(keys.len());
        for &key in keys {
            let idx = key as usize;
            match row.get(idx) {
                Some(value) => key_vals.push(value.clone()),
                None => {
                    return Err(ExecutionError::ColumnOutOfBounds {
                        idx,
                        len: row.len(),
                    })
                }
            }
        }
        match groups.iter().position(|(existing, _)| *existing == key_vals) {
            Some(slot) => groups[slot].1.push(row.clone()),
            None => groups.push((key_vals, vec![row.clone()])),
        }
    }
    groups
        .into_iter()
        .map(|(mut out_row, members)| -> Result<Row, ExecutionError> {
            for agg in aggs {
                out_row.push(self.eval_agg(&members, agg)?);
            }
            Ok(out_row)
        })
        .collect()
}
/// Returns a sorted copy of `input_rows`.
///
/// Rows are compared column by column: `keys[i]` names the column and
/// `dirs[i]` its direction; the first column that compares unequal decides.
/// The sort is stable, so rows that tie on every key keep their input order.
///
/// A key index that is out of range for a row is treated as `Value::Null`.
///
/// # Errors
///
/// Returns `ExecutionError::SortArityMismatch` when `keys` and `dirs`
/// have different lengths.
pub(crate) fn execute_sort_rows(
    &self,
    input_rows: &[Row],
    keys: &[u32],
    dirs: &[SortDir],
) -> Result<RowSet, ExecutionError> {
    if keys.len() != dirs.len() {
        return Err(ExecutionError::SortArityMismatch {
            keys: keys.len(),
            dirs: dirs.len(),
        });
    }
    let mut sorted = input_rows.to_vec();
    sorted.sort_by(|left, right| {
        keys.iter()
            .zip(dirs)
            .map(|(key, dir)| {
                let col = *key as usize;
                let lhs = left.get(col).unwrap_or(&Value::Null);
                let rhs = right.get(col).unwrap_or(&Value::Null);
                // NOTE(review): values `cmp_ordering` cannot compare are
                // treated as equal -- confirm this still yields a consistent
                // total order for mixed-type columns.
                let natural = cmp_ordering(lhs, rhs).unwrap_or(Ordering::Equal);
                match dir {
                    SortDir::Asc => natural,
                    SortDir::Desc => natural.reverse(),
                }
            })
            // The first key that distinguishes the two rows decides.
            .find(|ord| *ord != Ordering::Equal)
            .unwrap_or(Ordering::Equal)
    });
    Ok(sorted)
}
pub(crate) fn execute_limit_rows(&self, input_rows: &[Row], count: i64, skip: i64) -> RowSet {
let start = skip.max(0) as usize;
let iter = input_rows.iter().skip(start).cloned();
if count >= 0 {
iter.take(count as usize).collect()
} else {
iter.collect()
}
}
/// Combines two row sets.
///
/// * `all == true` - plain concatenation: every row of `lhs_rows` followed
///   by every row of `rhs_rows`, duplicates kept.
/// * `all == false` - distinct union: each distinct row appears once, in
///   order of first occurrence across `lhs_rows` then `rhs_rows`.
///
/// The distinct branch de-duplicates across BOTH inputs, so repeated rows
/// inside `lhs_rows` are collapsed as well, not only rows of `rhs_rows`
/// that already occur in the output. De-duplication relies on `Row`
/// equality and a linear `contains` scan of the output.
pub(crate) fn execute_union_rows(
    &self,
    lhs_rows: &[Row],
    rhs_rows: &[Row],
    all: bool,
) -> RowSet {
    if all {
        let mut out = Vec::with_capacity(lhs_rows.len() + rhs_rows.len());
        out.extend_from_slice(lhs_rows);
        out.extend_from_slice(rhs_rows);
        return out;
    }
    let mut out: Vec<Row> = Vec::new();
    for row in lhs_rows.iter().chain(rhs_rows) {
        if !out.contains(row) {
            out.push(row.clone());
        }
    }
    out
}
}