Skip to main content

cypherlite_query/executor/
mod.rs

1// Executor: Volcano/Iterator model physical operators
2/// Expression evaluator for the executor.
3pub mod eval;
4/// Physical operator implementations (scan, expand, filter, etc.).
5pub mod operators;
6
7use crate::parser::ast::*;
8use crate::planner::{LogicalPlan, TemporalFilterPlan};
9use cypherlite_core::{EdgeId, LabelRegistry, NodeId, PropertyValue};
10use cypherlite_storage::StorageEngine;
11use std::collections::HashMap;
12
13// ---------------------------------------------------------------------------
14// Plugin: ScalarFnLookup trait for pluggable function dispatch
15// ---------------------------------------------------------------------------
16
/// Trait for resolving unknown scalar functions at evaluation time.
///
/// Implemented for `()` (no-op) and for `PluginRegistry<dyn ScalarFunction>`
/// when the `plugin` feature is enabled.
pub trait ScalarFnLookup {
    /// Try to call a scalar function by name with the given arguments.
    ///
    /// The return value distinguishes three outcomes:
    ///
    /// * `None` - the function is not found, allowing the caller to produce a
    ///   default "unknown function" error.
    /// * `Some(Ok(value))` - the function was found and produced `value`.
    /// * `Some(Err(error))` - the function was found but the call failed.
    fn call_scalar(&self, name: &str, args: &[Value]) -> Option<Result<Value, ExecutionError>>;
}
28
29/// No-op implementation: always returns `None` (function not found).
30impl ScalarFnLookup for () {
31    fn call_scalar(&self, _name: &str, _args: &[Value]) -> Option<Result<Value, ExecutionError>> {
32        None
33    }
34}
35
36// ---------------------------------------------------------------------------
37// Plugin: TriggerLookup trait for trigger hook dispatch
38// ---------------------------------------------------------------------------
39
/// Trait for firing trigger hooks during mutation operations.
///
/// Implemented for `()` (no-op) and for `PluginRegistry<dyn Trigger>`
/// when the `plugin` feature is enabled.
///
/// Every hook receives the [`cypherlite_core::TriggerContext`] describing the
/// mutation and reports failure through [`ExecutionError`].
pub trait TriggerLookup {
    /// Fire all before-create triggers. Returns error to abort the operation.
    fn fire_before_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;

    /// Fire all after-create triggers. A trigger failure is returned as `Err`.
    fn fire_after_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;

    /// Fire all before-update triggers. Returns error to abort the operation.
    fn fire_before_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;

    /// Fire all after-update triggers. A trigger failure is returned as `Err`.
    fn fire_after_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;

    /// Fire all before-delete triggers. Returns error to abort the operation.
    fn fire_before_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;

