1use crate::executor::eval::eval;
4use crate::executor::{ExecutionError, Params, Record, ScalarFnLookup, TriggerLookup, Value};
5use crate::parser::ast::*;
6use cypherlite_core::{LabelRegistry, PropertyValue};
7use cypherlite_storage::StorageEngine;
8
/// Name of the property stamped with the creation time of an entity.
pub const SYSTEM_PROP_CREATED_AT: &str = "_created_at";
/// Name of the property stamped with the last-update time of an entity
/// (set to the same instant as `_created_at` when the entity is created).
pub const SYSTEM_PROP_UPDATED_AT: &str = "_updated_at";
/// Name of the edge property holding the start of its validity; filled in
/// automatically at edge creation unless the query supplies it.
pub const TEMPORAL_PROP_VALID_FROM: &str = "_valid_from";
/// Name of the edge property paired with `_valid_from`.
/// NOTE(review): presumably the end of the validity interval; nothing in this
/// file writes it, so confirm against the code that closes edges.
pub const TEMPORAL_PROP_VALID_TO: &str = "_valid_to";

/// Returns `true` when `name` is one of the engine-managed timestamp
/// properties (`_created_at`, `_updated_at`).
///
/// The temporal edge properties are deliberately *not* system properties.
pub fn is_system_property(name: &str) -> bool {
    matches!(name, SYSTEM_PROP_CREATED_AT | SYSTEM_PROP_UPDATED_AT)
}

