use super::merge;
use crate::*;
impl InMemoryEngine {
fn input_rows<'a>(
&self,
outputs: &'a [Option<RowSet>],
input: u32,
) -> Result<&'a RowSet, ExecutionError> {
outputs
.get(input as usize)
.ok_or(ExecutionError::InvalidOpRef(input))?
.as_ref()
.ok_or(ExecutionError::MissingOpOutput(input))
}
}
impl MutationEngine for InMemoryEngine {
type Error = ExecutionError;
fn create_node(
&mut self,
labels: &[String],
props: HashMap<String, Value>,
) -> Result<u64, Self::Error> {
let next_id = self.graph.nodes.iter().map(|n| n.id).max().unwrap_or(0) + 1;
self.graph.nodes.push(Node {
id: next_id,
labels: labels.iter().cloned().collect(),
props,
});
Ok(next_id)
}
fn create_rel(
&mut self,
src: u64,
dst: u64,
rel_type: &str,
props: HashMap<String, Value>,
) -> Result<u64, Self::Error> {
if self.graph.node_by_id(src).is_none() {
return Err(ExecutionError::UnknownNode(src));
}
if self.graph.node_by_id(dst).is_none() {
return Err(ExecutionError::UnknownNode(dst));
}
let next_id = self.graph.rels.iter().map(|r| r.id).max().unwrap_or(0) + 1;
self.graph.rels.push(Relationship {
id: next_id,
src,
dst,
typ: rel_type.to_string(),
props,
});
Ok(next_id)
}
fn merge_pattern(
&mut self,
pattern: &Expr,
on_create_props: &Expr,
on_match_props: &Expr,
schema: &[plexus_serde::ColDef],
row: &Row,
) -> Result<(), Self::Error> {
merge::execute_merge_pattern(self, pattern, on_create_props, on_match_props, schema, row)
}
fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error> {
match target {
Value::NodeRef(node_id) => {
if self.graph.node_by_id(*node_id).is_none() {
return Err(ExecutionError::UnknownNode(*node_id));
}
let has_incident = self
.graph
.rels
.iter()
.any(|r| r.src == *node_id || r.dst == *node_id);
if has_incident && !detach {
return Err(ExecutionError::DeleteRequiresDetach(*node_id));
}
if detach {
self.graph
.rels
.retain(|r| r.src != *node_id && r.dst != *node_id);
}
self.graph.nodes.retain(|n| n.id != *node_id);
Ok(())
}
Value::RelRef(rel_id) => {
if self.graph.rel_by_id(*rel_id).is_none() {
return Err(ExecutionError::UnknownRel(*rel_id));
}
self.graph.rels.retain(|r| r.id != *rel_id);
Ok(())
}
Value::Null => Ok(()),
_ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
}
}
fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error> {
match target {
Value::NodeRef(node_id) => {
let node = self
.graph
.node_by_id_mut(*node_id)
.ok_or(ExecutionError::UnknownNode(*node_id))?;
node.props.insert(key.to_string(), value);
Ok(())
}
Value::RelRef(rel_id) => {
let rel = self
.graph
.rel_by_id_mut(*rel_id)
.ok_or(ExecutionError::UnknownRel(*rel_id))?;
rel.props.insert(key.to_string(), value);
Ok(())
}
Value::Null => Ok(()),
_ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
}
}
fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error> {
match target {
Value::NodeRef(node_id) => {
let node = self
.graph
.node_by_id_mut(*node_id)
.ok_or(ExecutionError::UnknownNode(*node_id))?;
node.props.remove(key);
Ok(())
}
Value::RelRef(rel_id) => {
let rel = self
.graph
.rel_by_id_mut(*rel_id)
.ok_or(ExecutionError::UnknownRel(*rel_id))?;
rel.props.remove(key);
Ok(())
}
Value::Null => Ok(()),
_ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
}
}
}
impl PlanEngine for InMemoryEngine {
type Error = ExecutionError;
fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error> {
let mut seen_ref: Option<&str> = None;
for op in &plan.ops {
let graph_ref = match op {
Op::ScanNodes { graph_ref, .. }
| Op::Expand { graph_ref, .. }
| Op::OptionalExpand { graph_ref, .. }
| Op::ExpandVarLen { graph_ref, .. } => graph_ref.as_deref(),
_ => None,
};
if let Some(r) = graph_ref.map(str::trim).filter(|s| !s.is_empty()) {
match seen_ref {
None => seen_ref = Some(r),
Some(prev) if prev != r => {
return Err(ExecutionError::MultiGraphUnsupported);
}
_ => {}
}
}
}
let mut outputs: Vec<Option<RowSet>> = vec![None; plan.ops.len()];
for (idx, op) in plan.ops.iter().enumerate() {
let rows = match op {
Op::ScanNodes {
labels,
must_labels,
forbidden_labels,
..
} => self.execute_scan_nodes(labels, must_labels, forbidden_labels),
Op::ScanRels {
types,
src_labels,
dst_labels,
..
} => self.execute_scan_rels(types, src_labels, dst_labels),
Op::Expand {
input,
src_col,
types,
dir,
legal_src_labels,
legal_dst_labels,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_expand(
input_rows,
*src_col,
types,
*dir,
legal_src_labels,
legal_dst_labels,
)?
}
Op::OptionalExpand {
input,
src_col,
types,
dir,
legal_src_labels,
legal_dst_labels,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_optional_expand(
input_rows,
*src_col,
types,
*dir,
legal_src_labels,
legal_dst_labels,
)?
}
Op::SemiExpand {
input,
src_col,
types,
dir,
legal_src_labels,
legal_dst_labels,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_semi_expand(
input_rows,
*src_col,
types,
*dir,
legal_src_labels,
legal_dst_labels,
)?
}
Op::ExpandVarLen {
input,
src_col,
types,
dir,
min_hops,
max_hops,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_expand_var_len(
input_rows, *src_col, types, *dir, *min_hops, *max_hops,
)?
}
Op::Filter { input, predicate } => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_filter_rows(input_rows, predicate)?
}
Op::BlockMarker { input, .. } => self.input_rows(&outputs, *input)?.clone(),
Op::Project { input, exprs, .. } => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_project_rows(input_rows, exprs)?
}
Op::Aggregate {
input, keys, aggs, ..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_aggregate_rows(input_rows, keys, aggs)?
}
Op::Sort { input, keys, dirs } => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_sort_rows(input_rows, keys, dirs)?
}
Op::Limit { input, count, skip } => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_limit_rows(input_rows, *count, *skip)
}
Op::Return { input } => self.input_rows(&outputs, *input)?.clone(),
Op::Unwind {
input, list_expr, ..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_unwind(input_rows, list_expr)?
}
Op::PathConstruct {
input, rel_cols, ..
} => {
let input_rows = self.input_rows(&outputs, *input)?;
self.execute_path_construct(input_rows, rel_cols)?
}
Op::Union { lhs, rhs, all, .. } => {
let lhs_rows = self.input_rows(&outputs, *lhs)?;
let rhs_rows = self.input_rows(&outputs, *rhs)?;
self.execute_union_rows(lhs_rows, rhs_rows, *all)
}
Op::CreateNode {
input,
labels,
props,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_create_node_rows(&input_rows, labels, props)?
}
Op::CreateRel {
input,
src_col,
dst_col,
rel_type,
props,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_create_rel_rows(&input_rows, *src_col, *dst_col, rel_type, props)?
}
Op::Merge {
input,
pattern,
on_create_props,
on_match_props,
schema,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_merge_rows(
&input_rows,
pattern,
on_create_props,
on_match_props,
schema,
)?
}
Op::Delete {
input,
target_col,
detach,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_delete_rows(&input_rows, *target_col, *detach)?
}
Op::SetProperty {
input,
target_col,
key,
value_expr,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_set_property_rows(&input_rows, *target_col, key, value_expr)?
}
Op::RemoveProperty {
input,
target_col,
key,
..
} => {
let input_rows = self.input_rows(&outputs, *input)?.clone();
self.execute_remove_property_rows(&input_rows, *target_col, key)?
}
Op::VectorScan { .. } => return Err(ExecutionError::UnsupportedOp("vector_scan")),
Op::Rerank { .. } => return Err(ExecutionError::UnsupportedOp("rerank")),
Op::ConstRow => vec![vec![]],
};
outputs[idx] = Some(rows);
}
let root_rows = outputs
.get(plan.root_op as usize)
.ok_or(ExecutionError::InvalidRootOp(plan.root_op))?
.clone()
.ok_or(ExecutionError::InvalidRootOp(plan.root_op))?;
Ok(QueryResult { rows: root_rows })
}
}