    /// Fire all after-delete triggers. A trigger failure is returned as `Err`.
    fn fire_after_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError>;
}
81
82/// No-op implementation: all triggers succeed with no action.
83impl TriggerLookup for () {
84    fn fire_before_create(
85        &self,
86        _ctx: &cypherlite_core::TriggerContext,
87    ) -> Result<(), ExecutionError> {
88        Ok(())
89    }
90    fn fire_after_create(
91        &self,
92        _ctx: &cypherlite_core::TriggerContext,
93    ) -> Result<(), ExecutionError> {
94        Ok(())
95    }
96    fn fire_before_update(
97        &self,
98        _ctx: &cypherlite_core::TriggerContext,
99    ) -> Result<(), ExecutionError> {
100        Ok(())
101    }
102    fn fire_after_update(
103        &self,
104        _ctx: &cypherlite_core::TriggerContext,
105    ) -> Result<(), ExecutionError> {
106        Ok(())
107    }
108    fn fire_before_delete(
109        &self,
110        _ctx: &cypherlite_core::TriggerContext,
111    ) -> Result<(), ExecutionError> {
112        Ok(())
113    }
114    fn fire_after_delete(
115        &self,
116        _ctx: &cypherlite_core::TriggerContext,
117    ) -> Result<(), ExecutionError> {
118        Ok(())
119    }
120}
121
/// Trigger dispatch backed by the plugin registry.
///
/// Each hook visits the names reported by `list()`, resolves every name with
/// `get()` (names that do not resolve are skipped) and invokes the matching
/// trigger callback. The first callback that fails stops the walk; its error
/// is converted to an [`ExecutionError`] carrying the error's `to_string()`.
#[cfg(feature = "plugin")]
impl TriggerLookup
    for cypherlite_core::plugin::PluginRegistry<dyn cypherlite_core::plugin::Trigger>
{
    fn fire_before_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        // Lazy adapter: each trigger is looked up right before it is invoked.
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_before_create(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }

    fn fire_after_create(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_after_create(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }

    fn fire_before_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_before_update(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }

    fn fire_after_update(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_after_update(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }

    fn fire_before_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_before_delete(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }

    fn fire_after_delete(
        &self,
        ctx: &cypherlite_core::TriggerContext,
    ) -> Result<(), ExecutionError> {
        let registered = self.list().into_iter().filter_map(|name| self.get(name));
        for trigger in registered {
            trigger
                .on_after_delete(ctx)
                .map_err(|cause| ExecutionError {
                    message: cause.to_string(),
                })?;
        }
        Ok(())
    }
}
205
/// Scalar-function dispatch backed by the plugin registry.
///
/// Returns `None` when no function is registered under `name`. Otherwise the
/// arguments are converted to `PropertyValue`s, the plugin is called, and its
/// result is converted back into a [`Value`]. A failed argument conversion or
/// a failed plugin call is reported as `Some(Err(_))`.
#[cfg(feature = "plugin")]
impl ScalarFnLookup
    for cypherlite_core::plugin::PluginRegistry<dyn cypherlite_core::plugin::ScalarFunction>
{
    fn call_scalar(&self, name: &str, args: &[Value]) -> Option<Result<Value, ExecutionError>> {
        // Unknown name: let the caller report "unknown function".
        let func = self.get(name)?;

        // Plugins work on PropertyValue, so graph-entity arguments are rejected
        // here; conversion stops at the first argument that cannot be converted.
        let converted = args
            .iter()
            .map(|arg| PropertyValue::try_from(arg.clone()))
            .collect::<Result<Vec<PropertyValue>, String>>();

        let outcome = converted
            .map_err(|reason| ExecutionError {
                message: format!("plugin function argument conversion: {}", reason),
            })
            .and_then(|pv_args| {
                func.call(&pv_args)
                    .map(Value::from)
                    .map_err(|cause| ExecutionError {
                        message: cause.to_string(),
                    })
            });
        Some(outcome)
    }
}
231
/// Runtime value in query execution. Extends PropertyValue with graph entity references.
///
/// The storable variants mirror `PropertyValue` one-to-one (see the
/// `From<PropertyValue>` / `TryFrom<Value>` conversions); the entity-reference
/// variants exist only at execution time and cannot be converted back into a
/// property. Only `PartialEq` is derived because the enum carries an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// Null value.
    Null,
    /// Boolean value.
    Bool(bool),
    /// 64-bit integer value.
    Int64(i64),
    /// 64-bit floating-point value.
    Float64(f64),
    /// UTF-8 string value.
    String(String),
    /// Raw byte array value.
    Bytes(Vec<u8>),
    /// Ordered list of values (may nest and may mix variants).
    List(Vec<Value>),
    /// A node entity reference.
    Node(NodeId),
    /// An edge entity reference.
    Edge(EdgeId),
    /// DateTime as milliseconds since Unix epoch.
    DateTime(i64),
    /// A subgraph entity reference. Only available with the `subgraph` feature.
    #[cfg(feature = "subgraph")]
    Subgraph(cypherlite_core::SubgraphId),
    /// A hyperedge entity reference. Only available with the `hypergraph` feature.
    #[cfg(feature = "hypergraph")]
    Hyperedge(cypherlite_core::HyperEdgeId),
    /// A node reference at a specific point in time (lazy VersionStore resolution).
    /// NN-001: TemporalRef resolves to the versioned node state when properties are accessed.
    ///
    /// The second field is the point in time (presumably milliseconds since
    /// the Unix epoch, like `DateTime` -- TODO confirm). Only available with
    /// the `hypergraph` feature.
    #[cfg(feature = "hypergraph")]
    TemporalNode(NodeId, i64),
}
266
267/// Convert from storage PropertyValue to executor Value.
268impl From<PropertyValue> for Value {
269    fn from(pv: PropertyValue) -> Self {
270        match pv {
271            PropertyValue::Null => Value::Null,
272            PropertyValue::Bool(b) => Value::Bool(b),
273            PropertyValue::Int64(i) => Value::Int64(i),
274            PropertyValue::Float64(f) => Value::Float64(f),
275            PropertyValue::String(s) => Value::String(s),
276            PropertyValue::Bytes(b) => Value::Bytes(b),
277            PropertyValue::Array(a) => Value::List(a.into_iter().map(Value::from).collect()),
278            PropertyValue::DateTime(ms) => Value::DateTime(ms),
279        }
280    }
281}
282
283/// Convert from executor Value to storage PropertyValue (for SET operations).
284impl TryFrom<Value> for PropertyValue {
285    type Error = String;
286    fn try_from(v: Value) -> Result<Self, Self::Error> {
287        match v {
288            Value::Null => Ok(PropertyValue::Null),
289            Value::Bool(b) => Ok(PropertyValue::Bool(b)),
290            Value::Int64(i) => Ok(PropertyValue::Int64(i)),
291            Value::Float64(f) => Ok(PropertyValue::Float64(f)),
292            Value::String(s) => Ok(PropertyValue::String(s)),
293            Value::Bytes(b) => Ok(PropertyValue::Bytes(b)),
294            Value::List(l) => {
295                let items: Result<Vec<_>, _> = l.into_iter().map(PropertyValue::try_from).collect();
296                Ok(PropertyValue::Array(items?))
297            }
298            Value::DateTime(ms) => Ok(PropertyValue::DateTime(ms)),
299            Value::Node(_) | Value::Edge(_) => {
300                Err("cannot convert graph entity to property".into())
301            }
302            #[cfg(feature = "subgraph")]
303            Value::Subgraph(_) => Err("cannot convert graph entity to property".into()),
304            #[cfg(feature = "hypergraph")]
305            Value::Hyperedge(_) => Err("cannot convert graph entity to property".into()),
306            #[cfg(feature = "hypergraph")]
307            Value::TemporalNode(_, _) => Err("cannot convert graph entity to property".into()),
308        }
309    }
310}
311
/// A row of named values produced during query execution.
///
/// Values are looked up by the name they are bound to in the query.
pub type Record = HashMap<String, Value>;

/// Query parameters passed by the user, looked up by parameter name.
pub type Params = HashMap<String, Value>;
317
/// Error raised while executing a query plan.
///
/// Carries only a free-form message; the `Display` output is that message
/// prefixed with `"Execution error: "`.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionError {
    /// Human-readable error description.
    pub message: String,
}

