Skip to main content

cypherlite_query/executor/operators/
create.rs

1// CreateOp: node and edge creation via storage engine
2
3use crate::executor::eval::eval;
4use crate::executor::{ExecutionError, Params, Record, ScalarFnLookup, TriggerLookup, Value};
5use crate::parser::ast::*;
6use cypherlite_core::{LabelRegistry, PropertyValue};
7use cypherlite_storage::StorageEngine;
8
9/// System property names that are automatically managed.
10pub const SYSTEM_PROP_CREATED_AT: &str = "_created_at";
11/// System property name for last-updated timestamp.
12pub const SYSTEM_PROP_UPDATED_AT: &str = "_updated_at";
13/// Temporal edge property: validity start timestamp.
14pub const TEMPORAL_PROP_VALID_FROM: &str = "_valid_from";
15/// Temporal edge property: validity end timestamp.
16pub const TEMPORAL_PROP_VALID_TO: &str = "_valid_to";
17
18/// Check if a property name is a system-managed (read-only) property.
19/// Note: _valid_from and _valid_to are temporal but user-settable, so they are NOT system properties.
20pub fn is_system_property(name: &str) -> bool {
21    name == SYSTEM_PROP_CREATED_AT || name == SYSTEM_PROP_UPDATED_AT
22}
23
24/// Check if a property name is a temporal edge property (user-settable).
25pub fn is_temporal_edge_property(name: &str) -> bool {
26    name == TEMPORAL_PROP_VALID_FROM || name == TEMPORAL_PROP_VALID_TO
27}
28
29/// Get the current query timestamp from params.
30fn get_query_timestamp(params: &Params) -> i64 {
31    match params.get("__query_start_ms__") {
32        Some(Value::Int64(ms)) => *ms,
33        _ => std::time::SystemTime::now()
34            .duration_since(std::time::UNIX_EPOCH)
35            .map(|d| d.as_millis() as i64)
36            .unwrap_or(0),
37    }
38}
39
40/// Inject _created_at and _updated_at into property list.
41fn inject_create_timestamps(
42    properties: &mut Vec<(u32, PropertyValue)>,
43    engine: &mut StorageEngine,
44    params: &Params,
45) {
46    let now = get_query_timestamp(params);
47    let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
48    let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
49    properties.push((created_key, PropertyValue::DateTime(now)));
50    properties.push((updated_key, PropertyValue::DateTime(now)));
51}
52
53/// Inject _valid_from on edge creation if not already provided by user.
54fn inject_edge_valid_from(
55    properties: &mut Vec<(u32, PropertyValue)>,
56    engine: &mut StorageEngine,
57    params: &Params,
58) {
59    let valid_from_key = engine.get_or_create_prop_key(TEMPORAL_PROP_VALID_FROM);
60    // Only inject if user didn't already set _valid_from
61    let already_set = properties.iter().any(|(k, _)| *k == valid_from_key);
62    if !already_set {
63        let now = get_query_timestamp(params);
64        properties.push((valid_from_key, PropertyValue::DateTime(now)));
65    }
66}
67
68/// Validate that no system properties are being set by the user in a map literal.
69pub fn validate_no_system_properties(props: &Option<MapLiteral>) -> Result<(), ExecutionError> {
70    if let Some(map) = props {
71        for (key, _) in map {
72            if is_system_property(key) {
73                return Err(ExecutionError {
74                    message: format!("System property is read-only: {}", key),
75                });
76            }
77        }
78    }
79    Ok(())
80}
81
82/// Create nodes and edges from a pattern.
83/// Walks each pattern chain, creating nodes and edges as specified.
84pub fn execute_create(
85    source_records: Vec<Record>,
86    pattern: &Pattern,
87    engine: &mut StorageEngine,
88    params: &Params,
89    scalar_fns: &dyn ScalarFnLookup,
90    trigger_fns: &dyn TriggerLookup,
91) -> Result<Vec<Record>, ExecutionError> {
92    let mut results = Vec::new();
93
94    for record in source_records {
95        let mut new_record = record.clone();
96
97        for chain in &pattern.chains {
98            create_chain(
99                chain,
100                &mut new_record,
101                engine,
102                params,
103                scalar_fns,
104                trigger_fns,
105            )?;
106        }
107
108        results.push(new_record);
109    }
110
111    Ok(results)
112}
113
114/// Build a TriggerContext for a node.
115fn build_node_trigger_context(
116    entity_id: u64,
117    label: Option<&str>,
118    properties: &[(u32, PropertyValue)],
119    engine: &StorageEngine,
120    operation: cypherlite_core::TriggerOperation,
121) -> cypherlite_core::TriggerContext {
122    let props_map = properties
123        .iter()
124        .map(|(k, v)| {
125            let name = engine
126                .catalog()
127                .prop_key_name(*k)
128                .unwrap_or("?")
129                .to_string();
130            (name, v.clone())
131        })
132        .collect();
133    cypherlite_core::TriggerContext {
134        entity_type: cypherlite_core::EntityType::Node,
135        entity_id,
136        label_or_type: label.map(|s| s.to_string()),
137        properties: props_map,
138        operation,
139    }
140}
141
142/// Build a TriggerContext for an edge.
143fn build_edge_trigger_context(
144    entity_id: u64,
145    rel_type: Option<&str>,
146    properties: &[(u32, PropertyValue)],
147    engine: &StorageEngine,
148    operation: cypherlite_core::TriggerOperation,
149) -> cypherlite_core::TriggerContext {
150    let props_map = properties
151        .iter()
152        .map(|(k, v)| {
153            let name = engine
154                .catalog()
155                .prop_key_name(*k)
156                .unwrap_or("?")
157                .to_string();
158            (name, v.clone())
159        })
160        .collect();
161    cypherlite_core::TriggerContext {
162        entity_type: cypherlite_core::EntityType::Edge,
163        entity_id,
164        label_or_type: rel_type.map(|s| s.to_string()),
165        properties: props_map,
166        operation,
167    }
168}
169
170/// Create nodes and edges from a single pattern chain.
171fn create_chain(
172    chain: &PatternChain,
173    record: &mut Record,
174    engine: &mut StorageEngine,
175    params: &Params,
176    scalar_fns: &dyn ScalarFnLookup,
177    trigger_fns: &dyn TriggerLookup,
178) -> Result<(), ExecutionError> {
179    let mut elements = chain.elements.iter();
180    let mut prev_var: Option<String> = None;
181    let temporal_enabled = engine.config().temporal_tracking_enabled;
182
183    while let Some(element) = elements.next() {
184        match element {
185            PatternElement::Node(np) => {
186                let var_name = np.variable.as_deref().unwrap_or("");
187
188                // If variable already bound in record, skip node creation
189                if !var_name.is_empty() && record.contains_key(var_name) {
190                    prev_var = Some(var_name.to_string());
191                    continue;
192                }
193
194                // Validate no system properties in user-specified properties
195                validate_no_system_properties(&np.properties)?;
196
197                // Resolve labels
198                let labels: Vec<u32> = np
199                    .labels
200                    .iter()
201                    .map(|l| engine.get_or_create_label(l))
202                    .collect();
203
204                // Resolve properties
205                let mut properties =
206                    resolve_properties(&np.properties, record, engine, params, scalar_fns)?;
207
208                // Inject timestamps if temporal tracking is enabled
209                if temporal_enabled {
210                    inject_create_timestamps(&mut properties, engine, params);
211                }
212
213                // Fire before_create trigger
214                let first_label = np.labels.first().map(|s| s.as_str());
215                let before_ctx = build_node_trigger_context(
216                    0,
217                    first_label,
218                    &properties,
219                    engine,
220                    cypherlite_core::TriggerOperation::Create,
221                );
222                trigger_fns.fire_before_create(&before_ctx)?;
223
224                let node_id = engine.create_node(labels, properties.clone());
225
226                // Fire after_create trigger with actual node_id
227                let after_ctx = build_node_trigger_context(
228                    node_id.0,
229                    first_label,
230                    &properties,
231                    engine,
232                    cypherlite_core::TriggerOperation::Create,
233                );
234                trigger_fns.fire_after_create(&after_ctx)?;
235
236                if !var_name.is_empty() {
237                    record.insert(var_name.to_string(), Value::Node(node_id));
238                }
239                prev_var = if var_name.is_empty() {
240                    None
241                } else {
242                    Some(var_name.to_string())
243                };
244            }
245            PatternElement::Relationship(rp) => {
246                // Next element should be a node
247                let next_node = elements.next().ok_or_else(|| ExecutionError {
248                    message: "relationship must be followed by a node in CREATE pattern"
249                        .to_string(),
250                })?;
251
252                let target_np = match next_node {
253                    PatternElement::Node(np) => np,
254                    _ => {
255                        return Err(ExecutionError {
256                            message: "expected node after relationship in CREATE".to_string(),
257                        })
258                    }
259                };
260
261                // Create or resolve target node
262                let target_var_name = target_np.variable.as_deref().unwrap_or("");
263                let target_node_id = if !target_var_name.is_empty()
264                    && record.contains_key(target_var_name)
265                {
266                    match record.get(target_var_name) {
267                        Some(Value::Node(nid)) => *nid,
268                        _ => {
269                            return Err(ExecutionError {
270                                message: format!("variable '{}' is not a node", target_var_name),
271                            })
272                        }
273                    }
274                } else {
275                    // Validate no system properties in target node
276                    validate_no_system_properties(&target_np.properties)?;
277
278                    let labels: Vec<u32> = target_np
279                        .labels
280                        .iter()
281                        .map(|l| engine.get_or_create_label(l))
282                        .collect();
283                    let mut properties = resolve_properties(
284                        &target_np.properties,
285                        record,
286                        engine,
287                        params,
288                        scalar_fns,
289                    )?;
290
291                    if temporal_enabled {
292                        inject_create_timestamps(&mut properties, engine, params);
293                    }
294
295                    // Fire before_create trigger for target node
296                    let first_label = target_np.labels.first().map(|s| s.as_str());
297                    let before_ctx = build_node_trigger_context(
298                        0,
299                        first_label,
300                        &properties,
301                        engine,
302                        cypherlite_core::TriggerOperation::Create,
303                    );
304                    trigger_fns.fire_before_create(&before_ctx)?;
305
306                    let nid = engine.create_node(labels, properties.clone());
307
308                    // Fire after_create trigger for target node
309                    let after_ctx = build_node_trigger_context(
310                        nid.0,
311                        first_label,
312                        &properties,
313                        engine,
314                        cypherlite_core::TriggerOperation::Create,
315                    );
316                    trigger_fns.fire_after_create(&after_ctx)?;
317
318                    if !target_var_name.is_empty() {
319                        record.insert(target_var_name.to_string(), Value::Node(nid));
320                    }
321                    nid
322                };
323
324                // Resolve source node
325                let src_node_id = match &prev_var {
326                    Some(pv) => match record.get(pv) {
327                        Some(Value::Node(nid)) => *nid,
328                        _ => {
329                            return Err(ExecutionError {
330                                message: format!("variable '{}' is not a node", pv),
331                            })
332                        }
333                    },
334                    None => {
335                        return Err(ExecutionError {
336                            message: "no source node for relationship in CREATE".to_string(),
337                        })
338                    }
339                };
340
341                // Resolve relationship type
342                let rel_type_name = rp.rel_types.first().ok_or_else(|| ExecutionError {
343                    message: "CREATE relationship requires a type".to_string(),
344                })?;
345                let rel_type_id = engine.get_or_create_rel_type(rel_type_name);
346
347                // Validate no system properties in relationship
348                validate_no_system_properties(&rp.properties)?;
349
350                // Resolve relationship properties
351                let mut rel_props =
352                    resolve_properties(&rp.properties, record, engine, params, scalar_fns)?;
353
354                if temporal_enabled {
355                    inject_create_timestamps(&mut rel_props, engine, params);
356                    // BB-T3: Auto-inject _valid_from on edge CREATE
357                    inject_edge_valid_from(&mut rel_props, engine, params);
358                }
359
360                // Create edge based on direction
361                let (start, end) = match rp.direction {
362                    RelDirection::Outgoing | RelDirection::Undirected => {
363                        (src_node_id, target_node_id)
364                    }
365                    RelDirection::Incoming => (target_node_id, src_node_id),
366                };
367
368                // Fire before_create trigger for edge
369                let before_edge_ctx = build_edge_trigger_context(
370                    0,
371                    Some(rel_type_name),
372                    &rel_props,
373                    engine,
374                    cypherlite_core::TriggerOperation::Create,
375                );
376                trigger_fns.fire_before_create(&before_edge_ctx)?;
377
378                let edge_id = engine
379                    .create_edge(start, end, rel_type_id, rel_props.clone())
380                    .map_err(|e| ExecutionError {
381                        message: format!("failed to create edge: {}", e),
382                    })?;
383
384                // Fire after_create trigger for edge
385                let after_edge_ctx = build_edge_trigger_context(
386                    edge_id.0,
387                    Some(rel_type_name),
388                    &rel_props,
389                    engine,
390                    cypherlite_core::TriggerOperation::Create,
391                );
392                trigger_fns.fire_after_create(&after_edge_ctx)?;
393
394                if let Some(rv) = &rp.variable {
395                    record.insert(rv.clone(), Value::Edge(edge_id));
396                }
397
398                prev_var = if target_var_name.is_empty() {
399                    None
400                } else {
401                    Some(target_var_name.to_string())
402                };
403            }
404        }
405    }
406
407    Ok(())
408}
409
410/// Resolve properties from a MapLiteral, evaluating expressions.
411fn resolve_properties(
412    props: &Option<MapLiteral>,
413    record: &Record,
414    engine: &StorageEngine,
415    params: &Params,
416    scalar_fns: &dyn ScalarFnLookup,
417) -> Result<Vec<(u32, PropertyValue)>, ExecutionError> {
418    match props {
419        None => Ok(vec![]),
420        Some(map) => {
421            let mut result = Vec::new();
422            for (key, expr) in map {
423                let value = eval(expr, record, engine, params, scalar_fns)?;
424                let pv = PropertyValue::try_from(value).map_err(|e| ExecutionError {
425                    message: format!("invalid property value for '{}': {}", key, e),
426                })?;
427                let key_id = engine.catalog().prop_key_id(key).unwrap_or(0);
428                result.push((key_id, pv));
429            }
430            Ok(result)
431        }
432    }
433}
434
435/// Resolve properties from a MapLiteral using mutable engine access.
436/// This is the preferred version that can register new property keys.
437pub fn resolve_properties_mut(
438    props: &Option<MapLiteral>,
439    record: &Record,
440    engine: &mut StorageEngine,
441    params: &Params,
442    scalar_fns: &dyn ScalarFnLookup,
443) -> Result<Vec<(u32, PropertyValue)>, ExecutionError> {
444    match props {
445        None => Ok(vec![]),
446        Some(map) => {
447            let mut result = Vec::new();
448            for (key, expr) in map {
449                let value = eval(expr, record, &*engine, params, scalar_fns)?;
450                let pv = PropertyValue::try_from(value).map_err(|e| ExecutionError {
451                    message: format!("invalid property value for '{}': {}", key, e),
452                })?;
453                let key_id = engine.get_or_create_prop_key(key);
454                result.push((key_id, pv));
455            }
456            Ok(result)
457        }
458    }
459}
460
461#[cfg(test)]
462mod tests {
463    use super::*;
464    use cypherlite_core::{DatabaseConfig, SyncMode};
465    use tempfile::tempdir;
466
467    fn test_engine(dir: &std::path::Path) -> StorageEngine {
468        let config = DatabaseConfig {
469            path: dir.join("test.cyl"),
470            wal_sync_mode: SyncMode::Normal,
471            ..Default::default()
472        };
473        StorageEngine::open(config).expect("open")
474    }
475
476    // EXEC-T005: CreateOp node creation
477    #[test]
478    fn test_create_single_node() {
479        let dir = tempdir().expect("tempdir");
480        let mut engine = test_engine(dir.path());
481
482        let pattern = Pattern {
483            chains: vec![PatternChain {
484                elements: vec![PatternElement::Node(NodePattern {
485                    variable: Some("n".to_string()),
486                    labels: vec!["Person".to_string()],
487                    properties: None,
488                })],
489            }],
490        };
491
492        let params = Params::new();
493        let result = execute_create(
494            vec![Record::new()],
495            &pattern,
496            &mut engine,
497            &params,
498            &(),
499            &(),
500        );
501        let records = result.expect("should succeed");
502        assert_eq!(records.len(), 1);
503        assert!(records[0].contains_key("n"));
504        assert!(matches!(records[0].get("n"), Some(Value::Node(_))));
505
506        // Verify node was created in engine
507        assert_eq!(engine.node_count(), 1);
508    }
509
510    #[test]
511    fn test_create_node_with_properties() {
512        let dir = tempdir().expect("tempdir");
513        let mut engine = test_engine(dir.path());
514
515        // Pre-register property key
516        let _name_key = engine.get_or_create_prop_key("name");
517
518        let pattern = Pattern {
519            chains: vec![PatternChain {
520                elements: vec![PatternElement::Node(NodePattern {
521                    variable: Some("n".to_string()),
522                    labels: vec!["Person".to_string()],
523                    properties: Some(vec![(
524                        "name".to_string(),
525                        Expression::Literal(Literal::String("Alice".into())),
526                    )]),
527                })],
528            }],
529        };
530
531        let params = Params::new();
532        let result = execute_create(
533            vec![Record::new()],
534            &pattern,
535            &mut engine,
536            &params,
537            &(),
538            &(),
539        );
540        let records = result.expect("should succeed");
541        assert_eq!(records.len(), 1);
542
543        // Verify node properties
544        if let Some(Value::Node(nid)) = records[0].get("n") {
545            let node = engine.get_node(*nid).expect("node exists");
546            assert!(!node.properties.is_empty());
547        } else {
548            panic!("expected node value");
549        }
550    }
551
552    #[test]
553    fn test_create_node_and_relationship() {
554        let dir = tempdir().expect("tempdir");
555        let mut engine = test_engine(dir.path());
556
557        let pattern = Pattern {
558            chains: vec![PatternChain {
559                elements: vec![
560                    PatternElement::Node(NodePattern {
561                        variable: Some("a".to_string()),
562                        labels: vec!["Person".to_string()],
563                        properties: None,
564                    }),
565                    PatternElement::Relationship(RelationshipPattern {
566                        variable: Some("r".to_string()),
567                        rel_types: vec!["KNOWS".to_string()],
568                        direction: RelDirection::Outgoing,
569                        properties: None,
570                        min_hops: None,
571                        max_hops: None,
572                    }),
573                    PatternElement::Node(NodePattern {
574                        variable: Some("b".to_string()),
575                        labels: vec!["Person".to_string()],
576                        properties: None,
577                    }),
578                ],
579            }],
580        };
581
582        let params = Params::new();
583        let result = execute_create(
584            vec![Record::new()],
585            &pattern,
586            &mut engine,
587            &params,
588            &(),
589            &(),
590        );
591        let records = result.expect("should succeed");
592        assert_eq!(records.len(), 1);
593        assert!(records[0].contains_key("a"));
594        assert!(records[0].contains_key("r"));
595        assert!(records[0].contains_key("b"));
596        assert_eq!(engine.node_count(), 2);
597        assert_eq!(engine.edge_count(), 1);
598    }
599
600    // BB-T1: is_system_property does NOT include _valid_from/_valid_to
601    #[test]
602    fn test_valid_from_is_not_system_property() {
603        assert!(!is_system_property("_valid_from"));
604        assert!(!is_system_property("_valid_to"));
605    }
606
607    // BB-T1: is_temporal_edge_property recognizes _valid_from/_valid_to
608    #[test]
609    fn test_temporal_edge_property_detection() {
610        assert!(is_temporal_edge_property("_valid_from"));
611        assert!(is_temporal_edge_property("_valid_to"));
612        assert!(!is_temporal_edge_property("_created_at"));
613        assert!(!is_temporal_edge_property("name"));
614    }
615
616    // BB-T3: Edge CREATE injects _valid_from
617    #[test]
618    fn test_create_edge_injects_valid_from() {
619        let dir = tempdir().expect("tempdir");
620        let mut engine = test_engine(dir.path());
621
622        let pattern = Pattern {
623            chains: vec![PatternChain {
624                elements: vec![
625                    PatternElement::Node(NodePattern {
626                        variable: Some("a".to_string()),
627                        labels: vec!["Person".to_string()],
628                        properties: None,
629                    }),
630                    PatternElement::Relationship(RelationshipPattern {
631                        variable: Some("r".to_string()),
632                        rel_types: vec!["KNOWS".to_string()],
633                        direction: RelDirection::Outgoing,
634                        properties: None,
635                        min_hops: None,
636                        max_hops: None,
637                    }),
638                    PatternElement::Node(NodePattern {
639                        variable: Some("b".to_string()),
640                        labels: vec!["Person".to_string()],
641                        properties: None,
642                    }),
643                ],
644            }],
645        };
646
647        let mut params = Params::new();
648        params.insert(
649            "__query_start_ms__".to_string(),
650            Value::Int64(1_700_000_000_000),
651        );
652        let result = execute_create(
653            vec![Record::new()],
654            &pattern,
655            &mut engine,
656            &params,
657            &(),
658            &(),
659        );
660        let records = result.expect("should succeed");
661
662        // Get the edge and verify it has _valid_from
663        if let Some(Value::Edge(eid)) = records[0].get("r") {
664            let edge = engine.get_edge(*eid).expect("edge exists");
665            let valid_from_key = engine
666                .catalog()
667                .prop_key_id("_valid_from")
668                .expect("_valid_from key");
669            let has_valid_from = edge.properties.iter().any(|(k, _)| *k == valid_from_key);
670            assert!(has_valid_from, "edge should have _valid_from property");
671
672            // Also verify _created_at and _updated_at
673            let created_key = engine
674                .catalog()
675                .prop_key_id("_created_at")
676                .expect("_created_at key");
677            let updated_key = engine
678                .catalog()
679                .prop_key_id("_updated_at")
680                .expect("_updated_at key");
681            assert!(edge.properties.iter().any(|(k, _)| *k == created_key));
682            assert!(edge.properties.iter().any(|(k, _)| *k == updated_key));
683        } else {
684            panic!("expected edge value for 'r'");
685        }
686    }
687
688    #[test]
689    fn test_create_reuses_existing_variable() {
690        let dir = tempdir().expect("tempdir");
691        let mut engine = test_engine(dir.path());
692
693        // Pre-create a node bound to "a"
694        let existing_id = engine.create_node(vec![], vec![]);
695        let mut initial_record = Record::new();
696        initial_record.insert("a".to_string(), Value::Node(existing_id));
697
698        let pattern = Pattern {
699            chains: vec![PatternChain {
700                elements: vec![
701                    PatternElement::Node(NodePattern {
702                        variable: Some("a".to_string()),
703                        labels: vec![],
704                        properties: None,
705                    }),
706                    PatternElement::Relationship(RelationshipPattern {
707                        variable: None,
708                        rel_types: vec!["KNOWS".to_string()],
709                        direction: RelDirection::Outgoing,
710                        properties: None,
711                        min_hops: None,
712                        max_hops: None,
713                    }),
714                    PatternElement::Node(NodePattern {
715                        variable: Some("b".to_string()),
716                        labels: vec![],
717                        properties: None,
718                    }),
719                ],
720            }],
721        };
722
723        let params = Params::new();
724        let result = execute_create(
725            vec![initial_record],
726            &pattern,
727            &mut engine,
728            &params,
729            &(),
730            &(),
731        );
732        let records = result.expect("should succeed");
733
734        // Should reuse existing node "a" and create only "b"
735        assert_eq!(engine.node_count(), 2); // existing + new b
736        assert_eq!(records[0].get("a"), Some(&Value::Node(existing_id)));
737    }
738}