Skip to main content

cypherlite_query/executor/operators/
expand.rs

1// ExpandOp: edge traversal (linked-list walk, O(degree))
2
3use crate::executor::operators::temporal_filter::{is_edge_temporally_valid, TemporalFilter};
4use crate::executor::{Record, Value};
5use crate::parser::ast::RelDirection;
6#[cfg(feature = "subgraph")]
7use cypherlite_core::LabelRegistry;
8use cypherlite_core::NodeId;
9use cypherlite_storage::StorageEngine;
10
11/// Expand from source records along edges.
12/// For each source record, find edges matching direction and type,
13/// and produce new records with rel_var and target_var bindings.
14///
15/// When `temporal_filter` is Some, edges that fail temporal validity
16/// are skipped (DD-T2).
17#[allow(clippy::too_many_arguments)]
18pub fn execute_expand(
19    source_records: Vec<Record>,
20    src_var: &str,
21    rel_var: Option<&str>,
22    target_var: &str,
23    rel_type_id: Option<u32>,
24    direction: &RelDirection,
25    engine: &StorageEngine,
26    temporal_filter: Option<&TemporalFilter>,
27) -> Vec<Record> {
28    let mut results = Vec::new();
29
30    for record in source_records {
31        // Check if source is a Hyperedge -- virtual :INVOLVES expansion.
32        #[cfg(feature = "hypergraph")]
33        {
34            if let Some(Value::Hyperedge(he_id)) = record.get(src_var) {
35                if let Some(he_rec) = engine.get_hyperedge(*he_id) {
36                    // Filter by rel_type if specified
37                    let type_matches = rel_type_id.is_none_or(|tid| he_rec.rel_type_id == tid);
38                    if type_matches {
39                        // Collect participant target values first for move-last optimisation.
40                        let participants: Vec<Value> = he_rec
41                            .sources
42                            .iter()
43                            .chain(he_rec.targets.iter())
44                            .map(|entity| match entity {
45                                cypherlite_core::GraphEntity::Node(nid) => Value::Node(*nid),
46                                #[cfg(feature = "subgraph")]
47                                cypherlite_core::GraphEntity::Subgraph(sid) => {
48                                    Value::Subgraph(*sid)
49                                }
50                                cypherlite_core::GraphEntity::HyperEdge(hid) => {
51                                    Value::Hyperedge(*hid)
52                                }
53                                #[cfg(feature = "hypergraph")]
54                                cypherlite_core::GraphEntity::TemporalRef(nid, ts) => {
55                                    // NN-003: Lazy resolution -- store as TemporalNode,
56                                    // resolved when properties are accessed.
57                                    Value::TemporalNode(*nid, *ts)
58                                }
59                            })
60                            .collect();
61
62                        if !participants.is_empty() {
63                            // Clone for all-but-last, move for last (REQ-Q-003).
64                            for target_value in &participants[..participants.len() - 1] {
65                                let mut new_record = record.clone();
66                                if let Some(rv) = rel_var {
67                                    new_record.insert(rv.to_string(), Value::Null);
68                                }
69                                new_record.insert(target_var.to_string(), target_value.clone());
70                                results.push(new_record);
71                            }
72                            // Last participant: move the record (no clone).
73                            let last_value = participants.into_iter().last().unwrap();
74                            let mut last_record = record;
75                            if let Some(rv) = rel_var {
76                                last_record.insert(rv.to_string(), Value::Null);
77                            }
78                            last_record.insert(target_var.to_string(), last_value);
79                            results.push(last_record);
80                        }
81                    }
82                }
83                continue;
84            }
85        }
86
87        // Check if source is a Subgraph and rel_type is "CONTAINS" -- virtual edge expansion.
88        #[cfg(feature = "subgraph")]
89        {
90            if let Some(Value::Subgraph(sg_id)) = record.get(src_var) {
91                // Check if the relationship type is "CONTAINS"
92                let is_contains = rel_type_id
93                    .is_some_and(|tid| engine.catalog().rel_type_name(tid) == Some("CONTAINS"));
94                if is_contains {
95                    // Virtual :CONTAINS expansion via MembershipIndex.
96                    // Collect members first for move-last optimisation (REQ-Q-003).
97                    let members: Vec<NodeId> = engine.list_members(*sg_id);
98                    if !members.is_empty() {
99                        // Clone for all-but-last, move for last.
100                        for &node_id in &members[..members.len() - 1] {
101                            let mut new_record = record.clone();
102                            if let Some(rv) = rel_var {
103                                new_record.insert(rv.to_string(), Value::Null);
104                            }
105                            new_record.insert(target_var.to_string(), Value::Node(node_id));
106                            results.push(new_record);
107                        }
108                        let last_node = *members.last().unwrap();
109                        let mut last_record = record;
110                        if let Some(rv) = rel_var {
111                            last_record.insert(rv.to_string(), Value::Null);
112                        }
113                        last_record.insert(target_var.to_string(), Value::Node(last_node));
114                        results.push(last_record);
115                    }
116                    continue;
117                }
118            }
119        }
120
121        let src_node_id = match record.get(src_var) {
122            Some(Value::Node(nid)) => *nid,
123            _ => continue,
124        };
125
126        let edges = engine.get_edges_for_node(src_node_id);
127
128        // Collect matching (edge_id, target_node_id) pairs for move-last optimisation
129        // (REQ-Q-003). This avoids cloning the source record for the last match.
130        let mut matched: Vec<(cypherlite_core::EdgeId, NodeId)> = Vec::new();
131        for edge in edges {
132            // Filter by relationship type if specified
133            if let Some(tid) = rel_type_id {
134                if edge.rel_type_id != tid {
135                    continue;
136                }
137            }
138
139            // Temporal filter: skip edges that are not temporally valid
140            if let Some(tf) = temporal_filter {
141                if !is_edge_temporally_valid(edge.edge_id, tf, engine) {
142                    continue;
143                }
144            }
145
146            // Direction filtering and determine target node
147            let target_node_id: Option<NodeId> = match direction {
148                RelDirection::Outgoing => {
149                    if edge.start_node == src_node_id {
150                        Some(edge.end_node)
151                    } else {
152                        None
153                    }
154                }
155                RelDirection::Incoming => {
156                    if edge.end_node == src_node_id {
157                        Some(edge.start_node)
158                    } else {
159                        None
160                    }
161                }
162                RelDirection::Undirected => {
163                    if edge.start_node == src_node_id {
164                        Some(edge.end_node)
165                    } else if edge.end_node == src_node_id {
166                        Some(edge.start_node)
167                    } else {
168                        None
169                    }
170                }
171            };
172
173            if let Some(target_id) = target_node_id {
174                matched.push((edge.edge_id, target_id));
175            }
176        }
177
178        if matched.is_empty() {
179            continue;
180        }
181
182        // Emit: clone for all-but-last, move for last.
183        for &(edge_id, target_id) in &matched[..matched.len() - 1] {
184            let mut new_record = record.clone();
185            if let Some(rv) = rel_var {
186                new_record.insert(rv.to_string(), Value::Edge(edge_id));
187            }
188            new_record.insert(target_var.to_string(), Value::Node(target_id));
189            results.push(new_record);
190        }
191        let (last_edge_id, last_target_id) = *matched.last().unwrap();
192        let mut last_record = record; // move ownership, no clone
193        if let Some(rv) = rel_var {
194            last_record.insert(rv.to_string(), Value::Edge(last_edge_id));
195        }
196        last_record.insert(target_var.to_string(), Value::Node(last_target_id));
197        results.push(last_record);
198    }
199
200    results
201}
202
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::Record;
    use cypherlite_core::{DatabaseConfig, LabelRegistry, SyncMode};
    use cypherlite_storage::StorageEngine;
    use tempfile::tempdir;

    /// Open a storage engine whose database file is `test.cyl` inside `dir`.
    fn test_engine(dir: &std::path::Path) -> StorageEngine {
        let path = dir.join("test.cyl");
        StorageEngine::open(DatabaseConfig {
            path,
            wal_sync_mode: SyncMode::Normal,
            ..Default::default()
        })
        .expect("open")
    }

    /// Build a record from `(variable, value)` pairs, inserted in order.
    fn record_with(bindings: Vec<(&str, Value)>) -> Record {
        let mut record = Record::new();
        for (name, value) in bindings {
            record.insert(name.to_string(), value);
        }
        record
    }

    // EXEC-T002: ExpandOp directed traversal
    #[test]
    fn test_expand_outgoing() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let origin = engine.create_node(vec![], vec![]);
        let first = engine.create_node(vec![], vec![]);
        let second = engine.create_node(vec![], vec![]);
        for target in [first, second] {
            engine
                .create_edge(origin, target, knows, vec![])
                .expect("edge");
        }

        let rows = execute_expand(
            vec![record_with(vec![("a", Value::Node(origin))])],
            "a",
            Some("r"),
            "b",
            Some(knows),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        assert_eq!(rows.len(), 2);
        for row in &rows {
            for key in ["a", "r", "b"] {
                assert!(row.contains_key(key));
            }
        }
    }

    /// Incoming expansion from the end node of an edge finds its start node.
    #[test]
    fn test_expand_incoming() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let tail = engine.create_node(vec![], vec![]);
        let head = engine.create_node(vec![], vec![]);
        engine.create_edge(tail, head, knows, vec![]).expect("edge");

        let rows = execute_expand(
            vec![record_with(vec![("b", Value::Node(head))])],
            "b",
            None,
            "a",
            Some(knows),
            &RelDirection::Incoming,
            &engine,
            None,
        );

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("a"), Some(&Value::Node(tail)));
    }

    /// Asking for LIKES when only a KNOWS edge exists yields nothing.
    #[test]
    fn test_expand_no_matching_type() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let likes = engine.get_or_create_rel_type("LIKES");
        let tail = engine.create_node(vec![], vec![]);
        let head = engine.create_node(vec![], vec![]);
        engine.create_edge(tail, head, knows, vec![]).expect("edge");

        let rows = execute_expand(
            vec![record_with(vec![("a", Value::Node(tail))])],
            "a",
            None,
            "b",
            Some(likes),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        assert!(rows.is_empty());
    }

    /// Undirected expansion follows an edge from its end node back to its start.
    #[test]
    fn test_expand_undirected() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let tail = engine.create_node(vec![], vec![]);
        let head = engine.create_node(vec![], vec![]);
        engine.create_edge(tail, head, knows, vec![]).expect("edge");

        // Start at the edge's end node; no type filter.
        let rows = execute_expand(
            vec![record_with(vec![("a", Value::Node(head))])],
            "a",
            None,
            "b",
            None,
            &RelDirection::Undirected,
            &engine,
            None,
        );

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].get("b"), Some(&Value::Node(tail)));
    }

    // ── M3-1: Record sharing / clone-last optimization tests ───────────

    /// Star graph: a hub with 20 outgoing edges must yield 20 rows, one per
    /// spoke, each still carrying the hub's extra binding.
    #[test]
    fn test_expand_large_fanout() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let hub = engine.create_node(vec![], vec![]);
        let mut spokes: Vec<NodeId> = (0..20)
            .map(|_| {
                let spoke = engine.create_node(vec![], vec![]);
                engine.create_edge(hub, spoke, knows, vec![]).expect("edge");
                spoke
            })
            .collect();

        // "extra" must survive both the cloned and the moved record.
        let source = record_with(vec![("a", Value::Node(hub)), ("extra", Value::Int64(42))]);

        let rows = execute_expand(
            vec![source],
            "a",
            Some("r"),
            "b",
            Some(knows),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        assert_eq!(rows.len(), 20);

        let mut reached: Vec<NodeId> = Vec::new();
        for row in &rows {
            if let Some(Value::Node(nid)) = row.get("b") {
                reached.push(*nid);
            }
        }
        reached.sort();
        spokes.sort();
        assert_eq!(reached, spokes);

        for row in &rows {
            assert_eq!(row.get("extra"), Some(&Value::Int64(42)));
            assert!(row.contains_key("r"));
            assert!(row.contains_key("a"));
        }
    }

    /// Single edge: the only match goes through the move (no-clone) path and
    /// must still carry every original binding.
    #[test]
    fn test_expand_single_edge_move() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let follows = engine.get_or_create_rel_type("FOLLOWS");
        let follower = engine.create_node(vec![], vec![]);
        let followed = engine.create_node(vec![], vec![]);
        engine
            .create_edge(follower, followed, follows, vec![])
            .expect("edge");

        let source = record_with(vec![
            ("x", Value::Node(follower)),
            ("ctx", Value::Int64(99)),
        ]);

        let rows = execute_expand(
            vec![source],
            "x",
            Some("rel"),
            "y",
            Some(follows),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        assert_eq!(rows.len(), 1);
        let only = &rows[0];
        assert_eq!(only.get("y"), Some(&Value::Node(followed)));
        assert_eq!(only.get("x"), Some(&Value::Node(follower)));
        assert_eq!(only.get("ctx"), Some(&Value::Int64(99)));
        assert!(only.contains_key("rel"));
    }

    /// Several source records with 3, 0 and 1 matching edges respectively;
    /// each must be expanded independently of the others.
    #[test]
    fn test_expand_multiple_sources_varied_fanout() {
        /// Rows whose "tag" binding equals `tag`.
        fn tagged<'a>(rows: &'a [Record], tag: &Value) -> Vec<&'a Record> {
            rows.iter().filter(|row| row.get("tag") == Some(tag)).collect()
        }

        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");

        // First source: three outgoing edges.
        let src_three = engine.create_node(vec![], vec![]);
        let three_a = engine.create_node(vec![], vec![]);
        let three_b = engine.create_node(vec![], vec![]);
        let three_c = engine.create_node(vec![], vec![]);
        engine.create_edge(src_three, three_a, knows, vec![]).unwrap();
        engine.create_edge(src_three, three_b, knows, vec![]).unwrap();
        engine.create_edge(src_three, three_c, knows, vec![]).unwrap();

        // Second source: no edges at all.
        let src_none = engine.create_node(vec![], vec![]);

        // Third source: exactly one outgoing edge.
        let src_one = engine.create_node(vec![], vec![]);
        let one_a = engine.create_node(vec![], vec![]);
        engine.create_edge(src_one, one_a, knows, vec![]).unwrap();

        let sources = vec![
            record_with(vec![("n", Value::Node(src_three)), ("tag", Value::Int64(1))]),
            record_with(vec![("n", Value::Node(src_none)), ("tag", Value::Int64(2))]),
            record_with(vec![("n", Value::Node(src_one)), ("tag", Value::Int64(3))]),
        ];

        let rows = execute_expand(
            sources,
            "n",
            Some("r"),
            "m",
            Some(knows),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        // 3 + 0 + 1 rows overall.
        assert_eq!(rows.len(), 4);

        // tag=1: three rows from the first source.
        assert_eq!(tagged(&rows, &Value::Int64(1)).len(), 3);

        // tag=2: the edgeless source contributes nothing.
        assert_eq!(tagged(&rows, &Value::Int64(2)).len(), 0);

        // tag=3: a single row pointing at the third source's only neighbour.
        let from_one = tagged(&rows, &Value::Int64(3));
        assert_eq!(from_one.len(), 1);
        assert_eq!(from_one[0].get("m"), Some(&Value::Node(one_a)));
    }

    /// Zero matching edges: the source record is dropped, not emitted.
    #[test]
    fn test_expand_zero_matching_edges_no_results() {
        let tmp = tempdir().expect("tempdir");
        let mut engine = test_engine(tmp.path());

        let knows = engine.get_or_create_rel_type("KNOWS");
        let likes = engine.get_or_create_rel_type("LIKES");

        let tail = engine.create_node(vec![], vec![]);
        let head = engine.create_node(vec![], vec![]);
        // The stored edge is LIKES; the query below asks for KNOWS.
        engine.create_edge(tail, head, likes, vec![]).unwrap();

        let source = record_with(vec![("a", Value::Node(tail)), ("data", Value::Int64(7))]);

        let rows = execute_expand(
            vec![source],
            "a",
            Some("r"),
            "b",
            Some(knows),
            &RelDirection::Outgoing,
            &engine,
            None,
        );

        assert!(rows.is_empty());
    }

    // ── Hypergraph :INVOLVES virtual expansion tests ───────────────────
    #[cfg(feature = "hypergraph")]
    mod involves_tests {
        use super::*;
        use cypherlite_core::GraphEntity;

        /// Record binding `he` to hyperedge id 1. Each test creates exactly
        /// one hyperedge in a fresh engine and assumes it receives that id.
        fn first_hyperedge_record() -> Record {
            let he_id = cypherlite_core::HyperEdgeId(1);
            record_with(vec![("he", Value::Hyperedge(he_id))])
        }

        /// All source and target participants are returned, with a Null
        /// relationship binding because the edge is virtual.
        #[test]
        fn test_involves_expands_to_source_and_target_nodes() {
            let tmp = tempdir().expect("tempdir");
            let mut engine = test_engine(tmp.path());

            let involves = engine.get_or_create_rel_type("INVOLVES");
            let src = engine.create_node(vec![], vec![]);
            let tgt_a = engine.create_node(vec![], vec![]);
            let tgt_b = engine.create_node(vec![], vec![]);

            engine.create_hyperedge(
                involves,
                vec![GraphEntity::Node(src)],
                vec![GraphEntity::Node(tgt_a), GraphEntity::Node(tgt_b)],
                vec![],
            );

            let rows = execute_expand(
                vec![first_hyperedge_record()],
                "he",
                Some("r"),
                "n",
                Some(involves),
                &RelDirection::Outgoing,
                &engine,
                None,
            );

            // One source plus two targets.
            assert_eq!(rows.len(), 3);
            for row in &rows {
                assert!(row.contains_key("he"));
                assert!(row.contains_key("n"));
                assert_eq!(row.get("r"), Some(&Value::Null));
                assert!(matches!(row.get("n"), Some(Value::Node(_))));
            }
        }

        /// A type filter that differs from the hyperedge's type yields nothing.
        #[test]
        fn test_involves_no_matching_type() {
            let tmp = tempdir().expect("tempdir");
            let mut engine = test_engine(tmp.path());

            let involves = engine.get_or_create_rel_type("INVOLVES");
            let other = engine.get_or_create_rel_type("OTHER");
            let member = engine.create_node(vec![], vec![]);

            engine.create_hyperedge(involves, vec![GraphEntity::Node(member)], vec![], vec![]);

            // The hyperedge is INVOLVES, the query asks for OTHER.
            let rows = execute_expand(
                vec![first_hyperedge_record()],
                "he",
                None,
                "n",
                Some(other),
                &RelDirection::Outgoing,
                &engine,
                None,
            );

            assert!(rows.is_empty());
        }

        /// A hyperedge without participants yields nothing.
        #[test]
        fn test_involves_empty_hyperedge() {
            let tmp = tempdir().expect("tempdir");
            let mut engine = test_engine(tmp.path());

            let involves = engine.get_or_create_rel_type("INVOLVES");
            engine.create_hyperedge(involves, vec![], vec![], vec![]);

            let rows = execute_expand(
                vec![first_hyperedge_record()],
                "he",
                None,
                "n",
                Some(involves),
                &RelDirection::Outgoing,
                &engine,
                None,
            );

            assert!(rows.is_empty());
        }

        /// Without a type filter every participant is still expanded.
        #[test]
        fn test_involves_no_rel_type_filter_matches_all() {
            let tmp = tempdir().expect("tempdir");
            let mut engine = test_engine(tmp.path());

            let involves = engine.get_or_create_rel_type("INVOLVES");
            let member = engine.create_node(vec![], vec![]);

            engine.create_hyperedge(involves, vec![GraphEntity::Node(member)], vec![], vec![]);

            let rows = execute_expand(
                vec![first_hyperedge_record()],
                "he",
                None,
                "n",
                None,
                &RelDirection::Outgoing,
                &engine,
                None,
            );

            assert_eq!(rows.len(), 1);
            assert_eq!(rows[0].get("n"), Some(&Value::Node(member)));
        }
    }
}