impl std::fmt::Display for ExecutionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Execution error: ")?;
        f.write_str(&self.message)
    }
}

// No underlying source error is tracked, so the default trait methods suffice.
impl std::error::Error for ExecutionError {}
332
333/// Execute a logical plan against a storage engine.
334// @MX:ANCHOR: Main executor dispatch - called by api layer and all recursive plan nodes
335// @MX:REASON: Central entry point for query execution; fan_in >= 3 (recursive + api + tests)
336pub fn execute(
337    plan: &LogicalPlan,
338    engine: &mut StorageEngine,
339    params: &Params,
340    scalar_fns: &dyn ScalarFnLookup,
341    trigger_fns: &dyn TriggerLookup,
342) -> Result<Vec<Record>, ExecutionError> {
343    match plan {
344        LogicalPlan::EmptySource => Ok(vec![Record::new()]),
345        LogicalPlan::NodeScan {
346            variable,
347            label_id,
348            limit,
349            ..
350        } => {
351            let mut records = operators::node_scan::execute_node_scan(variable, *label_id, engine);
352            if let Some(lim) = limit {
353                records.truncate(*lim);
354            }
355            Ok(records)
356        }
357        LogicalPlan::Expand {
358            source,
359            src_var,
360            rel_var,
361            target_var,
362            rel_type_id,
363            direction,
364            temporal_filter,
365        } => {
366            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
367            let tf = resolve_temporal_filter(temporal_filter, engine, params, scalar_fns)?;
368            Ok(operators::expand::execute_expand(
369                source_records,
370                src_var,
371                rel_var.as_deref(),
372                target_var,
373                *rel_type_id,
374                direction,
375                engine,
376                tf.as_ref(),
377            ))
378        }
379        LogicalPlan::Filter { source, predicate } => {
380            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
381            operators::filter::execute_filter(source_records, predicate, engine, params, scalar_fns)
382        }
383        LogicalPlan::Project {
384            source,
385            items,
386            distinct,
387        } => {
388            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
389            let mut result = operators::project::execute_project(
390                source_records,
391                items,
392                engine,
393                params,
394                scalar_fns,
395            )?;
396            if *distinct {
397                deduplicate_records(&mut result);
398            }
399            Ok(result)
400        }
401        LogicalPlan::Sort { source, items } => {
402            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
403            Ok(operators::sort::execute_sort(
404                source_records,
405                items,
406                engine,
407                params,
408                scalar_fns,
409            ))
410        }
411        LogicalPlan::Skip { source, count } => {
412            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
413            let n = eval_count_expr(count)?;
414            Ok(operators::limit::execute_skip(source_records, n))
415        }
416        LogicalPlan::Limit { source, count } => {
417            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
418            let n = eval_count_expr(count)?;
419            Ok(operators::limit::execute_limit(source_records, n))
420        }
421        LogicalPlan::Aggregate {
422            source,
423            group_keys,
424            aggregates,
425        } => {
426            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
427            operators::aggregate::execute_aggregate(
428                source_records,
429                group_keys,
430                aggregates,
431                engine,
432                params,
433                scalar_fns,
434            )
435        }
436        LogicalPlan::CreateOp { source, pattern } => {
437            let source_records = match source {
438                Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
439                None => vec![Record::new()],
440            };
441            operators::create::execute_create(
442                source_records,
443                pattern,
444                engine,
445                params,
446                scalar_fns,
447                trigger_fns,
448            )
449        }
450        LogicalPlan::DeleteOp {
451            source,
452            exprs,
453            detach,
454        } => {
455            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
456            operators::delete::execute_delete(
457                source_records,
458                exprs,
459                *detach,
460                engine,
461                params,
462                scalar_fns,
463                trigger_fns,
464            )
465        }
466        LogicalPlan::SetOp { source, items } => {
467            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
468            operators::set_props::execute_set(
469                source_records,
470                items,
471                engine,
472                params,
473                scalar_fns,
474                trigger_fns,
475            )
476        }
477        LogicalPlan::RemoveOp { source, items } => {
478            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
479            operators::set_props::execute_remove(
480                source_records,
481                items,
482                engine,
483                params,
484                scalar_fns,
485                trigger_fns,
486            )
487        }
488        LogicalPlan::Unwind {
489            source,
490            expr,
491            variable,
492        } => {
493            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
494            operators::unwind::execute_unwind(
495                source_records,
496                expr,
497                variable,
498                engine,
499                params,
500                scalar_fns,
501            )
502        }
503        LogicalPlan::With {
504            source,
505            items,
506            where_clause,
507            distinct,
508        } => {
509            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
510            let mut result =
511                operators::with::execute_with(source_records, items, engine, params, scalar_fns)?;
512            if *distinct {
513                deduplicate_records(&mut result);
514            }
515            if let Some(ref predicate) = where_clause {
516                result = operators::filter::execute_filter(
517                    result, predicate, engine, params, scalar_fns,
518                )?;
519            }
520            Ok(result)
521        }
522        LogicalPlan::MergeOp {
523            source,
524            pattern,
525            on_match,
526            on_create,
527        } => {
528            let source_records = match source {
529                Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
530                None => vec![Record::new()],
531            };
532            operators::merge::execute_merge(
533                source_records,
534                pattern,
535                on_match,
536                on_create,
537                engine,
538                params,
539                scalar_fns,
540                trigger_fns,
541            )
542        }
543        LogicalPlan::CreateIndex {
544            name,
545            label,
546            property,
547        } => {
548            // Resolve label and property names via catalog
549            let label_id = engine.get_or_create_label(label);
550            let prop_key_id = engine.get_or_create_prop_key(property);
551
552            // Generate index name if not provided
553            let index_name = match name {
554                Some(n) => n.clone(),
555                None => format!("idx_{}_{}", label, property),
556            };
557
558            // Create the index
559            engine
560                .index_manager_mut()
561                .create_index(index_name.clone(), label_id, prop_key_id)
562                .map_err(|e| ExecutionError {
563                    message: e.to_string(),
564                })?;
565
566            // Register in catalog
567            engine
568                .catalog_mut()
569                .add_index_definition(cypherlite_storage::index::IndexDefinition {
570                    name: index_name,
571                    label_id,
572                    prop_key_id,
573                });
574
575            // Backfill: index existing nodes that match the label + property
576            let nodes: Vec<(
577                cypherlite_core::NodeId,
578                Vec<(u32, cypherlite_core::PropertyValue)>,
579            )> = engine
580                .scan_nodes_by_label(label_id)
581                .iter()
582                .map(|n| (n.node_id, n.properties.clone()))
583                .collect();
584            for (nid, props) in &nodes {
585                for (pk, v) in props {
586                    if *pk == prop_key_id {
587                        if let Some(idx) = engine
588                            .index_manager_mut()
589                            .find_index_mut(label_id, prop_key_id)
590                        {
591                            idx.insert(v, *nid);
592                        }
593                    }
594                }
595            }
596
597            Ok(vec![])
598        }
599        LogicalPlan::CreateEdgeIndex {
600            name,
601            rel_type,
602            property,
603        } => {
604            // Resolve relationship type and property names via catalog
605            let rel_type_id = engine.get_or_create_rel_type(rel_type);
606            let prop_key_id = engine.get_or_create_prop_key(property);
607
608            // Generate index name if not provided
609            let index_name = match name {
610                Some(n) => n.clone(),
611                None => format!("eidx_{}_{}", rel_type, property),
612            };
613
614            // Create the edge index
615            engine
616                .edge_index_manager_mut()
617                .create_index(index_name.clone(), rel_type_id, prop_key_id)
618                .map_err(|e| ExecutionError {
619                    message: e.to_string(),
620                })?;
621
622            // Backfill: index existing edges that match the rel_type + property
623            let edges: Vec<(
624                cypherlite_core::EdgeId,
625                Vec<(u32, cypherlite_core::PropertyValue)>,
626            )> = engine
627                .scan_edges_by_type(rel_type_id)
628                .iter()
629                .map(|e| (e.edge_id, e.properties.clone()))
630                .collect();
631            for (eid, props) in &edges {
632                for (pk, v) in props {
633                    if *pk == prop_key_id {
634                        if let Some(idx) = engine
635                            .edge_index_manager_mut()
636                            .find_index_mut(rel_type_id, prop_key_id)
637                        {
638                            idx.insert(v, *eid);
639                        }
640                    }
641                }
642            }
643
644            Ok(vec![])
645        }
646        LogicalPlan::DropIndex { name } => {
647            // Try to remove from node index manager first
648            let removed_node = engine.index_manager_mut().drop_index(name);
649            if removed_node.is_ok() {
650                engine.catalog_mut().remove_index_definition(name);
651                return Ok(vec![]);
652            }
653
654            // If not found as node index, try edge index manager
655            engine
656                .edge_index_manager_mut()
657                .drop_index(name)
658                .map_err(|e| ExecutionError {
659                    message: e.to_string(),
660                })?;
661
662            Ok(vec![])
663        }
664        LogicalPlan::VarLengthExpand {
665            source,
666            src_var,
667            rel_var,
668            target_var,
669            rel_type_id,
670            direction,
671            min_hops,
672            max_hops,
673            temporal_filter,
674        } => {
675            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
676            let tf = resolve_temporal_filter(temporal_filter, engine, params, scalar_fns)?;
677            Ok(operators::var_length_expand::execute_var_length_expand(
678                source_records,
679                src_var,
680                rel_var.as_deref(),
681                target_var,
682                *rel_type_id,
683                direction,
684                *min_hops,
685                *max_hops,
686                engine,
687                tf.as_ref(),
688            ))
689        }
690        LogicalPlan::OptionalExpand {
691            source,
692            src_var,
693            rel_var,
694            target_var,
695            rel_type_id,
696            direction,
697        } => {
698            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
699            Ok(operators::optional_expand::execute_optional_expand(
700                source_records,
701                src_var,
702                rel_var.as_deref(),
703                target_var,
704                *rel_type_id,
705                direction,
706                engine,
707            ))
708        }
709        LogicalPlan::IndexScan {
710            variable,
711            label_id,
712            prop_key,
713            lookup_value,
714        } => operators::index_scan::execute_index_scan(
715            variable,
716            *label_id,
717            prop_key,
718            lookup_value,
719            engine,
720            params,
721            scalar_fns,
722        ),
723        #[cfg(feature = "subgraph")]
724        LogicalPlan::SubgraphScan { variable } => Ok(
725            operators::subgraph_scan::execute_subgraph_scan(variable, engine),
726        ),
727        #[cfg(feature = "subgraph")]
728        LogicalPlan::CreateSnapshotOp {
729            variable,
730            labels: _,
731            properties,
732            temporal_anchor,
733            sub_plan,
734            return_vars,
735        } => execute_create_snapshot(
736            variable.as_deref(),
737            properties,
738            temporal_anchor.as_ref(),
739            sub_plan,
740            return_vars,
741            engine,
742            params,
743        ),
744        #[cfg(feature = "hypergraph")]
745        LogicalPlan::HyperEdgeScan { variable } => Ok(
746            operators::hyperedge_scan::execute_hyperedge_scan(variable, engine),
747        ),
748        #[cfg(feature = "hypergraph")]
749        LogicalPlan::CreateHyperedgeOp {
750            source,
751            variable,
752            labels,
753            sources,
754            targets,
755        } => {
756            let source_records = match source {
757                Some(s) => execute(s, engine, params, scalar_fns, trigger_fns)?,
758                None => vec![Record::new()],
759            };
760            execute_create_hyperedge(
761                variable.as_deref(),
762                labels,
763                sources,
764                targets,
765                &source_records,
766                engine,
767                params,
768            )
769        }
770        LogicalPlan::AsOfScan {
771            source,
772            timestamp_expr,
773        } => {
774            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
775            operators::temporal_scan::execute_as_of_scan(
776                source_records,
777                timestamp_expr,
778                engine,
779                params,
780                scalar_fns,
781            )
782        }
783        LogicalPlan::TemporalRangeScan {
784            source,
785            start_expr,
786            end_expr,
787        } => {
788            let source_records = execute(source, engine, params, scalar_fns, trigger_fns)?;
789            operators::temporal_scan::execute_temporal_range_scan(
790                source_records,
791                start_expr,
792                end_expr,
793                engine,
794                params,
795                scalar_fns,
796            )
797        }
798    }
799}
800
801/// Resolve a TemporalFilterPlan into a concrete TemporalFilter by evaluating expressions.
802fn resolve_temporal_filter(
803    plan: &Option<TemporalFilterPlan>,
804    engine: &mut StorageEngine,
805    params: &Params,
806    scalar_fns: &dyn ScalarFnLookup,
807) -> Result<Option<operators::temporal_filter::TemporalFilter>, ExecutionError> {
808    let tfp = match plan {
809        Some(p) => p,
810        None => return Ok(None),
811    };
812    let empty_record = Record::new();
813    match tfp {
814        TemporalFilterPlan::AsOf(expr) => {
815            let val = eval::eval(expr, &empty_record, engine, params, scalar_fns)?;
816            let ms = extract_timestamp(val)?;
817            Ok(Some(operators::temporal_filter::TemporalFilter::AsOf(ms)))
818        }
819        TemporalFilterPlan::Between(start_expr, end_expr) => {
820            let start_val = eval::eval(start_expr, &empty_record, engine, params, scalar_fns)?;
821            let end_val = eval::eval(end_expr, &empty_record, engine, params, scalar_fns)?;
822            let start_ms = extract_timestamp(start_val)?;
823            let end_ms = extract_timestamp(end_val)?;
824            Ok(Some(operators::temporal_filter::TemporalFilter::Between(
825                start_ms, end_ms,
826            )))
827        }
828    }
829}
830
831/// Extract a timestamp (i64 millis) from a Value.
832fn extract_timestamp(val: Value) -> Result<i64, ExecutionError> {
833    match val {
834        Value::DateTime(ms) => Ok(ms),
835        Value::Int64(ms) => Ok(ms),
836        _ => Err(ExecutionError {
837            message: "temporal filter expression must evaluate to DateTime or integer".to_string(),
838        }),
839    }
840}
841
842/// Evaluate a SKIP/LIMIT count expression (must be a literal integer).
843fn eval_count_expr(expr: &Expression) -> Result<usize, ExecutionError> {
844    match expr {
845        Expression::Literal(Literal::Integer(n)) => {
846            if *n < 0 {
847                return Err(ExecutionError {
848                    message: "SKIP/LIMIT count must be non-negative".to_string(),
849                });
850            }
851            Ok(*n as usize)
852        }
853        _ => Err(ExecutionError {
854            message: "SKIP/LIMIT count must be a literal integer".to_string(),
855        }),
856    }
857}
858
/// Run a CREATE SNAPSHOT statement.
///
/// The sub-plan is executed first; every node bound to one of `return_vars`
/// in its result rows becomes a member of a newly created subgraph. The
/// subgraph receives the evaluated `properties`, an injected `_created_at`
/// timestamp, and a temporal anchor (the explicit anchor expression, or the
/// `__query_start_ms__` parameter when no expression is given). The single
/// returned record binds `variable` to the new subgraph when a variable name
/// was supplied, and is empty otherwise.
#[cfg(feature = "subgraph")]
#[allow(clippy::too_many_arguments)]
fn execute_create_snapshot(
    variable: Option<&str>,
    properties: &Option<crate::parser::ast::MapLiteral>,
    temporal_anchor_expr: Option<&crate::parser::ast::Expression>,
    sub_plan: &LogicalPlan,
    return_vars: &[String],
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<Record>, ExecutionError> {
    use cypherlite_core::{LabelRegistry, PropertyValue, SubgraphId};

    // Run the FROM sub-plan; its rows supply the snapshot members.
    let matched_rows = execute(sub_plan, engine, params, &(), &())?;

    // Gather member node IDs in first-seen order, skipping repeats.
    let mut member_ids = Vec::new();
    for row in &matched_rows {
        for name in return_vars {
            match row.get(name) {
                Some(Value::Node(id)) if !member_ids.contains(id) => member_ids.push(*id),
                _ => {}
            }
        }
    }

    // Property and anchor expressions are evaluated without any row bindings.
    let no_bindings = Record::new();

    // Evaluate the property map, if one was given.
    let mut snapshot_props = Vec::new();
    if let Some(map) = properties {
        for (key, expr) in map {
            let evaluated = eval::eval(expr, &no_bindings, engine, params, &())?;
            let stored = PropertyValue::try_from(evaluated).map_err(|e| ExecutionError {
                message: format!("invalid property value for '{}': {}", key, e),
            })?;
            snapshot_props.push((engine.get_or_create_prop_key(key), stored));
        }
    }

    // Query start time, when the caller supplied it as an Int64 parameter.
    let query_start_ms = match params.get("__query_start_ms__") {
        Some(Value::Int64(ms)) => Some(*ms),
        _ => None,
    };

    // HH-003: an explicit anchor expression wins; otherwise the anchor
    // defaults to the query start timestamp (if known).
    let temporal_anchor = match temporal_anchor_expr {
        Some(expr) => {
            let anchor_val = eval::eval(expr, &no_bindings, engine, params, &())?;
            Some(extract_timestamp(anchor_val)?)
        }
        None => query_start_ms,
    };

    // Stamp the snapshot with `_created_at` (0 when the query start is unknown).
    let created_at_key = engine.get_or_create_prop_key("_created_at");
    snapshot_props.push((
        created_at_key,
        PropertyValue::DateTime(query_start_ms.unwrap_or(0)),
    ));

    // Create the subgraph, then register every collected node as a member.
    let sg_id: SubgraphId = engine.create_subgraph(snapshot_props, temporal_anchor);
    for id in member_ids {
        engine.add_member(sg_id, id).map_err(|e| ExecutionError {
            message: format!("failed to add member to subgraph: {}", e),
        })?;
    }

    // Exactly one record is returned, binding the variable if there is one.
    let mut bound = Record::new();
    if let Some(name) = variable {
        bound.insert(name.to_string(), Value::Subgraph(sg_id));
    }
    Ok(vec![bound])
}
952
/// Run a CREATE HYPEREDGE statement.
///
/// Participant expressions are evaluated against the first record of
/// `source_records` (or an empty record when there is none). The relation
/// type is looked up (or created) from the first label and falls back to
/// `0` when no label is given. The hyperedge is created with an empty
/// property list, and the single returned record is that source record
/// extended with `variable` bound to the new hyperedge when a variable name
/// was supplied.
#[cfg(feature = "hypergraph")]
#[allow(clippy::too_many_arguments)]
fn execute_create_hyperedge(
    variable: Option<&str>,
    labels: &[String],
    sources: &[Expression],
    targets: &[Expression],
    source_records: &[Record],
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<Record>, ExecutionError> {
    use cypherlite_core::{HyperEdgeId, LabelRegistry};

    // Bindings from a preceding MATCH, if any; only the first row is consulted.
    let mut bindings = match source_records.first() {
        Some(first) => first.clone(),
        None => Record::default(),
    };

    // Only the first label determines the relation type.
    let rel_type_id = match labels.first() {
        Some(label) => engine.get_or_create_rel_type(label),
        None => 0,
    };

    // Sources are resolved before targets.
    let source_entities = resolve_hyperedge_participants(sources, &bindings, engine, params)?;
    let target_entities = resolve_hyperedge_participants(targets, &bindings, engine, params)?;

    // Properties stay empty here; they come from a subsequent SET clause.
    let he_id: HyperEdgeId =
        engine.create_hyperedge(rel_type_id, source_entities, target_entities, vec![]);

    // Hand back the source bindings plus the new hyperedge, if it is named.
    if let Some(name) = variable {
        bindings.insert(name.to_string(), Value::Hyperedge(he_id));
    }
    Ok(vec![bindings])
}
998
/// Evaluate hyperedge participant expressions into `GraphEntity` values.
///
/// A `TemporalRef` expression becomes `GraphEntity::TemporalRef`, built from
/// its node and timestamp sub-expressions. Any other expression must
/// evaluate to a node, a hyperedge, or (with the `subgraph` feature) a
/// subgraph; anything else is reported as an `ExecutionError`. The output
/// order follows `expressions`.
#[cfg(feature = "hypergraph")]
fn resolve_hyperedge_participants(
    expressions: &[Expression],
    record: &Record,
    engine: &mut StorageEngine,
    params: &Params,
) -> Result<Vec<cypherlite_core::GraphEntity>, ExecutionError> {
    use cypherlite_core::GraphEntity;

    let mut resolved = Vec::new();
    for expr in expressions {
        let entity = if let Expression::TemporalRef { node, timestamp } = expr {
            // Both sub-expressions are evaluated before either result is validated.
            let node_val = eval::eval(node, record, engine, params, &())?;
            let ts_val = eval::eval(timestamp, record, engine, params, &())?;
            let node_id = if let Value::Node(id) = node_val {
                id
            } else {
                return Err(ExecutionError {
                    message: "temporal reference node must resolve to a Node".to_string(),
                });
            };
            GraphEntity::TemporalRef(node_id, extract_timestamp(ts_val)?)
        } else {
            // Plain participant: the evaluated value itself must be a graph entity.
            match eval::eval(expr, record, engine, params, &())? {
                Value::Node(nid) => GraphEntity::Node(nid),
                #[cfg(feature = "subgraph")]
                Value::Subgraph(sid) => GraphEntity::Subgraph(sid),
                Value::Hyperedge(hid) => GraphEntity::HyperEdge(hid),
                _ => {
                    return Err(ExecutionError {
                        message: "hyperedge participant must resolve to a graph entity"
                            .to_string(),
                    })
                }
            }
        };
        resolved.push(entity);
    }
    Ok(resolved)
}
1049
1050/// Deduplicate records by comparing all key-value pairs.
1051fn deduplicate_records(records: &mut Vec<Record>) {
1052    let mut seen: Vec<Record> = Vec::new();
1053    records.retain(|r| {
1054        if seen.contains(r) {
1055            false
1056        } else {
1057            seen.push(r.clone());
1058            true
1059        }
1060    });
1061}
1062
// Unit tests for the executor's value conversions, error type, type aliases,
// and the private helpers `eval_count_expr` / `deduplicate_records`.
#[cfg(test)]
mod tests {
    use super::*;

    // ======================================================================
    // TASK-044: Value, Record, Params, ExecutionError tests
    // ======================================================================

    // --- PropertyValue -> Value: each storage variant maps to its Value twin.

    #[test]
    fn test_value_from_property_value_null() {
        assert_eq!(Value::from(PropertyValue::Null), Value::Null);
    }

    #[test]
    fn test_value_from_property_value_bool() {
        assert_eq!(Value::from(PropertyValue::Bool(true)), Value::Bool(true));
    }

    #[test]
    fn test_value_from_property_value_int64() {
        assert_eq!(Value::from(PropertyValue::Int64(42)), Value::Int64(42));
    }

    #[test]
    fn test_value_from_property_value_float64() {
        assert_eq!(
            Value::from(PropertyValue::Float64(3.15)),
            Value::Float64(3.15)
        );
    }

    #[test]
    fn test_value_from_property_value_string() {
        assert_eq!(
            Value::from(PropertyValue::String("hello".into())),
            Value::String("hello".into())
        );
    }

    #[test]
    fn test_value_from_property_value_bytes() {
        assert_eq!(
            Value::from(PropertyValue::Bytes(vec![1, 2, 3])),
            Value::Bytes(vec![1, 2, 3])
        );
    }

    // An Array becomes a List, with every element converted as well.
    #[test]
    fn test_value_from_property_value_array() {
        let pv = PropertyValue::Array(vec![PropertyValue::Int64(1), PropertyValue::Null]);
        assert_eq!(
            Value::from(pv),
            Value::List(vec![Value::Int64(1), Value::Null])
        );
    }

    // --- Value -> PropertyValue: fallible, since graph entities are not storable.

    #[test]
    fn test_value_try_into_property_value_success() {
        assert_eq!(
            PropertyValue::try_from(Value::Null),
            Ok(PropertyValue::Null)
        );
        assert_eq!(
            PropertyValue::try_from(Value::Bool(false)),
            Ok(PropertyValue::Bool(false))
        );
        assert_eq!(
            PropertyValue::try_from(Value::Int64(10)),
            Ok(PropertyValue::Int64(10))
        );
        assert_eq!(
            PropertyValue::try_from(Value::Float64(1.5)),
            Ok(PropertyValue::Float64(1.5))
        );
        assert_eq!(
            PropertyValue::try_from(Value::String("x".into())),
            Ok(PropertyValue::String("x".into()))
        );
        assert_eq!(
            PropertyValue::try_from(Value::Bytes(vec![0xAB])),
            Ok(PropertyValue::Bytes(vec![0xAB]))
        );
    }

    // A List of convertible values becomes an Array.
    #[test]
    fn test_value_try_into_property_value_list() {
        let v = Value::List(vec![Value::Int64(1), Value::Bool(true)]);
        let pv = PropertyValue::try_from(v);
        assert_eq!(
            pv,
            Ok(PropertyValue::Array(vec![
                PropertyValue::Int64(1),
                PropertyValue::Bool(true)
            ]))
        );
    }

    // Node values are rejected, and the error text names the cause.
    #[test]
    fn test_value_try_into_property_value_node_fails() {
        let result = PropertyValue::try_from(Value::Node(NodeId(1)));
        assert!(result.is_err());
        assert!(result
            .expect_err("should error")
            .contains("cannot convert graph entity"));
    }

    // Edge values are rejected too (only the failure itself is checked).
    #[test]
    fn test_value_try_into_property_value_edge_fails() {
        let result = PropertyValue::try_from(Value::Edge(EdgeId(1)));
        assert!(result.is_err());
    }

    // --- ExecutionError formatting and trait support.

    // Display prefixes the message with "Execution error: ".
    #[test]
    fn test_execution_error_display() {
        let err = ExecutionError {
            message: "test error".to_string(),
        };
        assert_eq!(err.to_string(), "Execution error: test error");
    }

    #[test]
    fn test_execution_error_is_error_trait() {
        let err = ExecutionError {
            message: "test".to_string(),
        };
        // Verify it implements std::error::Error
        let _: &dyn std::error::Error = &err;
    }

    // --- Record / Params support map-style insert and lookup by name.

    #[test]
    fn test_record_type_is_hashmap() {
        let mut record: Record = Record::new();
        record.insert("n".to_string(), Value::Node(NodeId(1)));
        assert_eq!(record.get("n"), Some(&Value::Node(NodeId(1))));
    }

    #[test]
    fn test_params_type_is_hashmap() {
        let mut params: Params = Params::new();
        params.insert("name".to_string(), Value::String("Alice".into()));
        assert_eq!(
            params.get("name"),
            Some(&Value::String("Alice".to_string()))
        );
    }

    // --- eval_count_expr: only non-negative integer literals are accepted.

    #[test]
    fn test_eval_count_expr_positive() {
        let expr = Expression::Literal(Literal::Integer(10));
        assert_eq!(eval_count_expr(&expr), Ok(10));
    }

    #[test]
    fn test_eval_count_expr_zero() {
        let expr = Expression::Literal(Literal::Integer(0));
        assert_eq!(eval_count_expr(&expr), Ok(0));
    }

    #[test]
    fn test_eval_count_expr_negative_fails() {
        let expr = Expression::Literal(Literal::Integer(-5));
        assert!(eval_count_expr(&expr).is_err());
    }

    // A variable reference is not a literal, so it must be rejected.
    #[test]
    fn test_eval_count_expr_non_integer_fails() {
        let expr = Expression::Variable("n".to_string());
        assert!(eval_count_expr(&expr).is_err());
    }

    // --- deduplicate_records: r3 is a copy of r1, so two records remain.

    #[test]
    fn test_deduplicate_records() {
        let mut r1 = Record::new();
        r1.insert("x".to_string(), Value::Int64(1));
        let mut r2 = Record::new();
        r2.insert("x".to_string(), Value::Int64(2));
        let r3 = r1.clone();

        let mut records = vec![r1, r2, r3];
        deduplicate_records(&mut records);
        assert_eq!(records.len(), 2);
    }

    // ======================================================================
    // U-001: Value::DateTime variant
    // ======================================================================

    // DateTime round-trips in both directions with its millisecond payload intact.

    #[test]
    fn test_value_from_property_value_datetime() {
        assert_eq!(
            Value::from(PropertyValue::DateTime(1_700_000_000_000)),
            Value::DateTime(1_700_000_000_000)
        );
    }

    #[test]
    fn test_value_try_into_property_value_datetime() {
        assert_eq!(
            PropertyValue::try_from(Value::DateTime(1_700_000_000_000)),
            Ok(PropertyValue::DateTime(1_700_000_000_000))
        );
    }

    // ======================================================================
    // MM-001: Value::Hyperedge variant (cfg-gated)
    // ======================================================================

    // Compiled only when the `hypergraph` feature is enabled.
    #[cfg(feature = "hypergraph")]
    mod hyperedge_value_tests {
        use super::*;
        use cypherlite_core::HyperEdgeId;

        // MM-001: Value::Hyperedge construction
        #[test]
        fn test_value_hyperedge_creation() {
            let val = Value::Hyperedge(HyperEdgeId(42));
            assert_eq!(val, Value::Hyperedge(HyperEdgeId(42)));
        }

        // MM-001: Value::Hyperedge is not convertible to PropertyValue
        #[test]
        fn test_value_hyperedge_try_into_property_value_fails() {
            let result = PropertyValue::try_from(Value::Hyperedge(HyperEdgeId(1)));
            assert!(result.is_err());
            assert!(result
                .expect_err("should error")
                .contains("cannot convert graph entity"));
        }

        // MM-001: Value::Hyperedge inequality with Node
        // (same numeric id, different entity kind)
        #[test]
        fn test_value_hyperedge_ne_node() {
            let node_val = Value::Node(NodeId(1));
            let hyperedge_val = Value::Hyperedge(HyperEdgeId(1));
            assert_ne!(node_val, hyperedge_val);
        }

        // MM-001: Value::Hyperedge clone
        #[test]
        fn test_value_hyperedge_clone() {
            let val = Value::Hyperedge(HyperEdgeId(7));
            let cloned = val.clone();
            assert_eq!(val, cloned);
        }

        // MM-001: Value::Hyperedge debug
        // (output must mention both the variant name and the id)
        #[test]
        fn test_value_hyperedge_debug() {
            let val = Value::Hyperedge(HyperEdgeId(99));
            let debug = format!("{:?}", val);
            assert!(debug.contains("Hyperedge"));
            assert!(debug.contains("99"));
        }
    }

    // ======================================================================
    // II-002: Value::Subgraph variant (cfg-gated)
    // ======================================================================

    // Compiled only when the `subgraph` feature is enabled.
    #[cfg(feature = "subgraph")]
    mod subgraph_value_tests {
        use super::*;
        use cypherlite_core::SubgraphId;

        // II-002: Value::Subgraph construction
        #[test]
        fn test_value_subgraph_creation() {
            let val = Value::Subgraph(SubgraphId(42));
            assert_eq!(val, Value::Subgraph(SubgraphId(42)));
        }

        // II-002: Value::Subgraph is not convertible to PropertyValue
        #[test]
        fn test_value_subgraph_try_into_property_value_fails() {
            let result = PropertyValue::try_from(Value::Subgraph(SubgraphId(1)));
            assert!(result.is_err());
            assert!(result
                .expect_err("should error")
                .contains("cannot convert graph entity"));
        }

        // II-002: Value::Subgraph inequality with Node
        // (same numeric id, different entity kind)
        #[test]
        fn test_value_subgraph_ne_node() {
            let node_val = Value::Node(NodeId(1));
            let subgraph_val = Value::Subgraph(SubgraphId(1));
            assert_ne!(node_val, subgraph_val);
        }

        // II-002: Value::Subgraph clone
        #[test]
        fn test_value_subgraph_clone() {
            let val = Value::Subgraph(SubgraphId(7));
            let cloned = val.clone();
            assert_eq!(val, cloned);
        }

        // II-002: Value::Subgraph debug
        // (output must mention both the variant name and the id)
        #[test]
        fn test_value_subgraph_debug() {
            let val = Value::Subgraph(SubgraphId(99));
            let debug = format!("{:?}", val);
            assert!(debug.contains("Subgraph"));
            assert!(debug.contains("99"));
        }
    }
}