/// Returns `true` when `name` is one of the temporal edge properties
/// (`_valid_from`, `_valid_to`).
pub fn is_temporal_edge_property(name: &str) -> bool {
    matches!(name, TEMPORAL_PROP_VALID_FROM | TEMPORAL_PROP_VALID_TO)
}
28
29fn get_query_timestamp(params: &Params) -> i64 {
31 match params.get("__query_start_ms__") {
32 Some(Value::Int64(ms)) => *ms,
33 _ => std::time::SystemTime::now()
34 .duration_since(std::time::UNIX_EPOCH)
35 .map(|d| d.as_millis() as i64)
36 .unwrap_or(0),
37 }
38}
39
40fn inject_create_timestamps(
42 properties: &mut Vec<(u32, PropertyValue)>,
43 engine: &mut StorageEngine,
44 params: &Params,
45) {
46 let now = get_query_timestamp(params);
47 let created_key = engine.get_or_create_prop_key(SYSTEM_PROP_CREATED_AT);
48 let updated_key = engine.get_or_create_prop_key(SYSTEM_PROP_UPDATED_AT);
49 properties.push((created_key, PropertyValue::DateTime(now)));
50 properties.push((updated_key, PropertyValue::DateTime(now)));
51}
52
53fn inject_edge_valid_from(
55 properties: &mut Vec<(u32, PropertyValue)>,
56 engine: &mut StorageEngine,
57 params: &Params,
58) {
59 let valid_from_key = engine.get_or_create_prop_key(TEMPORAL_PROP_VALID_FROM);
60 let already_set = properties.iter().any(|(k, _)| *k == valid_from_key);
62 if !already_set {
63 let now = get_query_timestamp(params);
64 properties.push((valid_from_key, PropertyValue::DateTime(now)));
65 }
66}
67
68pub fn validate_no_system_properties(props: &Option<MapLiteral>) -> Result<(), ExecutionError> {
70 if let Some(map) = props {
71 for (key, _) in map {
72 if is_system_property(key) {
73 return Err(ExecutionError {
74 message: format!("System property is read-only: {}", key),
75 });
76 }
77 }
78 }
79 Ok(())
80}
81
82pub fn execute_create(
85 source_records: Vec<Record>,
86 pattern: &Pattern,
87 engine: &mut StorageEngine,
88 params: &Params,
89 scalar_fns: &dyn ScalarFnLookup,
90 trigger_fns: &dyn TriggerLookup,
91) -> Result<Vec<Record>, ExecutionError> {
92 let mut results = Vec::new();
93
94 for record in source_records {
95 let mut new_record = record.clone();
96
97 for chain in &pattern.chains {
98 create_chain(
99 chain,
100 &mut new_record,
101 engine,
102 params,
103 scalar_fns,
104 trigger_fns,
105 )?;
106 }
107
108 results.push(new_record);
109 }
110
111 Ok(results)
112}
113
114fn build_node_trigger_context(
116 entity_id: u64,
117 label: Option<&str>,
118 properties: &[(u32, PropertyValue)],
119 engine: &StorageEngine,
120 operation: cypherlite_core::TriggerOperation,
121) -> cypherlite_core::TriggerContext {
122 let props_map = properties
123 .iter()
124 .map(|(k, v)| {
125 let name = engine
126 .catalog()
127 .prop_key_name(*k)
128 .unwrap_or("?")
129 .to_string();
130 (name, v.clone())
131 })
132 .collect();
133 cypherlite_core::TriggerContext {
134 entity_type: cypherlite_core::EntityType::Node,
135 entity_id,
136 label_or_type: label.map(|s| s.to_string()),
137 properties: props_map,
138 operation,
139 }
140}
141
142fn build_edge_trigger_context(
144 entity_id: u64,
145 rel_type: Option<&str>,
146 properties: &[(u32, PropertyValue)],
147 engine: &StorageEngine,
148 operation: cypherlite_core::TriggerOperation,
149) -> cypherlite_core::TriggerContext {
150 let props_map = properties
151 .iter()
152 .map(|(k, v)| {
153 let name = engine
154 .catalog()
155 .prop_key_name(*k)
156 .unwrap_or("?")
157 .to_string();
158 (name, v.clone())
159 })
160 .collect();
161 cypherlite_core::TriggerContext {
162 entity_type: cypherlite_core::EntityType::Edge,
163 entity_id,
164 label_or_type: rel_type.map(|s| s.to_string()),
165 properties: props_map,
166 operation,
167 }
168}
169
170fn create_chain(
172 chain: &PatternChain,
173 record: &mut Record,
174 engine: &mut StorageEngine,
175 params: &Params,
176 scalar_fns: &dyn ScalarFnLookup,
177 trigger_fns: &dyn TriggerLookup,
178) -> Result<(), ExecutionError> {
179 let mut elements = chain.elements.iter();
180 let mut prev_var: Option<String> = None;
181 let temporal_enabled = engine.config().temporal_tracking_enabled;
182
183 while let Some(element) = elements.next() {
184 match element {
185 PatternElement::Node(np) => {
186 let var_name = np.variable.as_deref().unwrap_or("");
187
188 if !var_name.is_empty() && record.contains_key(var_name) {
190 prev_var = Some(var_name.to_string());
191 continue;
192 }
193
194 validate_no_system_properties(&np.properties)?;
196
197 let labels: Vec<u32> = np
199 .labels
200 .iter()
201 .map(|l| engine.get_or_create_label(l))
202 .collect();
203
204 let mut properties =
206 resolve_properties(&np.properties, record, engine, params, scalar_fns)?;
207
208 if temporal_enabled {
210 inject_create_timestamps(&mut properties, engine, params);
211 }
212
213 let first_label = np.labels.first().map(|s| s.as_str());
215 let before_ctx = build_node_trigger_context(
216 0,
217 first_label,
218 &properties,
219 engine,
220 cypherlite_core::TriggerOperation::Create,
221 );
222 trigger_fns.fire_before_create(&before_ctx)?;
223
224 let node_id = engine.create_node(labels, properties.clone());
225
226 let after_ctx = build_node_trigger_context(
228 node_id.0,
229 first_label,
230 &properties,
231 engine,
232 cypherlite_core::TriggerOperation::Create,
233 );
234 trigger_fns.fire_after_create(&after_ctx)?;
235
236 if !var_name.is_empty() {
237 record.insert(var_name.to_string(), Value::Node(node_id));
238 }
239 prev_var = if var_name.is_empty() {
240 None
241 } else {
242 Some(var_name.to_string())
243 };
244 }
245 PatternElement::Relationship(rp) => {
246 let next_node = elements.next().ok_or_else(|| ExecutionError {
248 message: "relationship must be followed by a node in CREATE pattern"
249 .to_string(),
250 })?;
251
252 let target_np = match next_node {
253 PatternElement::Node(np) => np,
254 _ => {
255 return Err(ExecutionError {
256 message: "expected node after relationship in CREATE".to_string(),
257 })
258 }
259 };
260
261 let target_var_name = target_np.variable.as_deref().unwrap_or("");
263 let target_node_id = if !target_var_name.is_empty()
264 && record.contains_key(target_var_name)
265 {
266 match record.get(target_var_name) {
267 Some(Value::Node(nid)) => *nid,
268 _ => {
269 return Err(ExecutionError {
270 message: format!("variable '{}' is not a node", target_var_name),
271 })
272 }
273 }
274 } else {
275 validate_no_system_properties(&target_np.properties)?;
277
278 let labels: Vec<u32> = target_np
279 .labels
280 .iter()
281 .map(|l| engine.get_or_create_label(l))
282 .collect();
283 let mut properties = resolve_properties(
284 &target_np.properties,
285 record,
286 engine,
287 params,
288 scalar_fns,
289 )?;
290
291 if temporal_enabled {
292 inject_create_timestamps(&mut properties, engine, params);
293 }
294
295 let first_label = target_np.labels.first().map(|s| s.as_str());
297 let before_ctx = build_node_trigger_context(
298 0,
299 first_label,
300 &properties,
301 engine,
302 cypherlite_core::TriggerOperation::Create,
303 );
304 trigger_fns.fire_before_create(&before_ctx)?;
305
306 let nid = engine.create_node(labels, properties.clone());
307
308 let after_ctx = build_node_trigger_context(
310 nid.0,
311 first_label,
312 &properties,
313 engine,
314 cypherlite_core::TriggerOperation::Create,
315 );
316 trigger_fns.fire_after_create(&after_ctx)?;
317
318 if !target_var_name.is_empty() {
319 record.insert(target_var_name.to_string(), Value::Node(nid));
320 }
321 nid
322 };
323
324 let src_node_id = match &prev_var {
326 Some(pv) => match record.get(pv) {
327 Some(Value::Node(nid)) => *nid,
328 _ => {
329 return Err(ExecutionError {
330 message: format!("variable '{}' is not a node", pv),
331 })
332 }
333 },
334 None => {
335 return Err(ExecutionError {
336 message: "no source node for relationship in CREATE".to_string(),
337 })
338 }
339 };
340
341 let rel_type_name = rp.rel_types.first().ok_or_else(|| ExecutionError {
343 message: "CREATE relationship requires a type".to_string(),
344 })?;
345 let rel_type_id = engine.get_or_create_rel_type(rel_type_name);
346
347 validate_no_system_properties(&rp.properties)?;
349
350 let mut rel_props =
352 resolve_properties(&rp.properties, record, engine, params, scalar_fns)?;
353
354 if temporal_enabled {
355 inject_create_timestamps(&mut rel_props, engine, params);
356 inject_edge_valid_from(&mut rel_props, engine, params);
358 }
359
360 let (start, end) = match rp.direction {
362 RelDirection::Outgoing | RelDirection::Undirected => {
363 (src_node_id, target_node_id)
364 }
365 RelDirection::Incoming => (target_node_id, src_node_id),
366 };
367
368 let before_edge_ctx = build_edge_trigger_context(
370 0,
371 Some(rel_type_name),
372 &rel_props,
373 engine,
374 cypherlite_core::TriggerOperation::Create,
375 );
376 trigger_fns.fire_before_create(&before_edge_ctx)?;
377
378 let edge_id = engine
379 .create_edge(start, end, rel_type_id, rel_props.clone())
380 .map_err(|e| ExecutionError {
381 message: format!("failed to create edge: {}", e),
382 })?;
383
384 let after_edge_ctx = build_edge_trigger_context(
386 edge_id.0,
387 Some(rel_type_name),
388 &rel_props,
389 engine,
390 cypherlite_core::TriggerOperation::Create,
391 );
392 trigger_fns.fire_after_create(&after_edge_ctx)?;
393
394 if let Some(rv) = &rp.variable {
395 record.insert(rv.clone(), Value::Edge(edge_id));
396 }
397
398 prev_var = if target_var_name.is_empty() {
399 None
400 } else {
401 Some(target_var_name.to_string())
402 };
403 }
404 }
405 }
406
407 Ok(())
408}
409
410fn resolve_properties(
412 props: &Option<MapLiteral>,
413 record: &Record,
414 engine: &StorageEngine,
415 params: &Params,
416 scalar_fns: &dyn ScalarFnLookup,
417) -> Result<Vec<(u32, PropertyValue)>, ExecutionError> {
418 match props {
419 None => Ok(vec![]),
420 Some(map) => {
421 let mut result = Vec::new();
422 for (key, expr) in map {
423 let value = eval(expr, record, engine, params, scalar_fns)?;
424 let pv = PropertyValue::try_from(value).map_err(|e| ExecutionError {
425 message: format!("invalid property value for '{}': {}", key, e),
426 })?;
427 let key_id = engine.catalog().prop_key_id(key).unwrap_or(0);
428 result.push((key_id, pv));
429 }
430 Ok(result)
431 }
432 }
433}
434
435pub fn resolve_properties_mut(
438 props: &Option<MapLiteral>,
439 record: &Record,
440 engine: &mut StorageEngine,
441 params: &Params,
442 scalar_fns: &dyn ScalarFnLookup,
443) -> Result<Vec<(u32, PropertyValue)>, ExecutionError> {
444 match props {
445 None => Ok(vec![]),
446 Some(map) => {
447 let mut result = Vec::new();
448 for (key, expr) in map {
449 let value = eval(expr, record, &*engine, params, scalar_fns)?;
450 let pv = PropertyValue::try_from(value).map_err(|e| ExecutionError {
451 message: format!("invalid property value for '{}': {}", key, e),
452 })?;
453 let key_id = engine.get_or_create_prop_key(key);
454 result.push((key_id, pv));
455 }
456 Ok(result)
457 }
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use cypherlite_core::{DatabaseConfig, SyncMode};
    use tempfile::tempdir;

    /// Opens a storage engine backed by a database file inside `dir`.
    fn test_engine(dir: &std::path::Path) -> StorageEngine {
        let config = DatabaseConfig {
            path: dir.join("test.cyl"),
            wal_sync_mode: SyncMode::Normal,
            ..Default::default()
        };
        StorageEngine::open(config).expect("open")
    }

    /// Node pattern element with a variable, the given labels and no
    /// properties.
    fn node(variable: &str, labels: &[&str]) -> PatternElement {
        PatternElement::Node(NodePattern {
            variable: Some(variable.to_string()),
            labels: labels.iter().map(|label| (*label).to_string()).collect(),
            properties: None,
        })
    }

    /// Outgoing `KNOWS` relationship pattern element without properties.
    fn knows(variable: Option<&str>) -> PatternElement {
        PatternElement::Relationship(RelationshipPattern {
            variable: variable.map(|v| v.to_string()),
            rel_types: vec!["KNOWS".to_string()],
            direction: RelDirection::Outgoing,
            properties: None,
            min_hops: None,
            max_hops: None,
        })
    }

    /// Pattern consisting of a single chain with the given elements.
    fn single_chain(elements: Vec<PatternElement>) -> Pattern {
        Pattern {
            chains: vec![PatternChain { elements }],
        }
    }

    /// Runs `execute_create` without scalar functions or triggers and
    /// unwraps the resulting records.
    fn run_create(
        engine: &mut StorageEngine,
        records: Vec<Record>,
        pattern: &Pattern,
        params: &Params,
    ) -> Vec<Record> {
        execute_create(records, pattern, engine, params, &(), &()).expect("should succeed")
    }

    #[test]
    fn test_create_single_node() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());

        let pattern = single_chain(vec![node("n", &["Person"])]);
        let params = Params::new();

        let records = run_create(&mut engine, vec![Record::new()], &pattern, &params);

        assert_eq!(records.len(), 1);
        assert!(records[0].contains_key("n"));
        assert!(matches!(records[0].get("n"), Some(Value::Node(_))));
        assert_eq!(engine.node_count(), 1);
    }

    #[test]
    fn test_create_node_with_properties() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());

        // Registered up front so the key id is known for the assertion.
        let name_key = engine.get_or_create_prop_key("name");

        let pattern = single_chain(vec![PatternElement::Node(NodePattern {
            variable: Some("n".to_string()),
            labels: vec!["Person".to_string()],
            properties: Some(vec![(
                "name".to_string(),
                Expression::Literal(Literal::String("Alice".into())),
            )]),
        })]);
        let params = Params::new();

        let records = run_create(&mut engine, vec![Record::new()], &pattern, &params);
        assert_eq!(records.len(), 1);

        if let Some(Value::Node(nid)) = records[0].get("n") {
            let node = engine.get_node(*nid).expect("node exists");
            // Checking for the key itself: a mere non-empty check would
            // already be satisfied by automatically injected timestamps.
            assert!(
                node.properties.iter().any(|(k, _)| *k == name_key),
                "node should carry the 'name' property"
            );
        } else {
            panic!("expected node value");
        }
    }

    #[test]
    fn test_create_node_and_relationship() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());

        let pattern = single_chain(vec![
            node("a", &["Person"]),
            knows(Some("r")),
            node("b", &["Person"]),
        ]);
        let params = Params::new();

        let records = run_create(&mut engine, vec![Record::new()], &pattern, &params);

        assert_eq!(records.len(), 1);
        assert!(records[0].contains_key("a"));
        assert!(records[0].contains_key("r"));
        assert!(records[0].contains_key("b"));
        assert_eq!(engine.node_count(), 2);
        assert_eq!(engine.edge_count(), 1);
    }

    #[test]
    fn test_valid_from_is_not_system_property() {
        // Temporal edge properties may be set by queries.
        assert!(!is_system_property("_valid_from"));
        assert!(!is_system_property("_valid_to"));
    }

    #[test]
    fn test_temporal_edge_property_detection() {
        assert!(is_temporal_edge_property("_valid_from"));
        assert!(is_temporal_edge_property("_valid_to"));
        assert!(!is_temporal_edge_property("_created_at"));
        assert!(!is_temporal_edge_property("name"));
    }

    // Assumes the default `DatabaseConfig` has temporal tracking enabled.
    #[test]
    fn test_create_edge_injects_valid_from() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());

        let pattern = single_chain(vec![
            node("a", &["Person"]),
            knows(Some("r")),
            node("b", &["Person"]),
        ]);

        // Pin the query timestamp so the run does not depend on the clock.
        let mut params = Params::new();
        params.insert(
            "__query_start_ms__".to_string(),
            Value::Int64(1_700_000_000_000),
        );

        let records = run_create(&mut engine, vec![Record::new()], &pattern, &params);

        if let Some(Value::Edge(eid)) = records[0].get("r") {
            let edge = engine.get_edge(*eid).expect("edge exists");

            let valid_from_key = engine
                .catalog()
                .prop_key_id("_valid_from")
                .expect("_valid_from key");
            let has_valid_from = edge.properties.iter().any(|(k, _)| *k == valid_from_key);
            assert!(has_valid_from, "edge should have _valid_from property");

            // Edges also receive the creation/update timestamps.
            let created_key = engine
                .catalog()
                .prop_key_id("_created_at")
                .expect("_created_at key");
            let updated_key = engine
                .catalog()
                .prop_key_id("_updated_at")
                .expect("_updated_at key");
            assert!(edge.properties.iter().any(|(k, _)| *k == created_key));
            assert!(edge.properties.iter().any(|(k, _)| *k == updated_key));
        } else {
            panic!("expected edge value for 'r'");
        }
    }

    #[test]
    fn test_create_reuses_existing_variable() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());

        // `a` is already bound before CREATE runs.
        let existing_id = engine.create_node(vec![], vec![]);
        let mut initial_record = Record::new();
        initial_record.insert("a".to_string(), Value::Node(existing_id));

        let pattern = single_chain(vec![node("a", &[]), knows(None), node("b", &[])]);
        let params = Params::new();

        let records = run_create(&mut engine, vec![initial_record], &pattern, &params);

        // Only `b` is new: the pre-existing node plus one created node.
        assert_eq!(engine.node_count(), 2);
        // `a` still refers to the pre-existing node.
        assert_eq!(records[0].get("a"), Some(&Value::Node(existing_id)));
    }
}