Skip to main content

plexus_engine/engine/
execute.rs

1use super::merge;
2use crate::*;
3
4impl InMemoryEngine {
5    fn input_rows<'a>(
6        &self,
7        outputs: &'a [Option<RowSet>],
8        input: u32,
9    ) -> Result<&'a RowSet, ExecutionError> {
10        outputs
11            .get(input as usize)
12            .ok_or(ExecutionError::InvalidOpRef(input))?
13            .as_ref()
14            .ok_or(ExecutionError::MissingOpOutput(input))
15    }
16}
17
18impl MutationEngine for InMemoryEngine {
19    type Error = ExecutionError;
20
21    fn create_node(
22        &mut self,
23        labels: &[String],
24        props: HashMap<String, Value>,
25    ) -> Result<u64, Self::Error> {
26        let next_id = self.graph.nodes.iter().map(|n| n.id).max().unwrap_or(0) + 1;
27        self.graph.nodes.push(Node {
28            id: next_id,
29            labels: labels.iter().cloned().collect(),
30            props,
31        });
32        Ok(next_id)
33    }
34
35    fn create_rel(
36        &mut self,
37        src: u64,
38        dst: u64,
39        rel_type: &str,
40        props: HashMap<String, Value>,
41    ) -> Result<u64, Self::Error> {
42        if self.graph.node_by_id(src).is_none() {
43            return Err(ExecutionError::UnknownNode(src));
44        }
45        if self.graph.node_by_id(dst).is_none() {
46            return Err(ExecutionError::UnknownNode(dst));
47        }
48        let next_id = self.graph.rels.iter().map(|r| r.id).max().unwrap_or(0) + 1;
49        self.graph.rels.push(Relationship {
50            id: next_id,
51            src,
52            dst,
53            typ: rel_type.to_string(),
54            props,
55        });
56        Ok(next_id)
57    }
58
59    fn merge_pattern(
60        &mut self,
61        pattern: &Expr,
62        on_create_props: &Expr,
63        on_match_props: &Expr,
64        schema: &[plexus_serde::ColDef],
65        row: &Row,
66    ) -> Result<(), Self::Error> {
67        merge::execute_merge_pattern(self, pattern, on_create_props, on_match_props, schema, row)
68    }
69
70    fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error> {
71        match target {
72            Value::NodeRef(node_id) => {
73                if self.graph.node_by_id(*node_id).is_none() {
74                    return Err(ExecutionError::UnknownNode(*node_id));
75                }
76                let has_incident = self
77                    .graph
78                    .rels
79                    .iter()
80                    .any(|r| r.src == *node_id || r.dst == *node_id);
81                if has_incident && !detach {
82                    return Err(ExecutionError::DeleteRequiresDetach(*node_id));
83                }
84                if detach {
85                    self.graph
86                        .rels
87                        .retain(|r| r.src != *node_id && r.dst != *node_id);
88                }
89                self.graph.nodes.retain(|n| n.id != *node_id);
90                Ok(())
91            }
92            Value::RelRef(rel_id) => {
93                if self.graph.rel_by_id(*rel_id).is_none() {
94                    return Err(ExecutionError::UnknownRel(*rel_id));
95                }
96                self.graph.rels.retain(|r| r.id != *rel_id);
97                Ok(())
98            }
99            Value::Null => Ok(()),
100            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
101        }
102    }
103
104    fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error> {
105        match target {
106            Value::NodeRef(node_id) => {
107                let node = self
108                    .graph
109                    .node_by_id_mut(*node_id)
110                    .ok_or(ExecutionError::UnknownNode(*node_id))?;
111                node.props.insert(key.to_string(), value);
112                Ok(())
113            }
114            Value::RelRef(rel_id) => {
115                let rel = self
116                    .graph
117                    .rel_by_id_mut(*rel_id)
118                    .ok_or(ExecutionError::UnknownRel(*rel_id))?;
119                rel.props.insert(key.to_string(), value);
120                Ok(())
121            }
122            Value::Null => Ok(()),
123            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
124        }
125    }
126
127    fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error> {
128        match target {
129            Value::NodeRef(node_id) => {
130                let node = self
131                    .graph
132                    .node_by_id_mut(*node_id)
133                    .ok_or(ExecutionError::UnknownNode(*node_id))?;
134                node.props.remove(key);
135                Ok(())
136            }
137            Value::RelRef(rel_id) => {
138                let rel = self
139                    .graph
140                    .rel_by_id_mut(*rel_id)
141                    .ok_or(ExecutionError::UnknownRel(*rel_id))?;
142                rel.props.remove(key);
143                Ok(())
144            }
145            Value::Null => Ok(()),
146            _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
147        }
148    }
149}
150
151impl PlanEngine for InMemoryEngine {
152    type Error = ExecutionError;
153
154    fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error> {
155        // The reference engine does not support multi-graph queries.
156        // Reject plans that reference more than one distinct graph_ref.
157        let mut seen_ref: Option<&str> = None;
158        for op in &plan.ops {
159            let graph_ref = match op {
160                Op::ScanNodes { graph_ref, .. }
161                | Op::Expand { graph_ref, .. }
162                | Op::OptionalExpand { graph_ref, .. }
163                | Op::ExpandVarLen { graph_ref, .. } => graph_ref.as_deref(),
164                _ => None,
165            };
166            if let Some(r) = graph_ref.map(str::trim).filter(|s| !s.is_empty()) {
167                match seen_ref {
168                    None => seen_ref = Some(r),
169                    Some(prev) if prev != r => {
170                        return Err(ExecutionError::MultiGraphUnsupported);
171                    }
172                    _ => {}
173                }
174            }
175        }
176
177        let mut outputs: Vec<Option<RowSet>> = vec![None; plan.ops.len()];
178
179        for (idx, op) in plan.ops.iter().enumerate() {
180            let rows = match op {
181                Op::ScanNodes {
182                    labels,
183                    must_labels,
184                    forbidden_labels,
185                    ..
186                } => self.execute_scan_nodes(labels, must_labels, forbidden_labels),
187                Op::ScanRels {
188                    types,
189                    src_labels,
190                    dst_labels,
191                    ..
192                } => self.execute_scan_rels(types, src_labels, dst_labels),
193                Op::Expand {
194                    input,
195                    src_col,
196                    types,
197                    dir,
198                    legal_src_labels,
199                    legal_dst_labels,
200                    ..
201                } => {
202                    let input_rows = self.input_rows(&outputs, *input)?;
203                    self.execute_expand(
204                        input_rows,
205                        *src_col,
206                        types,
207                        *dir,
208                        legal_src_labels,
209                        legal_dst_labels,
210                    )?
211                }
212                Op::OptionalExpand {
213                    input,
214                    src_col,
215                    types,
216                    dir,
217                    legal_src_labels,
218                    legal_dst_labels,
219                    ..
220                } => {
221                    let input_rows = self.input_rows(&outputs, *input)?;
222                    self.execute_optional_expand(
223                        input_rows,
224                        *src_col,
225                        types,
226                        *dir,
227                        legal_src_labels,
228                        legal_dst_labels,
229                    )?
230                }
231                Op::SemiExpand {
232                    input,
233                    src_col,
234                    types,
235                    dir,
236                    legal_src_labels,
237                    legal_dst_labels,
238                    ..
239                } => {
240                    let input_rows = self.input_rows(&outputs, *input)?;
241                    self.execute_semi_expand(
242                        input_rows,
243                        *src_col,
244                        types,
245                        *dir,
246                        legal_src_labels,
247                        legal_dst_labels,
248                    )?
249                }
250                Op::ExpandVarLen {
251                    input,
252                    src_col,
253                    types,
254                    dir,
255                    min_hops,
256                    max_hops,
257                    ..
258                } => {
259                    let input_rows = self.input_rows(&outputs, *input)?;
260                    self.execute_expand_var_len(
261                        input_rows, *src_col, types, *dir, *min_hops, *max_hops,
262                    )?
263                }
264                Op::Filter { input, predicate } => {
265                    let input_rows = self.input_rows(&outputs, *input)?;
266                    self.execute_filter_rows(input_rows, predicate)?
267                }
268                Op::BlockMarker { input, .. } => self.input_rows(&outputs, *input)?.clone(),
269                Op::Project { input, exprs, .. } => {
270                    let input_rows = self.input_rows(&outputs, *input)?;
271                    self.execute_project_rows(input_rows, exprs)?
272                }
273                Op::Aggregate {
274                    input, keys, aggs, ..
275                } => {
276                    let input_rows = self.input_rows(&outputs, *input)?;
277                    self.execute_aggregate_rows(input_rows, keys, aggs)?
278                }
279                Op::Sort { input, keys, dirs } => {
280                    let input_rows = self.input_rows(&outputs, *input)?;
281                    self.execute_sort_rows(input_rows, keys, dirs)?
282                }
283                Op::Limit { input, count, skip, .. } => {
284                    let input_rows = self.input_rows(&outputs, *input)?;
285                    self.execute_limit_rows(input_rows, *count, *skip)
286                }
287                Op::Return { input } => self.input_rows(&outputs, *input)?.clone(),
288                Op::Unwind {
289                    input, list_expr, ..
290                } => {
291                    let input_rows = self.input_rows(&outputs, *input)?;
292                    self.execute_unwind(input_rows, list_expr)?
293                }
294                Op::PathConstruct {
295                    input, rel_cols, ..
296                } => {
297                    let input_rows = self.input_rows(&outputs, *input)?;
298                    self.execute_path_construct(input_rows, rel_cols)?
299                }
300                Op::Union { lhs, rhs, all, .. } => {
301                    let lhs_rows = self.input_rows(&outputs, *lhs)?;
302                    let rhs_rows = self.input_rows(&outputs, *rhs)?;
303                    self.execute_union_rows(lhs_rows, rhs_rows, *all)
304                }
305                Op::CreateNode {
306                    input,
307                    labels,
308                    props,
309                    ..
310                } => {
311                    let input_rows = self.input_rows(&outputs, *input)?.clone();
312                    self.execute_create_node_rows(&input_rows, labels, props)?
313                }
314                Op::CreateRel {
315                    input,
316                    src_col,
317                    dst_col,
318                    rel_type,
319                    props,
320                    ..
321                } => {
322                    let input_rows = self.input_rows(&outputs, *input)?.clone();
323                    self.execute_create_rel_rows(&input_rows, *src_col, *dst_col, rel_type, props)?
324                }
325                Op::Merge {
326                    input,
327                    pattern,
328                    on_create_props,
329                    on_match_props,
330                    schema,
331                    ..
332                } => {
333                    let input_rows = self.input_rows(&outputs, *input)?.clone();
334                    self.execute_merge_rows(
335                        &input_rows,
336                        pattern,
337                        on_create_props,
338                        on_match_props,
339                        schema,
340                    )?
341                }
342                Op::Delete {
343                    input,
344                    target_col,
345                    detach,
346                    ..
347                } => {
348                    let input_rows = self.input_rows(&outputs, *input)?.clone();
349                    self.execute_delete_rows(&input_rows, *target_col, *detach)?
350                }
351                Op::SetProperty {
352                    input,
353                    target_col,
354                    key,
355                    value_expr,
356                    ..
357                } => {
358                    let input_rows = self.input_rows(&outputs, *input)?.clone();
359                    self.execute_set_property_rows(&input_rows, *target_col, key, value_expr)?
360                }
361                Op::RemoveProperty {
362                    input,
363                    target_col,
364                    key,
365                    ..
366                } => {
367                    let input_rows = self.input_rows(&outputs, *input)?.clone();
368                    self.execute_remove_property_rows(&input_rows, *target_col, key)?
369                }
370                Op::VectorScan { .. } => return Err(ExecutionError::UnsupportedOp("vector_scan")),
371                Op::Rerank { .. } => return Err(ExecutionError::UnsupportedOp("rerank")),
372                Op::ConstRow => vec![vec![]],
373            };
374            outputs[idx] = Some(rows);
375        }
376
377        let root_rows = outputs
378            .get(plan.root_op as usize)
379            .ok_or(ExecutionError::InvalidRootOp(plan.root_op))?
380            .clone()
381            .ok_or(ExecutionError::InvalidRootOp(plan.root_op))?;
382
383        Ok(QueryResult {
384            rows: root_rows,
385            continuation: None,
386        })
387    }
388}