1use crate::executor::operators::temporal_filter::{is_edge_temporally_valid, TemporalFilter};
4use crate::executor::{Record, Value};
5use crate::parser::ast::RelDirection;
6#[cfg(feature = "subgraph")]
7use cypherlite_core::LabelRegistry;
8use cypherlite_core::NodeId;
9use cypherlite_storage::StorageEngine;
10
11#[allow(clippy::too_many_arguments)]
18pub fn execute_expand(
19 source_records: Vec<Record>,
20 src_var: &str,
21 rel_var: Option<&str>,
22 target_var: &str,
23 rel_type_id: Option<u32>,
24 direction: &RelDirection,
25 engine: &StorageEngine,
26 temporal_filter: Option<&TemporalFilter>,
27) -> Vec<Record> {
28 let mut results = Vec::new();
29
30 for record in source_records {
31 #[cfg(feature = "hypergraph")]
33 {
34 if let Some(Value::Hyperedge(he_id)) = record.get(src_var) {
35 if let Some(he_rec) = engine.get_hyperedge(*he_id) {
36 let type_matches = rel_type_id.is_none_or(|tid| he_rec.rel_type_id == tid);
38 if type_matches {
39 let participants: Vec<Value> = he_rec
41 .sources
42 .iter()
43 .chain(he_rec.targets.iter())
44 .map(|entity| match entity {
45 cypherlite_core::GraphEntity::Node(nid) => Value::Node(*nid),
46 #[cfg(feature = "subgraph")]
47 cypherlite_core::GraphEntity::Subgraph(sid) => {
48 Value::Subgraph(*sid)
49 }
50 cypherlite_core::GraphEntity::HyperEdge(hid) => {
51 Value::Hyperedge(*hid)
52 }
53 #[cfg(feature = "hypergraph")]
54 cypherlite_core::GraphEntity::TemporalRef(nid, ts) => {
55 Value::TemporalNode(*nid, *ts)
58 }
59 })
60 .collect();
61
62 if !participants.is_empty() {
63 for target_value in &participants[..participants.len() - 1] {
65 let mut new_record = record.clone();
66 if let Some(rv) = rel_var {
67 new_record.insert(rv.to_string(), Value::Null);
68 }
69 new_record.insert(target_var.to_string(), target_value.clone());
70 results.push(new_record);
71 }
72 let last_value = participants.into_iter().last().unwrap();
74 let mut last_record = record;
75 if let Some(rv) = rel_var {
76 last_record.insert(rv.to_string(), Value::Null);
77 }
78 last_record.insert(target_var.to_string(), last_value);
79 results.push(last_record);
80 }
81 }
82 }
83 continue;
84 }
85 }
86
87 #[cfg(feature = "subgraph")]
89 {
90 if let Some(Value::Subgraph(sg_id)) = record.get(src_var) {
91 let is_contains = rel_type_id
93 .is_some_and(|tid| engine.catalog().rel_type_name(tid) == Some("CONTAINS"));
94 if is_contains {
95 let members: Vec<NodeId> = engine.list_members(*sg_id);
98 if !members.is_empty() {
99 for &node_id in &members[..members.len() - 1] {
101 let mut new_record = record.clone();
102 if let Some(rv) = rel_var {
103 new_record.insert(rv.to_string(), Value::Null);
104 }
105 new_record.insert(target_var.to_string(), Value::Node(node_id));
106 results.push(new_record);
107 }
108 let last_node = *members.last().unwrap();
109 let mut last_record = record;
110 if let Some(rv) = rel_var {
111 last_record.insert(rv.to_string(), Value::Null);
112 }
113 last_record.insert(target_var.to_string(), Value::Node(last_node));
114 results.push(last_record);
115 }
116 continue;
117 }
118 }
119 }
120
121 let src_node_id = match record.get(src_var) {
122 Some(Value::Node(nid)) => *nid,
123 _ => continue,
124 };
125
126 let edges = engine.get_edges_for_node(src_node_id);
127
128 let mut matched: Vec<(cypherlite_core::EdgeId, NodeId)> = Vec::new();
131 for edge in edges {
132 if let Some(tid) = rel_type_id {
134 if edge.rel_type_id != tid {
135 continue;
136 }
137 }
138
139 if let Some(tf) = temporal_filter {
141 if !is_edge_temporally_valid(edge.edge_id, tf, engine) {
142 continue;
143 }
144 }
145
146 let target_node_id: Option<NodeId> = match direction {
148 RelDirection::Outgoing => {
149 if edge.start_node == src_node_id {
150 Some(edge.end_node)
151 } else {
152 None
153 }
154 }
155 RelDirection::Incoming => {
156 if edge.end_node == src_node_id {
157 Some(edge.start_node)
158 } else {
159 None
160 }
161 }
162 RelDirection::Undirected => {
163 if edge.start_node == src_node_id {
164 Some(edge.end_node)
165 } else if edge.end_node == src_node_id {
166 Some(edge.start_node)
167 } else {
168 None
169 }
170 }
171 };
172
173 if let Some(target_id) = target_node_id {
174 matched.push((edge.edge_id, target_id));
175 }
176 }
177
178 if matched.is_empty() {
179 continue;
180 }
181
182 for &(edge_id, target_id) in &matched[..matched.len() - 1] {
184 let mut new_record = record.clone();
185 if let Some(rv) = rel_var {
186 new_record.insert(rv.to_string(), Value::Edge(edge_id));
187 }
188 new_record.insert(target_var.to_string(), Value::Node(target_id));
189 results.push(new_record);
190 }
191 let (last_edge_id, last_target_id) = *matched.last().unwrap();
192 let mut last_record = record; if let Some(rv) = rel_var {
194 last_record.insert(rv.to_string(), Value::Edge(last_edge_id));
195 }
196 last_record.insert(target_var.to_string(), Value::Node(last_target_id));
197 results.push(last_record);
198 }
199
200 results
201}
202
203#[cfg(test)]
204mod tests {
205 use super::*;
206 use crate::executor::Record;
207 use cypherlite_core::{DatabaseConfig, LabelRegistry, SyncMode};
208 use cypherlite_storage::StorageEngine;
209 use tempfile::tempdir;
210
211 fn test_engine(dir: &std::path::Path) -> StorageEngine {
212 let config = DatabaseConfig {
213 path: dir.join("test.cyl"),
214 wal_sync_mode: SyncMode::Normal,
215 ..Default::default()
216 };
217 StorageEngine::open(config).expect("open")
218 }
219
220 #[test]
222 fn test_expand_outgoing() {
223 let dir = tempdir().expect("tempdir");
224 let mut engine = test_engine(dir.path());
225
226 let knows_type = engine.get_or_create_rel_type("KNOWS");
227 let n1 = engine.create_node(vec![], vec![]);
228 let n2 = engine.create_node(vec![], vec![]);
229 let n3 = engine.create_node(vec![], vec![]);
230
231 engine
232 .create_edge(n1, n2, knows_type, vec![])
233 .expect("edge");
234 engine
235 .create_edge(n1, n3, knows_type, vec![])
236 .expect("edge");
237
238 let mut source = Record::new();
239 source.insert("a".to_string(), Value::Node(n1));
240
241 let results = execute_expand(
242 vec![source],
243 "a",
244 Some("r"),
245 "b",
246 Some(knows_type),
247 &RelDirection::Outgoing,
248 &engine,
249 None,
250 );
251
252 assert_eq!(results.len(), 2);
253 for r in &results {
254 assert!(r.contains_key("a"));
255 assert!(r.contains_key("r"));
256 assert!(r.contains_key("b"));
257 }
258 }
259
260 #[test]
261 fn test_expand_incoming() {
262 let dir = tempdir().expect("tempdir");
263 let mut engine = test_engine(dir.path());
264
265 let knows_type = engine.get_or_create_rel_type("KNOWS");
266 let n1 = engine.create_node(vec![], vec![]);
267 let n2 = engine.create_node(vec![], vec![]);
268
269 engine
270 .create_edge(n1, n2, knows_type, vec![])
271 .expect("edge");
272
273 let mut source = Record::new();
274 source.insert("b".to_string(), Value::Node(n2));
275
276 let results = execute_expand(
277 vec![source],
278 "b",
279 None,
280 "a",
281 Some(knows_type),
282 &RelDirection::Incoming,
283 &engine,
284 None,
285 );
286
287 assert_eq!(results.len(), 1);
288 assert_eq!(results[0].get("a"), Some(&Value::Node(n1)));
289 }
290
291 #[test]
292 fn test_expand_no_matching_type() {
293 let dir = tempdir().expect("tempdir");
294 let mut engine = test_engine(dir.path());
295
296 let knows_type = engine.get_or_create_rel_type("KNOWS");
297 let likes_type = engine.get_or_create_rel_type("LIKES");
298 let n1 = engine.create_node(vec![], vec![]);
299 let n2 = engine.create_node(vec![], vec![]);
300
301 engine
302 .create_edge(n1, n2, knows_type, vec![])
303 .expect("edge");
304
305 let mut source = Record::new();
306 source.insert("a".to_string(), Value::Node(n1));
307
308 let results = execute_expand(
309 vec![source],
310 "a",
311 None,
312 "b",
313 Some(likes_type),
314 &RelDirection::Outgoing,
315 &engine,
316 None,
317 );
318
319 assert!(results.is_empty());
320 }
321
322 #[test]
323 fn test_expand_undirected() {
324 let dir = tempdir().expect("tempdir");
325 let mut engine = test_engine(dir.path());
326
327 let knows_type = engine.get_or_create_rel_type("KNOWS");
328 let n1 = engine.create_node(vec![], vec![]);
329 let n2 = engine.create_node(vec![], vec![]);
330
331 engine
332 .create_edge(n1, n2, knows_type, vec![])
333 .expect("edge");
334
335 let mut source = Record::new();
337 source.insert("a".to_string(), Value::Node(n2));
338
339 let results = execute_expand(
340 vec![source],
341 "a",
342 None,
343 "b",
344 None,
345 &RelDirection::Undirected,
346 &engine,
347 None,
348 );
349
350 assert_eq!(results.len(), 1);
351 assert_eq!(results[0].get("b"), Some(&Value::Node(n1)));
352 }
353
354 #[test]
359 fn test_expand_large_fanout() {
360 let dir = tempdir().expect("tempdir");
361 let mut engine = test_engine(dir.path());
362
363 let knows_type = engine.get_or_create_rel_type("KNOWS");
364 let center = engine.create_node(vec![], vec![]);
365 let mut targets = Vec::new();
366 for _ in 0..20 {
367 let t = engine.create_node(vec![], vec![]);
368 engine
369 .create_edge(center, t, knows_type, vec![])
370 .expect("edge");
371 targets.push(t);
372 }
373
374 let mut source = Record::new();
375 source.insert("a".to_string(), Value::Node(center));
376 source.insert("extra".to_string(), Value::Int64(42));
378
379 let results = execute_expand(
380 vec![source],
381 "a",
382 Some("r"),
383 "b",
384 Some(knows_type),
385 &RelDirection::Outgoing,
386 &engine,
387 None,
388 );
389
390 assert_eq!(results.len(), 20);
391 let mut found_targets: Vec<NodeId> = results
392 .iter()
393 .filter_map(|r| match r.get("b") {
394 Some(Value::Node(nid)) => Some(*nid),
395 _ => None,
396 })
397 .collect();
398 found_targets.sort();
399 targets.sort();
400 assert_eq!(found_targets, targets);
401
402 for r in &results {
404 assert_eq!(r.get("extra"), Some(&Value::Int64(42)));
405 assert!(r.contains_key("r"));
406 assert!(r.contains_key("a"));
407 }
408 }
409
410 #[test]
413 fn test_expand_single_edge_move() {
414 let dir = tempdir().expect("tempdir");
415 let mut engine = test_engine(dir.path());
416
417 let follows_type = engine.get_or_create_rel_type("FOLLOWS");
418 let n1 = engine.create_node(vec![], vec![]);
419 let n2 = engine.create_node(vec![], vec![]);
420 engine
421 .create_edge(n1, n2, follows_type, vec![])
422 .expect("edge");
423
424 let mut source = Record::new();
425 source.insert("x".to_string(), Value::Node(n1));
426 source.insert("ctx".to_string(), Value::Int64(99));
427
428 let results = execute_expand(
429 vec![source],
430 "x",
431 Some("rel"),
432 "y",
433 Some(follows_type),
434 &RelDirection::Outgoing,
435 &engine,
436 None,
437 );
438
439 assert_eq!(results.len(), 1);
440 assert_eq!(results[0].get("y"), Some(&Value::Node(n2)));
441 assert_eq!(results[0].get("x"), Some(&Value::Node(n1)));
442 assert_eq!(results[0].get("ctx"), Some(&Value::Int64(99)));
443 assert!(results[0].contains_key("rel"));
444 }
445
446 #[test]
449 fn test_expand_multiple_sources_varied_fanout() {
450 let dir = tempdir().expect("tempdir");
451 let mut engine = test_engine(dir.path());
452
453 let knows_type = engine.get_or_create_rel_type("KNOWS");
454
455 let s1 = engine.create_node(vec![], vec![]);
457 let t1a = engine.create_node(vec![], vec![]);
458 let t1b = engine.create_node(vec![], vec![]);
459 let t1c = engine.create_node(vec![], vec![]);
460 engine.create_edge(s1, t1a, knows_type, vec![]).unwrap();
461 engine.create_edge(s1, t1b, knows_type, vec![]).unwrap();
462 engine.create_edge(s1, t1c, knows_type, vec![]).unwrap();
463
464 let s2 = engine.create_node(vec![], vec![]);
466
467 let s3 = engine.create_node(vec![], vec![]);
469 let t3a = engine.create_node(vec![], vec![]);
470 engine.create_edge(s3, t3a, knows_type, vec![]).unwrap();
471
472 let mut rec1 = Record::new();
473 rec1.insert("n".to_string(), Value::Node(s1));
474 rec1.insert("tag".to_string(), Value::Int64(1));
475
476 let mut rec2 = Record::new();
477 rec2.insert("n".to_string(), Value::Node(s2));
478 rec2.insert("tag".to_string(), Value::Int64(2));
479
480 let mut rec3 = Record::new();
481 rec3.insert("n".to_string(), Value::Node(s3));
482 rec3.insert("tag".to_string(), Value::Int64(3));
483
484 let results = execute_expand(
485 vec![rec1, rec2, rec3],
486 "n",
487 Some("r"),
488 "m",
489 Some(knows_type),
490 &RelDirection::Outgoing,
491 &engine,
492 None,
493 );
494
495 assert_eq!(results.len(), 4);
497
498 let s1_results: Vec<_> = results
500 .iter()
501 .filter(|r| r.get("tag") == Some(&Value::Int64(1)))
502 .collect();
503 assert_eq!(s1_results.len(), 3);
504
505 let s2_results: Vec<_> = results
507 .iter()
508 .filter(|r| r.get("tag") == Some(&Value::Int64(2)))
509 .collect();
510 assert_eq!(s2_results.len(), 0);
511
512 let s3_results: Vec<_> = results
514 .iter()
515 .filter(|r| r.get("tag") == Some(&Value::Int64(3)))
516 .collect();
517 assert_eq!(s3_results.len(), 1);
518 assert_eq!(s3_results[0].get("m"), Some(&Value::Node(t3a)));
519 }
520
521 #[test]
523 fn test_expand_zero_matching_edges_no_results() {
524 let dir = tempdir().expect("tempdir");
525 let mut engine = test_engine(dir.path());
526
527 let knows_type = engine.get_or_create_rel_type("KNOWS");
528 let likes_type = engine.get_or_create_rel_type("LIKES");
529
530 let n1 = engine.create_node(vec![], vec![]);
531 let n2 = engine.create_node(vec![], vec![]);
532 engine.create_edge(n1, n2, likes_type, vec![]).unwrap();
534
535 let mut source = Record::new();
536 source.insert("a".to_string(), Value::Node(n1));
537 source.insert("data".to_string(), Value::Int64(7));
538
539 let results = execute_expand(
540 vec![source],
541 "a",
542 Some("r"),
543 "b",
544 Some(knows_type),
545 &RelDirection::Outgoing,
546 &engine,
547 None,
548 );
549
550 assert!(results.is_empty());
551 }
552
553 #[cfg(feature = "hypergraph")]
555 mod involves_tests {
556 use super::*;
557
558 #[test]
559 fn test_involves_expands_to_source_and_target_nodes() {
560 let dir = tempdir().expect("tempdir");
561 let mut engine = test_engine(dir.path());
562
563 let involves_type = engine.get_or_create_rel_type("INVOLVES");
564 let n1 = engine.create_node(vec![], vec![]);
565 let n2 = engine.create_node(vec![], vec![]);
566 let n3 = engine.create_node(vec![], vec![]);
567
568 use cypherlite_core::GraphEntity;
569 engine.create_hyperedge(
570 involves_type,
571 vec![GraphEntity::Node(n1)],
572 vec![GraphEntity::Node(n2), GraphEntity::Node(n3)],
573 vec![],
574 );
575
576 let he_id = cypherlite_core::HyperEdgeId(1);
578 let mut source = Record::new();
579 source.insert("he".to_string(), Value::Hyperedge(he_id));
580
581 let results = execute_expand(
582 vec![source],
583 "he",
584 Some("r"),
585 "n",
586 Some(involves_type),
587 &RelDirection::Outgoing,
588 &engine,
589 None,
590 );
591
592 assert_eq!(results.len(), 3);
594 for r in &results {
595 assert!(r.contains_key("he"));
596 assert!(r.contains_key("n"));
597 assert_eq!(r.get("r"), Some(&Value::Null));
599 assert!(matches!(r.get("n"), Some(Value::Node(_))));
600 }
601 }
602
603 #[test]
604 fn test_involves_no_matching_type() {
605 let dir = tempdir().expect("tempdir");
606 let mut engine = test_engine(dir.path());
607
608 let involves_type = engine.get_or_create_rel_type("INVOLVES");
609 let other_type = engine.get_or_create_rel_type("OTHER");
610 let n1 = engine.create_node(vec![], vec![]);
611
612 use cypherlite_core::GraphEntity;
613 engine.create_hyperedge(involves_type, vec![GraphEntity::Node(n1)], vec![], vec![]);
614
615 let he_id = cypherlite_core::HyperEdgeId(1);
616 let mut source = Record::new();
617 source.insert("he".to_string(), Value::Hyperedge(he_id));
618
619 let results = execute_expand(
621 vec![source],
622 "he",
623 None,
624 "n",
625 Some(other_type),
626 &RelDirection::Outgoing,
627 &engine,
628 None,
629 );
630
631 assert!(results.is_empty());
632 }
633
634 #[test]
635 fn test_involves_empty_hyperedge() {
636 let dir = tempdir().expect("tempdir");
637 let mut engine = test_engine(dir.path());
638
639 let involves_type = engine.get_or_create_rel_type("INVOLVES");
640
641 engine.create_hyperedge(involves_type, vec![], vec![], vec![]);
642
643 let he_id = cypherlite_core::HyperEdgeId(1);
644 let mut source = Record::new();
645 source.insert("he".to_string(), Value::Hyperedge(he_id));
646
647 let results = execute_expand(
648 vec![source],
649 "he",
650 None,
651 "n",
652 Some(involves_type),
653 &RelDirection::Outgoing,
654 &engine,
655 None,
656 );
657
658 assert!(results.is_empty());
659 }
660
661 #[test]
662 fn test_involves_no_rel_type_filter_matches_all() {
663 let dir = tempdir().expect("tempdir");
664 let mut engine = test_engine(dir.path());
665
666 let involves_type = engine.get_or_create_rel_type("INVOLVES");
667 let n1 = engine.create_node(vec![], vec![]);
668
669 use cypherlite_core::GraphEntity;
670 engine.create_hyperedge(involves_type, vec![GraphEntity::Node(n1)], vec![], vec![]);
671
672 let he_id = cypherlite_core::HyperEdgeId(1);
673 let mut source = Record::new();
674 source.insert("he".to_string(), Value::Hyperedge(he_id));
675
676 let results = execute_expand(
678 vec![source],
679 "he",
680 None,
681 "n",
682 None,
683 &RelDirection::Outgoing,
684 &engine,
685 None,
686 );
687
688 assert_eq!(results.len(), 1);
689 assert_eq!(results[0].get("n"), Some(&Value::Node(n1)));
690 }
691 }
692}