1pub mod eval;
4pub mod operators;
6
7use crate::parser::ast::*;
8use crate::planner::{LogicalPlan, TemporalFilterPlan};
9use cypherlite_core::{EdgeId, LabelRegistry, NodeId, PropertyValue};
10use cypherlite_storage::StorageEngine;
11use std::collections::HashMap;
12
13pub trait ScalarFnLookup {
22 fn call_scalar(&self, name: &str, args: &[Value]) -> Option<Result<Value, ExecutionError>>;
27}
28
29impl ScalarFnLookup for () {
31 fn call_scalar(&self, _name: &str, _args: &[Value]) -> Option<Result<Value, ExecutionError>> {
32 None
33 }
34}
35
36pub trait TriggerLookup {
45 fn fire_before_create(
47 &self,
48 ctx: &cypherlite_core::TriggerContext,
49 ) -> Result<(), ExecutionError>;
50
51 fn fire_after_create(
53 &self,
54 ctx: &cypherlite_core::TriggerContext,
55 ) -> Result<(), ExecutionError>;
56
57 fn fire_before_update(
59 &self,
60 ctx: &cypherlite_core::TriggerContext,
61 ) -> Result<(), ExecutionError>;
62
63 fn fire_after_update(
65 &self,
66 ctx: &cypherlite_core::TriggerContext,
67 ) -> Result<(), ExecutionError>;
68
69 fn fire_before_delete(
71 &self,
72 ctx: &cypherlite_core::TriggerContext,
73 ) -> Result<(), ExecutionError>;
74
75 fn fire_after_delete(
77 &self,
78 ctx: &cypherlite_core::TriggerContext,
79 ) -> Result<(), ExecutionError>;
80}
81
82impl TriggerLookup for () {
84 fn fire_before_create(
85 &self,
86 _ctx: &cypherlite_core::TriggerContext,
87 ) -> Result<(), ExecutionError> {
88 Ok(())
89 }
90 fn fire_after_create(
91 &self,
92 _ctx: &cypherlite_core::TriggerContext,
93 ) -> Result<(), ExecutionError> {
94 Ok(())
95 }
96 fn fire_before_update(
97 &self,
98 _ctx: &cypherlite_core::TriggerContext,
99 ) -> Result<(), ExecutionError> {
100 Ok(())
101 }
102 fn fire_after_update(
103 &self,
104 _ctx: &cypherlite_core::TriggerContext,
105 ) -> Result<(), ExecutionError> {
106 Ok(())
107 }
108 fn fire_before_delete(
109 &self,
110 _ctx: &cypherlite_core::TriggerContext,
111 ) -> Result<(), ExecutionError> {
112 Ok(())
113 }
114 fn fire_after_delete(
115 &self,
116 _ctx: &cypherlite_core::TriggerContext,
117 ) -> Result<(), ExecutionError> {
118 Ok(())
119 }
120}
121
/// Dispatches each hook to every trigger in the registry.
///
/// Names are taken from `list()` and resolved with `get()`; names that do not
/// resolve are skipped. Triggers run in the order `list()` yields them, and
/// the first failing trigger stops the iteration. Its error is returned with
/// its `to_string()` text as the message.
#[cfg(feature = "plugin")]
impl TriggerLookup
    for cypherlite_core::plugin::PluginRegistry<dyn cypherlite_core::plugin::Trigger>
{
    fn fire_before_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_before_create(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }

    fn fire_after_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_after_create(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }

    fn fire_before_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_before_update(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }

    fn fire_after_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_after_update(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }

    fn fire_before_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_before_delete(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }

    fn fire_after_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        self.list()
            .into_iter()
            .filter_map(|name| self.get(name))
            .try_for_each(|trigger| {
                trigger.on_after_delete(ctx).map_err(|e| ExecutionError {
                    message: e.to_string(),
                })
            })
    }
}
205
/// Resolves scalar functions from the plugin registry.
///
/// Returns `None` when `name` is not registered. Otherwise the arguments are
/// converted to `PropertyValue`s, the plugin is called, and its result is
/// converted back to a [`Value`]. A failed argument conversion or a plugin
/// error is reported as `Some(Err(..))`.
#[cfg(feature = "plugin")]
impl ScalarFnLookup
    for cypherlite_core::plugin::PluginRegistry<dyn cypherlite_core::plugin::ScalarFunction>
{
    fn call_scalar(&self, name: &str, args: &[Value]) -> Option<Result<Value, ExecutionError>> {
        let func = self.get(name)?;

        // Plugins only understand property values, so graph entities among
        // the arguments make the conversion (and therefore the call) fail.
        let converted = args
            .iter()
            .cloned()
            .map(PropertyValue::try_from)
            .collect::<Result<Vec<PropertyValue>, String>>();

        let outcome = match converted {
            Err(reason) => Err(ExecutionError {
                message: format!("plugin function argument conversion: {}", reason),
            }),
            Ok(pv_args) => func
                .call(&pv_args)
                .map(Value::from)
                .map_err(|e| ExecutionError {
                    message: e.to_string(),
                }),
        };
        Some(outcome)
    }
}
231
232#[derive(Debug, Clone, PartialEq)]
234pub enum Value {
235 Null,
237 Bool(bool),
239 Int64(i64),
241 Float64(f64),
243 String(String),
245 Bytes(Vec<u8>),
247 List(Vec<Value>),
249 Node(NodeId),
251 Edge(EdgeId),
253 DateTime(i64),
255 #[cfg(feature = "subgraph")]
257 Subgraph(cypherlite_core::SubgraphId),
258 #[cfg(feature = "hypergraph")]
260 Hyperedge(cypherlite_core::HyperEdgeId),
261 #[cfg(feature = "hypergraph")]
264 TemporalNode(NodeId, i64),
265}
266
267impl From<PropertyValue> for Value {
269 fn from(pv: PropertyValue) -> Self {
270 match pv {
271 PropertyValue::Null => Value::Null,
272 PropertyValue::Bool(b) => Value::Bool(b),
273 PropertyValue::Int64(i) => Value::Int64(i),
274 PropertyValue::Float64(f) => Value::Float64(f),
275 PropertyValue::String(s) => Value::String(s),
276 PropertyValue::Bytes(b) => Value::Bytes(b),
277 PropertyValue::Array(a) => Value::List(a.into_iter().map(Value::from).collect()),
278 PropertyValue::DateTime(ms) => Value::DateTime(ms),
279 }
280 }
281}
282
283impl TryFrom<Value> for PropertyValue {
285 type Error = String;
286 fn try_from(v: Value) -> Result<Self, Self::Error> {
287 match v {
288 Value::Null => Ok(PropertyValue::Null),
289 Value::Bool(b) => Ok(PropertyValue::Bool(b)),
290 Value::Int64(i) => Ok(PropertyValue::Int64(i)),
291 Value::Float64(f) => Ok(PropertyValue::Float64(f)),
292 Value::String(s) => Ok(PropertyValue::String(s)),
293 Value::Bytes(b) => Ok(PropertyValue::Bytes(b)),
294 Value::List(l) => {
295 let items: Result<Vec<_>, _> = l.into_iter().map(PropertyValue::try_from).collect();
296 Ok(PropertyValue::Array(items?))
297 }
298 Value::DateTime(ms) => Ok(PropertyValue::DateTime(ms)),
299 Value::Node(_) | Value::Edge(_) => {
300 Err("cannot convert graph entity to property".into())
301 }
302 #[cfg(feature = "subgraph")]
303 Value::Subgraph(_) => Err("cannot convert graph entity to property".into()),
304 #[cfg(feature = "hypergraph")]
305 Value::Hyperedge(_) => Err("cannot convert graph entity to property".into()),
306 #[cfg(feature = "hypergraph")]
307 Value::TemporalNode(_, _) => Err("cannot convert graph entity to property".into()),
308 }
309 }
310}
311
312pub type Record = HashMap<String, Value>;
314
315pub type Params = HashMap<String, Value>;
317
/// Error produced while executing a plan. It carries only a human-readable
/// message.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionError {
    /// Description of what went wrong.
    pub message: String,
}

