1use super::merge;
2use crate::*;
3
4impl InMemoryEngine {
5 fn input_rows<'a>(
6 &self,
7 outputs: &'a [Option<RowSet>],
8 input: u32,
9 ) -> Result<&'a RowSet, ExecutionError> {
10 outputs
11 .get(input as usize)
12 .ok_or(ExecutionError::InvalidOpRef(input))?
13 .as_ref()
14 .ok_or(ExecutionError::MissingOpOutput(input))
15 }
16}
17
18impl MutationEngine for InMemoryEngine {
19 type Error = ExecutionError;
20
21 fn create_node(
22 &mut self,
23 labels: &[String],
24 props: HashMap<String, Value>,
25 ) -> Result<u64, Self::Error> {
26 let next_id = self.graph.nodes.iter().map(|n| n.id).max().unwrap_or(0) + 1;
27 self.graph.nodes.push(Node {
28 id: next_id,
29 labels: labels.iter().cloned().collect(),
30 props,
31 });
32 Ok(next_id)
33 }
34
35 fn create_rel(
36 &mut self,
37 src: u64,
38 dst: u64,
39 rel_type: &str,
40 props: HashMap<String, Value>,
41 ) -> Result<u64, Self::Error> {
42 if self.graph.node_by_id(src).is_none() {
43 return Err(ExecutionError::UnknownNode(src));
44 }
45 if self.graph.node_by_id(dst).is_none() {
46 return Err(ExecutionError::UnknownNode(dst));
47 }
48 let next_id = self.graph.rels.iter().map(|r| r.id).max().unwrap_or(0) + 1;
49 self.graph.rels.push(Relationship {
50 id: next_id,
51 src,
52 dst,
53 typ: rel_type.to_string(),
54 props,
55 });
56 Ok(next_id)
57 }
58
59 fn merge_pattern(
60 &mut self,
61 pattern: &Expr,
62 on_create_props: &Expr,
63 on_match_props: &Expr,
64 schema: &[plexus_serde::ColDef],
65 row: &Row,
66 ) -> Result<(), Self::Error> {
67 merge::execute_merge_pattern(self, pattern, on_create_props, on_match_props, schema, row)
68 }
69
70 fn delete_entity(&mut self, target: &Value, detach: bool) -> Result<(), Self::Error> {
71 match target {
72 Value::NodeRef(node_id) => {
73 if self.graph.node_by_id(*node_id).is_none() {
74 return Err(ExecutionError::UnknownNode(*node_id));
75 }
76 let has_incident = self
77 .graph
78 .rels
79 .iter()
80 .any(|r| r.src == *node_id || r.dst == *node_id);
81 if has_incident && !detach {
82 return Err(ExecutionError::DeleteRequiresDetach(*node_id));
83 }
84 if detach {
85 self.graph
86 .rels
87 .retain(|r| r.src != *node_id && r.dst != *node_id);
88 }
89 self.graph.nodes.retain(|n| n.id != *node_id);
90 Ok(())
91 }
92 Value::RelRef(rel_id) => {
93 if self.graph.rel_by_id(*rel_id).is_none() {
94 return Err(ExecutionError::UnknownRel(*rel_id));
95 }
96 self.graph.rels.retain(|r| r.id != *rel_id);
97 Ok(())
98 }
99 Value::Null => Ok(()),
100 _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
101 }
102 }
103
104 fn set_property(&mut self, target: &Value, key: &str, value: Value) -> Result<(), Self::Error> {
105 match target {
106 Value::NodeRef(node_id) => {
107 let node = self
108 .graph
109 .node_by_id_mut(*node_id)
110 .ok_or(ExecutionError::UnknownNode(*node_id))?;
111 node.props.insert(key.to_string(), value);
112 Ok(())
113 }
114 Value::RelRef(rel_id) => {
115 let rel = self
116 .graph
117 .rel_by_id_mut(*rel_id)
118 .ok_or(ExecutionError::UnknownRel(*rel_id))?;
119 rel.props.insert(key.to_string(), value);
120 Ok(())
121 }
122 Value::Null => Ok(()),
123 _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
124 }
125 }
126
127 fn remove_property(&mut self, target: &Value, key: &str) -> Result<(), Self::Error> {
128 match target {
129 Value::NodeRef(node_id) => {
130 let node = self
131 .graph
132 .node_by_id_mut(*node_id)
133 .ok_or(ExecutionError::UnknownNode(*node_id))?;
134 node.props.remove(key);
135 Ok(())
136 }
137 Value::RelRef(rel_id) => {
138 let rel = self
139 .graph
140 .rel_by_id_mut(*rel_id)
141 .ok_or(ExecutionError::UnknownRel(*rel_id))?;
142 rel.props.remove(key);
143 Ok(())
144 }
145 Value::Null => Ok(()),
146 _ => Err(ExecutionError::ExpectedEntityRef { idx: 0 }),
147 }
148 }
149}
150
151impl PlanEngine for InMemoryEngine {
152 type Error = ExecutionError;
153
154 fn execute_plan(&mut self, plan: &Plan) -> Result<QueryResult, Self::Error> {
155 let mut seen_ref: Option<&str> = None;
158 for op in &plan.ops {
159 let graph_ref = match op {
160 Op::ScanNodes { graph_ref, .. }
161 | Op::Expand { graph_ref, .. }
162 | Op::OptionalExpand { graph_ref, .. }
163 | Op::ExpandVarLen { graph_ref, .. } => graph_ref.as_deref(),
164 _ => None,
165 };
166 if let Some(r) = graph_ref.map(str::trim).filter(|s| !s.is_empty()) {
167 match seen_ref {
168 None => seen_ref = Some(r),
169 Some(prev) if prev != r => {
170 return Err(ExecutionError::MultiGraphUnsupported);
171 }
172 _ => {}
173 }
174 }
175 }
176
177 let mut outputs: Vec<Option<RowSet>> = vec![None; plan.ops.len()];
178
179 for (idx, op) in plan.ops.iter().enumerate() {
180 let rows = match op {
181 Op::ScanNodes {
182 labels,
183 must_labels,
184 forbidden_labels,
185 ..
186 } => self.execute_scan_nodes(labels, must_labels, forbidden_labels),
187 Op::ScanRels {
188 types,
189 src_labels,
190 dst_labels,
191 ..
192 } => self.execute_scan_rels(types, src_labels, dst_labels),
193 Op::Expand {
194 input,
195 src_col,
196 types,
197 dir,
198 legal_src_labels,
199 legal_dst_labels,
200 ..
201 } => {
202 let input_rows = self.input_rows(&outputs, *input)?;
203 self.execute_expand(
204 input_rows,
205 *src_col,
206 types,
207 *dir,
208 legal_src_labels,
209 legal_dst_labels,
210 )?
211 }
212 Op::OptionalExpand {
213 input,
214 src_col,
215 types,
216 dir,
217 legal_src_labels,
218 legal_dst_labels,
219 ..
220 } => {
221 let input_rows = self.input_rows(&outputs, *input)?;
222 self.execute_optional_expand(
223 input_rows,
224 *src_col,
225 types,
226 *dir,
227 legal_src_labels,
228 legal_dst_labels,
229 )?
230 }
231 Op::SemiExpand {
232 input,
233 src_col,
234 types,
235 dir,
236 legal_src_labels,
237 legal_dst_labels,
238 ..
239 } => {
240 let input_rows = self.input_rows(&outputs, *input)?;
241 self.execute_semi_expand(
242 input_rows,
243 *src_col,
244 types,
245 *dir,
246 legal_src_labels,
247 legal_dst_labels,
248 )?
249 }
250 Op::ExpandVarLen {
251 input,
252 src_col,
253 types,
254 dir,
255 min_hops,
256 max_hops,
257 ..
258 } => {
259 let input_rows = self.input_rows(&outputs, *input)?;
260 self.execute_expand_var_len(
261 input_rows, *src_col, types, *dir, *min_hops, *max_hops,
262 )?
263 }
264 Op::Filter { input, predicate } => {
265 let input_rows = self.input_rows(&outputs, *input)?;
266 self.execute_filter_rows(input_rows, predicate)?
267 }
268 Op::BlockMarker { input, .. } => self.input_rows(&outputs, *input)?.clone(),
269 Op::Project { input, exprs, .. } => {
270 let input_rows = self.input_rows(&outputs, *input)?;
271 self.execute_project_rows(input_rows, exprs)?
272 }
273 Op::Aggregate {
274 input, keys, aggs, ..
275 } => {
276 let input_rows = self.input_rows(&outputs, *input)?;
277 self.execute_aggregate_rows(input_rows, keys, aggs)?
278 }
279 Op::Sort { input, keys, dirs } => {
280 let input_rows = self.input_rows(&outputs, *input)?;
281 self.execute_sort_rows(input_rows, keys, dirs)?
282 }
283 Op::Limit { input, count, skip, .. } => {
284 let input_rows = self.input_rows(&outputs, *input)?;
285 self.execute_limit_rows(input_rows, *count, *skip)
286 }
287 Op::Return { input } => self.input_rows(&outputs, *input)?.clone(),
288 Op::Unwind {
289 input, list_expr, ..
290 } => {
291 let input_rows = self.input_rows(&outputs, *input)?;
292 self.execute_unwind(input_rows, list_expr)?
293 }
294 Op::PathConstruct {
295 input, rel_cols, ..
296 } => {
297 let input_rows = self.input_rows(&outputs, *input)?;
298 self.execute_path_construct(input_rows, rel_cols)?
299 }
300 Op::Union { lhs, rhs, all, .. } => {
301 let lhs_rows = self.input_rows(&outputs, *lhs)?;
302 let rhs_rows = self.input_rows(&outputs, *rhs)?;
303 self.execute_union_rows(lhs_rows, rhs_rows, *all)
304 }
305 Op::CreateNode {
306 input,
307 labels,
308 props,
309 ..
310 } => {
311 let input_rows = self.input_rows(&outputs, *input)?.clone();
312 self.execute_create_node_rows(&input_rows, labels, props)?
313 }
314 Op::CreateRel {
315 input,
316 src_col,
317 dst_col,
318 rel_type,
319 props,
320 ..
321 } => {
322 let input_rows = self.input_rows(&outputs, *input)?.clone();
323 self.execute_create_rel_rows(&input_rows, *src_col, *dst_col, rel_type, props)?
324 }
325 Op::Merge {
326 input,
327 pattern,
328 on_create_props,
329 on_match_props,
330 schema,
331 ..
332 } => {
333 let input_rows = self.input_rows(&outputs, *input)?.clone();
334 self.execute_merge_rows(
335 &input_rows,
336 pattern,
337 on_create_props,
338 on_match_props,
339 schema,
340 )?
341 }
342 Op::Delete {
343 input,
344 target_col,
345 detach,
346 ..
347 } => {
348 let input_rows = self.input_rows(&outputs, *input)?.clone();
349 self.execute_delete_rows(&input_rows, *target_col, *detach)?
350 }
351 Op::SetProperty {
352 input,
353 target_col,
354 key,
355 value_expr,
356 ..
357 } => {
358 let input_rows = self.input_rows(&outputs, *input)?.clone();
359 self.execute_set_property_rows(&input_rows, *target_col, key, value_expr)?
360 }
361 Op::RemoveProperty {
362 input,
363 target_col,
364 key,
365 ..
366 } => {
367 let input_rows = self.input_rows(&outputs, *input)?.clone();
368 self.execute_remove_property_rows(&input_rows, *target_col, key)?
369 }
370 Op::VectorScan { .. } => return Err(ExecutionError::UnsupportedOp("vector_scan")),
371 Op::Rerank { .. } => return Err(ExecutionError::UnsupportedOp("rerank")),
372 Op::ConstRow => vec![vec![]],
373 };
374 outputs[idx] = Some(rows);
375 }
376
377 let root_rows = outputs
378 .get(plan.root_op as usize)
379 .ok_or(ExecutionError::InvalidRootOp(plan.root_op))?
380 .clone()
381 .ok_or(ExecutionError::InvalidRootOp(plan.root_op))?;
382
383 Ok(QueryResult {
384 rows: root_rows,
385 continuation: None,
386 })
387 }
388}