/// Renders as `Execution error: <message>`.
impl std::fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Execution error: ")?;
        f.write_str(&self.message)
    }
}

/// Lets `ExecutionError` be used wherever a standard error is expected.
impl std::error::Error for ExecutionError {}
332
333pub fn execute(
337 plan: &LogicalPlan,
338 engine: &mut StorageEngine,
339 params: &Params,
340 scalar_fns: &dyn ScalarFnLookup,
341 trigger_fns: &dyn TriggerLookup,
342) -> Result<Vec<Record>, ExecutionError> {
343 match plan {
344 LogicalPlan::EmptySource => Ok(vec![Record::new()]),
345 LogicalPlan::NodeScan {
346 variable,
347 label_id,
348 limit,
349 ..
350 } => {
351 let mut records = operators::node_scan::execute_node_scan(variable, *label_id, engine);
352 if let Some(lim) = limit {
353 records.truncate(*lim);
354 }
355 Ok(records)
356 }
357 LogicalPlan::Expand {
358 source,
359 src_var,
360 rel_var,
361 target_var,
362 rel_type_id,
363 direction,
364 temporal_filter,
365 } => {
366 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
367 let tf = resolve_temporal_filter(temporal_filter, engine, params, scalar_fns)?;
368 Ok(operators::expand::execute_expand(
369 source_records,
370 src_var,
371 rel_var.as_deref(),
372 target_var,
373 *rel_type_id,
374 direction,
375 engine,
376 tf.as_ref(),
377 ))
378 }
379 LogicalPlan::Filter { source, predicate } => {
380 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
381 operators::filter::execute_filter(source_records, predicate, engine, params, scalar_fns)
382 }
383 LogicalPlan::Project {
384 source,
385 items,
386 distinct,
387 } => {
388 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
389 let mut result = operators::project::execute_project(
390 source_records,
391 items,
392 engine,
393 params,
394 scalar_fns,
395 )?;
396 if *distinct {
397 deduplicate_records(&mut result);
398 }
399 Ok(result)
400 }
401 LogicalPlan::Sort { source, items } => {
402 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
403 Ok(operators::sort::execute_sort(
404 source_records,
405 items,
406 engine,
407 params,
408 scalar_fns,
409 ))
410 }
411 LogicalPlan::Skip { source, count } => {
412 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
413 let n = eval_count_expr(count)?;
414 Ok(operators::limit::execute_skip(source_records, n))
415 }
416 LogicalPlan::Limit { source, count } => {
417 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
418 let n = eval_count_expr(count)?;
419 Ok(operators::limit::execute_limit(source_records, n))
420 }
421 LogicalPlan::Aggregate {
422 source,
423 group_keys,
424 aggregates,
425 } => {
426 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
427 operators::aggregate::execute_aggregate(
428 source_records,
429 group_keys,
430 aggregates,
431 engine,
432 params,
433 scalar_fns,
434 )
435 }
436 LogicalPlan::CreateOp { source, pattern } => {
437 let source_records = match source {
438 Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
439 None => vec![Record::new()],
440 };
441 operators::create::execute_create(
442 source_records,
443 pattern,
444 engine,
445 params,
446 scalar_fns,
447 trigger_fns,
448 )
449 }
450 LogicalPlan::DeleteOp {
451 source,
452 exprs,
453 detach,
454 } => {
455 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
456 operators::delete::execute_delete(
457 source_records,
458 exprs,
459 *detach,
460 engine,
461 params,
462 scalar_fns,
463 trigger_fns,
464 )
465 }
466 LogicalPlan::SetOp { source, items } => {
467 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
468 operators::set_props::execute_set(
469 source_records,
470 items,
471 engine,
472 params,
473 scalar_fns,
474 trigger_fns,
475 )
476 }
477 LogicalPlan::RemoveOp { source, items } => {
478 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
479 operators::set_props::execute_remove(
480 source_records,
481 items,
482 engine,
483 params,
484 scalar_fns,
485 trigger_fns,
486 )
487 }
488 LogicalPlan::Unwind {
489 source,
490 expr,
491 variable,
492 } => {
493 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
494 operators::unwind::execute_unwind(
495 source_records,
496 expr,
497 variable,
498 engine,
499 params,
500 scalar_fns,
501 )
502 }
503 LogicalPlan::With {
504 source,
505 items,
506 where_clause,
507 distinct,
508 } => {
509 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
510 let mut result =
511 operators::with::execute_with(source_records, items, engine, params, scalar_fns)?;
512 if *distinct {
513 deduplicate_records(&mut result);
514 }
515 if let Some(ref predicate) = where_clause {
516 result = operators::filter::execute_filter(
517 result, predicate, engine, params, scalar_fns,
518 )?;
519 }
520 Ok(result)
521 }
522 LogicalPlan::MergeOp {
523 source,
524 pattern,
525 on_match,
526 on_create,
527 } => {
528 let source_records = match source {
529 Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
530 None => vec![Record::new()],
531 };
532 operators::merge::execute_merge(
533 source_records,
534 pattern,
535 on_match,
536 on_create,
537 engine,
538 params,
539 scalar_fns,
540 trigger_fns,
541 )
542 }
543 LogicalPlan::CreateIndex {
544 name,
545 label,
546 property,
547 } => {
548 let label_id = engine.get_or_create_label(label);
550 let prop_key_id = engine.get_or_create_prop_key(property);
551
552 let index_name = match name {
554 Some(n) => n.clone(),
555 None => format!("idx_{}_{}", label, property),
556 };
557
558 engine
560 .index_manager_mut()
561 .create_index(index_name.clone(), label_id, prop_key_id)
562 .map_err(|e| ExecutionError {
563 message: e.to_string(),
564 })?;
565
566 engine
568 .catalog_mut()
569 .add_index_definition(cypherlite_storage::index::IndexDefinition {
570 name: index_name,
571 label_id,
572 prop_key_id,
573 });
574
575 let nodes: Vec<(
577 cypherlite_core::NodeId,
578 Vec<(u32, cypherlite_core::PropertyValue)>,
579 )> = engine
580 .scan_nodes_by_label(label_id)
581 .iter()
582 .map(|n| (n.node_id, n.properties.clone()))
583 .collect();
584 for (nid, props) in &nodes {
585 for (pk, v) in props {
586 if *pk == prop_key_id {
587 if let Some(idx) = engine
588 .index_manager_mut()
589 .find_index_mut(label_id, prop_key_id)
590 {
591 idx.insert(v, *nid);
592 }
593 }
594 }
595 }
596
597 Ok(vec![])
598 }
599 LogicalPlan::CreateEdgeIndex {
600 name,
601 rel_type,
602 property,
603 } => {
604 let rel_type_id = engine.get_or_create_rel_type(rel_type);
606 let prop_key_id = engine.get_or_create_prop_key(property);
607
608 let index_name = match name {
610 Some(n) => n.clone(),
611 None => format!("eidx_{}_{}", rel_type, property),
612 };
613
614 engine
616 .edge_index_manager_mut()
617 .create_index(index_name.clone(), rel_type_id, prop_key_id)
618 .map_err(|e| ExecutionError {
619 message: e.to_string(),
620 })?;
621
622 let edges: Vec<(
624 cypherlite_core::EdgeId,
625 Vec<(u32, cypherlite_core::PropertyValue)>,
626 )> = engine
627 .scan_edges_by_type(rel_type_id)
628 .iter()
629 .map(|e| (e.edge_id, e.properties.clone()))
630 .collect();
631 for (eid, props) in &edges {
632 for (pk, v) in props {
633 if *pk == prop_key_id {
634 if let Some(idx) = engine
635 .edge_index_manager_mut()
636 .find_index_mut(rel_type_id, prop_key_id)
637 {
638 idx.insert(v, *eid);
639 }
640 }
641 }
642 }
643
644 Ok(vec![])
645 }
646 LogicalPlan::DropIndex { name } => {
647 let removed_node = engine.index_manager_mut().drop_index(name);
649 if removed_node.is_ok() {
650 engine.catalog_mut().remove_index_definition(name);
651 return Ok(vec![]);
652 }
653
654 engine
656 .edge_index_manager_mut()
657 .drop_index(name)
658 .map_err(|e| ExecutionError {
659 message: e.to_string(),
660 })?;
661
662 Ok(vec![])
663 }
664 LogicalPlan::VarLengthExpand {
665 source,
666 src_var,
667 rel_var,
668 target_var,
669 rel_type_id,
670 direction,
671 min_hops,
672 max_hops,
673 temporal_filter,
674 } => {
675 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
676 let tf = resolve_temporal_filter(temporal_filter, engine, params, scalar_fns)?;
677 Ok(operators::var_length_expand::execute_var_length_expand(
678 source_records,
679 src_var,
680 rel_var.as_deref(),
681 target_var,
682 *rel_type_id,
683 direction,
684 *min_hops,
685 *max_hops,
686 engine,
687 tf.as_ref(),
688 ))
689 }
690 LogicalPlan::OptionalExpand {
691 source,
692 src_var,
693 rel_var,
694 target_var,
695 rel_type_id,
696 direction,
697 } => {
698 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
699 Ok(operators::optional_expand::execute_optional_expand(
700 source_records,
701 src_var,
702 rel_var.as_deref(),
703 target_var,
704 *rel_type_id,
705 direction,
706 engine,
707 ))
708 }
709 LogicalPlan::IndexScan {
710 variable,
711 label_id,
712 prop_key,
713 lookup_value,
714 } => operators::index_scan::execute_index_scan(
715 variable,
716 *label_id,
717 prop_key,
718 lookup_value,
719 engine,
720 params,
721 scalar_fns,
722 ),
723 #[cfg(feature = "subgraph")]
724 LogicalPlan::SubgraphScan { variable } => Ok(
725 operators::subgraph_scan::execute_subgraph_scan(variable, engine),
726 ),
727 #[cfg(feature = "subgraph")]
728 LogicalPlan::CreateSnapshotOp {
729 variable,
730 labels: _,
731 properties,
732 temporal_anchor,
733 sub_plan,
734 return_vars,
735 } => execute_create_snapshot(
736 variable.as_deref(),
737 properties,
738 temporal_anchor.as_ref(),
739 sub_plan,
740 return_vars,
741 engine,
742 params,
743 ),
744 #[cfg(feature = "hypergraph")]
745 LogicalPlan::HyperEdgeScan { variable } => Ok(
746 operators::hyperedge_scan::execute_hyperedge_scan(variable, engine),
747 ),
748 #[cfg(feature = "hypergraph")]
749 LogicalPlan::CreateHyperedgeOp {
750 source,
751 variable,
752 labels,
753 sources,
754 targets,
755 } => {
756 let source_records = match source {
757 Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
758 None => vec![Record::new()],
759 };
760 execute_create_hyperedge(
761 variable.as_deref(),
762 labels,
763 sources,
764 targets,
765 &source_records,
766 engine,
767 params,
768 )
769 }
770 LogicalPlan::AsOfScan {
771 source,
772 timestamp_expr,
773 } => {
774 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
775 operators::temporal_scan::execute_as_of_scan(
776 source_records,
777 timestamp_expr,
778 engine,
779 params,
780 scalar_fns,
781 )
782 }
783 LogicalPlan::TemporalRangeScan {
784 source,
785 start_expr,
786 end_expr,
787 } => {
788 let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
789 operators::temporal_scan::execute_temporal_range_scan(
790 source_records,
791 start_expr,
792 end_expr,
793 engine,
794 params,
795 scalar_fns,
796 )
797 }
798 }
799}
800
801fn resolve_temporal_filter(
803 plan: &Option<TemporalFilterPlan>,
804 engine: &mut StorageEngine,
805 params: &Params,
806 scalar_fns: &dyn ScalarFnLookup,
807) -> Result<Option<operators::temporal_filter::TemporalFilter>, ExecutionError> {
808 let tfp = match plan {
809 Some(p) => p,
810 None => return Ok(None),
811 };
812 let empty_record = Record::new();
813 match tfp {
814 TemporalFilterPlan::AsOf(expr) => {
815 let val = eval::eval(expr, &empty_record, engine, params, scalar_fns)?;
816 let ms = extract_timestamp(val)?;
817 Ok(Some(operators::temporal_filter::TemporalFilter::AsOf(ms)))
818 }
819 TemporalFilterPlan::Between(start_expr, end_expr) => {
820 let start_val = eval::eval(start_expr, &empty_record, engine, params, scalar_fns)?;
821 let end_val = eval::eval(end_expr, &empty_record, engine, params, scalar_fns)?;
822 let start_ms = extract_timestamp(start_val)?;
823 let end_ms = extract_timestamp(end_val)?;
824 Ok(Some(operators::temporal_filter::TemporalFilter::Between(
825 start_ms, end_ms,
826 )))
827 }
828 }
829}
830
831fn extract_timestamp(val: Value) -> Result<i64, ExecutionError> {
833 match val {
834 Value::DateTime(ms) => Ok(ms),
835 Value::Int64(ms) => Ok(ms),
836 _ => Err(ExecutionError {
837 message: "temporal filter expression must evaluate to DateTime or integer".to_string(),
838 }),
839 }
840}
841
842fn eval_count_expr(expr: &Expression) -> Result<usize, ExecutionError> {
844 match expr {
845 Expression::Literal(Literal::Integer(n)) => {
846 if *n < 0 {
847 return Err(ExecutionError {
848 message: "SKIP/LIMIT count must be non-negative".to_string(),
849 });
850 }
851 Ok(*n as usize)
852 }
853 _ => Err(ExecutionError {
854 message: "SKIP/LIMIT count must be a literal integer".to_string(),
855 }),
856 }
857}
858
/// Creates a subgraph snapshot from the nodes matched by `sub_plan`.
///
/// Steps, in order:
/// 1. run `sub_plan` and collect the distinct node ids bound to `return_vars`;
/// 2. evaluate `properties` against an empty record into subgraph properties;
/// 3. resolve the temporal anchor from `temporal_anchor_expr`, falling back to
///    the `__query_start_ms__` parameter when no expression is given;
/// 4. append a `_created_at` property (the `__query_start_ms__` value, or `0`
///    when that parameter is missing or not an integer);
/// 5. create the subgraph and add every collected node as a member.
///
/// The single returned record binds `variable` (if any) to the new subgraph.
///
/// NOTE(review): the sub-plan and all expressions are evaluated with the
/// no-op `()` scalar/trigger lookups, not with the caller's lookups.
#[cfg(feature = "subgraph")]
#[allow(clippy::too_many_arguments)]
fn execute_create_snapshot(
    variable: Option<&str>,
    properties: &Option<crate::parser::ast::MapLiteral>,
    temporal_anchor_expr: Option<&crate::parser::ast::Expression>,
    sub_plan: &LogicalPlan,
    return_vars: &[String],
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<Record>, ExecutionError> {
    use cypherlite_core::{LabelRegistry, PropertyValue, SubgraphId};

    let matched = execute(sub_plan, engine, params, &(), &())?;

    // Distinct node ids in first-seen order; non-node bindings are ignored.
    let mut members = Vec::new();
    for row in &matched {
        for name in return_vars {
            match row.get(name) {
                Some(Value::Node(id)) if !members.contains(id) => members.push(*id),
                _ => {}
            }
        }
    }

    let no_bindings = Record::new();
    let mut subgraph_props = Vec::new();
    if let Some(map) = properties {
        for (key, expr) in map {
            let value = eval::eval(expr, &no_bindings, engine, params, &())?;
            let pv = PropertyValue::try_from(value).map_err(|e| ExecutionError {
                message: format!("invalid property value for '{}': {}", key, e),
            })?;
            let key_id = engine.get_or_create_prop_key(key);
            subgraph_props.push((key_id, pv));
        }
    }

    // Read once; used as both the anchor fallback and the creation stamp.
    let query_start_ms = match params.get("__query_start_ms__") {
        Some(Value::Int64(ms)) => Some(*ms),
        _ => None,
    };

    let temporal_anchor = match temporal_anchor_expr {
        Some(expr) => {
            let anchor = eval::eval(expr, &no_bindings, engine, params, &())?;
            Some(extract_timestamp(anchor)?)
        }
        None => query_start_ms,
    };

    let created_key = engine.get_or_create_prop_key("_created_at");
    subgraph_props.push((
        created_key,
        PropertyValue::DateTime(query_start_ms.unwrap_or(0)),
    ));

    let sg_id: SubgraphId = engine.create_subgraph(subgraph_props, temporal_anchor);

    for id in &members {
        engine.add_member(sg_id, *id).map_err(|e| ExecutionError {
            message: format!("failed to add member to subgraph: {}", e),
        })?;
    }

    let mut out = Record::new();
    if let Some(var) = variable {
        out.insert(var.to_string(), Value::Subgraph(sg_id));
    }
    Ok(vec![out])
}
952
/// Creates one hyperedge and returns a single record binding it.
///
/// Only the first record of `source_records` is used as evaluation context
/// (an empty record when there is none). The first entry of `labels` becomes
/// the relationship type, or type id `0` when no label is given. The
/// hyperedge is created with an empty property list.
///
/// The returned record is the context record, extended with `variable`
/// bound to the new hyperedge when a variable is supplied.
#[cfg(feature = "hypergraph")]
#[allow(clippy::too_many_arguments)]
fn execute_create_hyperedge(
    variable: Option<&str>,
    labels: &[String],
    sources: &[Expression],
    targets: &[Expression],
    source_records: &[Record],
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<Record>, ExecutionError> {
    use cypherlite_core::{HyperEdgeId, LabelRegistry};

    let mut context = match source_records.first() {
        Some(first) => first.clone(),
        None => Record::default(),
    };

    let rel_type_id = match labels.first() {
        Some(label) => engine.get_or_create_rel_type(label),
        None => 0,
    };

    // Sources are resolved before targets.
    let from = resolve_hyperedge_participants(sources, &context, engine, params)?;
    let to = resolve_hyperedge_participants(targets, &context, engine, params)?;

    let he_id: HyperEdgeId = engine.create_hyperedge(rel_type_id, from, to, vec![]);

    if let Some(var) = variable {
        context.insert(var.to_string(), Value::Hyperedge(he_id));
    }
    Ok(vec![context])
}
998
/// Evaluates hyperedge participant expressions into graph entities.
///
/// A `TemporalRef` expression yields `GraphEntity::TemporalRef` from its node
/// and timestamp parts. Any other expression must evaluate to a node, a
/// subgraph (when that feature is enabled) or a hyperedge.
///
/// # Errors
///
/// Fails on the first expression that cannot be evaluated or that resolves
/// to something other than the entity kinds listed above.
#[cfg(feature = "hypergraph")]
fn resolve_hyperedge_participants(
    expressions: &[Expression],
    record: &Record,
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<cypherlite_core::GraphEntity>, ExecutionError> {
    use cypherlite_core::GraphEntity;

    let mut entities = Vec::new();
    for expr in expressions {
        let entity = match expr {
            Expression::TemporalRef { node, timestamp } => {
                // Both parts are evaluated before either is validated.
                let node_val = eval::eval(node, record, engine, params, &())?;
                let ts_val = eval::eval(timestamp, record, engine, params, &())?;
                let node_id = if let Value::Node(nid) = node_val {
                    nid
                } else {
                    return Err(ExecutionError {
                        message: "temporal reference node must resolve to a Node".to_string(),
                    });
                };
                GraphEntity::TemporalRef(node_id, extract_timestamp(ts_val)?)
            }
            _ => match eval::eval(expr, record, engine, params, &())? {
                Value::Node(nid) => GraphEntity::Node(nid),
                #[cfg(feature = "subgraph")]
                Value::Subgraph(sid) => GraphEntity::Subgraph(sid),
                Value::Hyperedge(hid) => GraphEntity::HyperEdge(hid),
                _ => {
                    return Err(ExecutionError {
                        message: "hyperedge participant must resolve to a graph entity"
                            .to_string(),
                    })
                }
            },
        };
        entities.push(entity);
    }
    Ok(entities)
}
1049
1050fn deduplicate_records(records: &mut Vec<Record>) {
1052 let mut seen: Vec<Record> = Vec::new();
1053 records.retain(|r| {
1054 if seen.contains(r) {
1055 false
1056 } else {
1057 seen.push(r.clone());
1058 true
1059 }
1060 });
1061}
1062
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a one-column record.
    fn single(key: &str, value: Value) -> Record {
        let mut record = Record::new();
        record.insert(key.to_string(), value);
        record
    }

    // ---- PropertyValue -> Value ----

    #[test]
    fn test_value_from_property_value_null() {
        let converted: Value = PropertyValue::Null.into();
        assert_eq!(converted, Value::Null);
    }

    #[test]
    fn test_value_from_property_value_bool() {
        let converted: Value = PropertyValue::Bool(true).into();
        assert_eq!(converted, Value::Bool(true));
    }

    #[test]
    fn test_value_from_property_value_int64() {
        let converted: Value = PropertyValue::Int64(42).into();
        assert_eq!(converted, Value::Int64(42));
    }

    #[test]
    fn test_value_from_property_value_float64() {
        let converted: Value = PropertyValue::Float64(3.15).into();
        assert_eq!(converted, Value::Float64(3.15));
    }

    #[test]
    fn test_value_from_property_value_string() {
        let converted: Value = PropertyValue::String("hello".into()).into();
        assert_eq!(converted, Value::String("hello".into()));
    }

    #[test]
    fn test_value_from_property_value_bytes() {
        let converted: Value = PropertyValue::Bytes(vec![1, 2, 3]).into();
        assert_eq!(converted, Value::Bytes(vec![1, 2, 3]));
    }

    #[test]
    fn test_value_from_property_value_array() {
        let source = PropertyValue::Array(vec![PropertyValue::Int64(1), PropertyValue::Null]);
        let expected = Value::List(vec![Value::Int64(1), Value::Null]);
        assert_eq!(Value::from(source), expected);
    }

    // ---- Value -> PropertyValue ----

    #[test]
    fn test_value_try_into_property_value_success() {
        let cases = vec![
            (Value::Null, PropertyValue::Null),
            (Value::Bool(false), PropertyValue::Bool(false)),
            (Value::Int64(10), PropertyValue::Int64(10)),
            (Value::Float64(1.5), PropertyValue::Float64(1.5)),
            (Value::String("x".into()), PropertyValue::String("x".into())),
            (Value::Bytes(vec![0xAB]), PropertyValue::Bytes(vec![0xAB])),
        ];
        for (input, expected) in cases {
            assert_eq!(PropertyValue::try_from(input), Ok(expected));
        }
    }

    #[test]
    fn test_value_try_into_property_value_list() {
        let list = Value::List(vec![Value::Int64(1), Value::Bool(true)]);
        let expected =
            PropertyValue::Array(vec![PropertyValue::Int64(1), PropertyValue::Bool(true)]);
        assert_eq!(PropertyValue::try_from(list), Ok(expected));
    }

    #[test]
    fn test_value_try_into_property_value_node_fails() {
        let message = PropertyValue::try_from(Value::Node(NodeId(1))).expect_err("should error");
        assert!(message.contains("cannot convert graph entity"));
    }

    #[test]
    fn test_value_try_into_property_value_edge_fails() {
        assert!(PropertyValue::try_from(Value::Edge(EdgeId(1))).is_err());
    }

    // ---- ExecutionError ----

    #[test]
    fn test_execution_error_display() {
        let rendered = ExecutionError {
            message: "test error".to_string(),
        }
        .to_string();
        assert_eq!(rendered, "Execution error: test error");
    }

    #[test]
    fn test_execution_error_is_error_trait() {
        // Compile-time check only: the type must coerce to `dyn Error`.
        fn accepts_error(_: &dyn std::error::Error) {}
        let err = ExecutionError {
            message: "test".to_string(),
        };
        accepts_error(&err);
    }

    // ---- Type aliases ----

    #[test]
    fn test_record_type_is_hashmap() {
        let record: Record = single("n", Value::Node(NodeId(1)));
        assert_eq!(record.get("n"), Some(&Value::Node(NodeId(1))));
    }

    #[test]
    fn test_params_type_is_hashmap() {
        let params: Params = single("name", Value::String("Alice".into()));
        assert_eq!(
            params.get("name"),
            Some(&Value::String("Alice".to_string()))
        );
    }

    // ---- SKIP / LIMIT counts ----

    #[test]
    fn test_eval_count_expr_positive() {
        let ten = Expression::Literal(Literal::Integer(10));
        assert_eq!(eval_count_expr(&ten), Ok(10));
    }

    #[test]
    fn test_eval_count_expr_zero() {
        let zero = Expression::Literal(Literal::Integer(0));
        assert_eq!(eval_count_expr(&zero), Ok(0));
    }

    #[test]
    fn test_eval_count_expr_negative_fails() {
        let negative = Expression::Literal(Literal::Integer(-5));
        assert!(eval_count_expr(&negative).is_err());
    }

    #[test]
    fn test_eval_count_expr_non_integer_fails() {
        let variable = Expression::Variable("n".to_string());
        assert!(eval_count_expr(&variable).is_err());
    }

    // ---- DISTINCT ----

    #[test]
    fn test_deduplicate_records() {
        let first = single("x", Value::Int64(1));
        let second = single("x", Value::Int64(2));
        let repeat = first.clone();

        let mut records = vec![first, second, repeat];
        deduplicate_records(&mut records);
        assert_eq!(records.len(), 2);
    }

    // ---- DateTime round trip ----

    #[test]
    fn test_value_from_property_value_datetime() {
        let converted: Value = PropertyValue::DateTime(1_700_000_000_000).into();
        assert_eq!(converted, Value::DateTime(1_700_000_000_000));
    }

    #[test]
    fn test_value_try_into_property_value_datetime() {
        let converted = PropertyValue::try_from(Value::DateTime(1_700_000_000_000));
        assert_eq!(converted, Ok(PropertyValue::DateTime(1_700_000_000_000)));
    }

    #[cfg(feature = "hypergraph")]
    mod hyperedge_value_tests {
        use super::*;
        use cypherlite_core::HyperEdgeId;

        #[test]
        fn test_value_hyperedge_creation() {
            let created = Value::Hyperedge(HyperEdgeId(42));
            assert_eq!(created, Value::Hyperedge(HyperEdgeId(42)));
        }

        #[test]
        fn test_value_hyperedge_try_into_property_value_fails() {
            let message = PropertyValue::try_from(Value::Hyperedge(HyperEdgeId(1)))
                .expect_err("should error");
            assert!(message.contains("cannot convert graph entity"));
        }

        #[test]
        fn test_value_hyperedge_ne_node() {
            assert_ne!(Value::Node(NodeId(1)), Value::Hyperedge(HyperEdgeId(1)));
        }

        #[test]
        fn test_value_hyperedge_clone() {
            let original = Value::Hyperedge(HyperEdgeId(7));
            let copy = original.clone();
            assert_eq!(original, copy);
        }

        #[test]
        fn test_value_hyperedge_debug() {
            let rendered = format!("{:?}", Value::Hyperedge(HyperEdgeId(99)));
            assert!(rendered.contains("Hyperedge"));
            assert!(rendered.contains("99"));
        }
    }

    #[cfg(feature = "subgraph")]
    mod subgraph_value_tests {
        use super::*;
        use cypherlite_core::SubgraphId;

        #[test]
        fn test_value_subgraph_creation() {
            let created = Value::Subgraph(SubgraphId(42));
            assert_eq!(created, Value::Subgraph(SubgraphId(42)));
        }

        #[test]
        fn test_value_subgraph_try_into_property_value_fails() {
            let message = PropertyValue::try_from(Value::Subgraph(SubgraphId(1)))
                .expect_err("should error");
            assert!(message.contains("cannot convert graph entity"));
        }

        #[test]
        fn test_value_subgraph_ne_node() {
            assert_ne!(Value::Node(NodeId(1)), Value::Subgraph(SubgraphId(1)));
        }

        #[test]
        fn test_value_subgraph_clone() {
            let original = Value::Subgraph(SubgraphId(7));
            let copy = original.clone();
            assert_eq!(original, copy);
        }

        #[test]
        fn test_value_subgraph_debug() {
            let rendered = format!("{:?}", Value::Subgraph(SubgraphId(99)));
            assert!(rendered.contains("Subgraph"));
            assert!(rendered.contains("99"));
        }
    }
}