use crate::executor::operators::temporal_filter::{is_edge_temporally_valid, TemporalFilter};
use crate::executor::{Record, Value};
use crate::parser::ast::RelDirection;
use cypherlite_core::NodeId;
use cypherlite_storage::StorageEngine;
use std::collections::HashSet;
/// Expands every source record along variable-length relationship paths.
///
/// For each record in `source_records` whose `src_var` is bound to a
/// `Value::Node`, the graph is walked depth-first from that node and one
/// output record is produced per path whose length lies in
/// `min_hops..=max_hops`. Each output record is a clone of the source record
/// with `target_var` bound to the node at the end of the path and, when
/// `rel_var` is given, `rel_var` bound to the last edge of that path.
///
/// Records where `src_var` is unbound or holds a non-node value yield no
/// output. When `min_hops` is 0 the start node itself counts as a match; in
/// that case `rel_var` (if any) is bound to `Value::Null` because no edge was
/// traversed.
///
/// `rel_type_id`, `direction` and `temporal_filter` restrict which edges may
/// be followed; they are applied by `dfs`.
// The expansion spec is passed as flat parameters, which exceeds clippy's
// default argument limit.
#[allow(clippy::too_many_arguments)]
pub fn execute_var_length_expand(
    source_records: Vec<Record>,
    src_var: &str,
    rel_var: Option<&str>,
    target_var: &str,
    rel_type_id: Option<u32>,
    direction: &RelDirection,
    min_hops: u32,
    max_hops: u32,
    engine: &StorageEngine,
    temporal_filter: Option<&TemporalFilter>,
) -> Vec<Record> {
    let mut expanded = Vec::new();

    for row in source_records.iter() {
        // Only rows that actually carry a node under `src_var` can be expanded.
        let origin = if let Some(Value::Node(id)) = row.get(src_var) {
            *id
        } else {
            continue;
        };

        // A lower bound of zero means the empty path (origin -> origin) matches.
        if min_hops == 0 {
            let mut zero_hop = row.clone();
            if let Some(name) = rel_var {
                zero_hop.insert(name.to_owned(), Value::Null);
            }
            zero_hop.insert(target_var.to_owned(), Value::Node(origin));
            expanded.push(zero_hop);
        }

        // The path starts at the origin, so it must never be re-entered.
        let mut on_path = HashSet::new();
        on_path.insert(origin);

        dfs(
            origin,
            1,
            min_hops,
            max_hops,
            &mut on_path,
            rel_type_id,
            direction,
            row,
            rel_var,
            target_var,
            engine,
            temporal_filter,
            &mut expanded,
        );
    }

    expanded
}
/// Recursive depth-first step of the variable-length expansion.
///
/// `depth` is the length of the path that would result from following one
/// more edge out of `current_node`. Every edge of `current_node` is checked,
/// in this order, against the optional relationship-type filter, the optional
/// temporal filter, and `direction`. For each surviving neighbour that is not
/// already in `visited`:
///
/// * if `depth >= min_hops`, a clone of `base_record` is pushed onto
///   `results` with `target_var` bound to the neighbour and, when `rel_var`
///   is given, `rel_var` bound to the edge just followed;
/// * if `depth < max_hops`, the neighbour is added to `visited`, the search
///   recurses from it with `depth + 1`, and the neighbour is removed again
///   afterwards so that other branches may pass through it.
///
/// `visited` therefore holds the nodes on the current path; the caller is
/// expected to have seeded it with the start node. Nothing is done when
/// `depth` already exceeds `max_hops`.
// The traversal state is threaded through as flat parameters, which exceeds
// clippy's default argument limit.
#[allow(clippy::too_many_arguments)]
fn dfs(
    current_node: NodeId,
    depth: u32,
    min_hops: u32,
    max_hops: u32,
    visited: &mut HashSet<NodeId>,
    rel_type_id: Option<u32>,
    direction: &RelDirection,
    base_record: &Record,
    rel_var: Option<&str>,
    target_var: &str,
    engine: &StorageEngine,
    temporal_filter: Option<&TemporalFilter>,
    results: &mut Vec<Record>,
) {
    if depth > max_hops {
        return;
    }

    let incident = engine.get_edges_for_node(current_node);
    for edge in &incident {
        // Relationship-type filter: skip edges of any other type.
        if matches!(rel_type_id, Some(wanted) if edge.rel_type_id != wanted) {
            continue;
        }

        // Temporal filter: without a filter every edge is acceptable.
        let temporally_ok = match temporal_filter {
            Some(filter) => is_edge_temporally_valid(edge.edge_id, filter, engine),
            None => true,
        };
        if !temporally_ok {
            continue;
        }

        // Work out which endpoint we would arrive at, or skip the edge when it
        // cannot be followed from `current_node` in the requested direction.
        let (from, to) = (edge.start_node, edge.end_node);
        let next_node = match direction {
            RelDirection::Outgoing if from == current_node => to,
            RelDirection::Incoming if to == current_node => from,
            RelDirection::Undirected if from == current_node => to,
            RelDirection::Undirected if to == current_node => from,
            RelDirection::Outgoing | RelDirection::Incoming | RelDirection::Undirected => {
                continue
            }
        };

        // Never step onto a node that is already part of the current path.
        if visited.contains(&next_node) {
            continue;
        }

        if depth >= min_hops {
            let mut hit = base_record.clone();
            if let Some(name) = rel_var {
                hit.insert(name.to_owned(), Value::Edge(edge.edge_id));
            }
            hit.insert(target_var.to_owned(), Value::Node(next_node));
            results.push(hit);
        }

        if depth < max_hops {
            // Mark, descend, then unmark so sibling branches can reuse the node.
            visited.insert(next_node);
            dfs(
                next_node,
                depth + 1,
                min_hops,
                max_hops,
                visited,
                rel_type_id,
                direction,
                base_record,
                rel_var,
                target_var,
                engine,
                temporal_filter,
                results,
            );
            visited.remove(&next_node);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::Record;
    use cypherlite_core::{DatabaseConfig, LabelRegistry, SyncMode};
    use cypherlite_storage::StorageEngine;
    use tempfile::tempdir;

    /// Opens a storage engine backed by `test.cyl` inside `dir`.
    fn test_engine(dir: &std::path::Path) -> StorageEngine {
        StorageEngine::open(DatabaseConfig {
            path: dir.join("test.cyl"),
            wal_sync_mode: SyncMode::Normal,
            ..Default::default()
        })
        .expect("open")
    }

    /// Creates a node without labels or properties.
    fn new_node(engine: &mut StorageEngine) -> NodeId {
        engine.create_node(vec![], vec![])
    }

    /// Creates a property-less edge `from -> to` of type `rel_type`.
    fn link(engine: &mut StorageEngine, from: NodeId, to: NodeId, rel_type: u32) {
        engine
            .create_edge(from, to, rel_type, vec![])
            .expect("edge");
    }

    /// Creates `count` nodes joined head-to-tail: `n0 -> n1 -> ... -> n(count-1)`.
    fn create_linear_chain(engine: &mut StorageEngine, rel_type: u32, count: usize) -> Vec<NodeId> {
        let mut chain: Vec<NodeId> = Vec::with_capacity(count);
        for _ in 0..count {
            chain.push(new_node(engine));
        }
        for i in 0..count - 1 {
            link(engine, chain[i], chain[i + 1], rel_type);
        }
        chain
    }

    /// Creates the diamond `n0 -> n1 -> n3` / `n0 -> n2 -> n3` and returns
    /// `[n0, n1, n2, n3]`.
    fn diamond(engine: &mut StorageEngine, rel_type: u32) -> [NodeId; 4] {
        let n0 = new_node(engine);
        let n1 = new_node(engine);
        let n2 = new_node(engine);
        let n3 = new_node(engine);
        link(engine, n0, n1, rel_type);
        link(engine, n0, n2, rel_type);
        link(engine, n1, n3, rel_type);
        link(engine, n2, n3, rel_type);
        [n0, n1, n2, n3]
    }

    /// Builds a source record that binds variable `a` to `node`.
    fn source_at(node: NodeId) -> Record {
        let mut rec = Record::new();
        rec.insert("a".to_string(), Value::Node(node));
        rec
    }

    /// Runs the expansion from variable `a` to variable `b` without a
    /// temporal filter.
    fn expand(
        engine: &StorageEngine,
        sources: Vec<Record>,
        rel_var: Option<&str>,
        rel_type: Option<u32>,
        direction: RelDirection,
        min_hops: u32,
        max_hops: u32,
    ) -> Vec<Record> {
        execute_var_length_expand(
            sources, "a", rel_var, "b", rel_type, &direction, min_hops, max_hops, engine, None,
        )
    }

    /// Exactly one hop on a chain reaches only the direct successor.
    #[test]
    fn test_var_expand_1_hop() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 3);

        let results = expand(
            &engine,
            vec![source_at(nodes[0])],
            Some("r"),
            Some(knows),
            RelDirection::Outgoing,
            1,
            1,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("b"), Some(&Value::Node(nodes[1])));
    }

    /// One to two hops on a chain reaches the first two successors.
    #[test]
    fn test_var_expand_1_to_2_hops() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 4);

        let results = expand(
            &engine,
            vec![source_at(nodes[0])],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            2,
        );

        assert_eq!(results.len(), 2);
        let reaches = |node: NodeId| {
            results
                .iter()
                .any(|r| r.get("b") == Some(&Value::Node(node)))
        };
        assert!(reaches(nodes[1]));
        assert!(reaches(nodes[2]));
    }

    /// One to three hops on a five-node chain yields three matches.
    #[test]
    fn test_var_expand_1_to_3_hops_linear() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 5);

        let results = expand(
            &engine,
            vec![source_at(nodes[0])],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            3,
        );

        assert_eq!(results.len(), 3);
    }

    /// Without a type filter, edges of different types are all followed.
    #[test]
    fn test_var_expand_no_type_filter() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let likes = engine.get_or_create_rel_type("LIKES");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        let n2 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);
        link(&mut engine, n1, n2, likes);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            None,
            RelDirection::Outgoing,
            1,
            2,
        );

        assert_eq!(results.len(), 2);
    }

    /// With a type filter, traversal stops at the first edge of another type.
    #[test]
    fn test_var_expand_with_type_filter() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let likes = engine.get_or_create_rel_type("LIKES");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        let n2 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);
        link(&mut engine, n1, n2, likes);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            3,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("b"), Some(&Value::Node(n1)));
    }

    /// Incoming direction walks the chain backwards from its tail.
    #[test]
    fn test_var_expand_incoming() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 4);

        let results = expand(
            &engine,
            vec![source_at(nodes[3])],
            None,
            Some(knows),
            RelDirection::Incoming,
            1,
            2,
        );

        assert_eq!(results.len(), 2);
    }

    /// No source records means no output records.
    #[test]
    fn test_var_expand_empty_source() {
        let dir = tempdir().expect("tempdir");
        let engine = test_engine(dir.path());

        let results = expand(&engine, vec![], None, None, RelDirection::Outgoing, 1, 3);

        assert!(results.is_empty());
    }

    /// An isolated node with `min_hops >= 1` produces nothing.
    #[test]
    fn test_var_expand_no_edges() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n0 = new_node(&mut engine);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            None,
            RelDirection::Outgoing,
            1,
            3,
        );

        assert!(results.is_empty());
    }

    /// Every match binds the relationship variable to an edge.
    #[test]
    fn test_var_expand_rel_var_binding() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 3);

        let results = expand(
            &engine,
            vec![source_at(nodes[0])],
            Some("r"),
            Some(knows),
            RelDirection::Outgoing,
            1,
            2,
        );

        for rec in &results {
            assert!(rec.contains_key("r"));
            assert!(matches!(rec.get("r"), Some(Value::Edge(_))));
        }
    }

    /// A diamond yields one match per path: two at depth 1, two at depth 2.
    #[test]
    fn test_var_expand_branching() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let [n0, _, _, _] = diamond(&mut engine, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            2,
        );

        assert_eq!(results.len(), 4);
    }

    /// A three-node cycle does not loop back through the start node.
    #[test]
    fn test_var_expand_cycle_detection_simple() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        let n2 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);
        link(&mut engine, n1, n2, knows);
        link(&mut engine, n2, n0, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            10,
        );

        assert_eq!(results.len(), 2);
    }

    /// A self-loop on the neighbour is not followed.
    #[test]
    fn test_var_expand_cycle_detection_self_loop() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);
        link(&mut engine, n1, n1, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            5,
        );

        assert_eq!(results.len(), 1);
    }

    /// The same end node may be reached once per distinct path.
    #[test]
    fn test_var_expand_cycle_different_paths_allowed() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let [n0, _, _, n3] = diamond(&mut engine, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            2,
            2,
        );

        assert_eq!(results.len(), 2);
        for rec in &results {
            assert_eq!(rec.get("b"), Some(&Value::Node(n3)));
        }
    }

    /// Undirected traversal does not bounce back over the same edge to the start.
    #[test]
    fn test_var_expand_undirected_cycle() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            Some(knows),
            RelDirection::Undirected,
            1,
            5,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("b"), Some(&Value::Node(n1)));
    }

    /// `min_hops == max_hops == 2` returns only the node two steps away.
    #[test]
    fn test_var_expand_exact_2_hop() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let nodes = create_linear_chain(&mut engine, knows, 4);

        let results = expand(
            &engine,
            vec![source_at(nodes[0])],
            None,
            Some(knows),
            RelDirection::Outgoing,
            2,
            2,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("b"), Some(&Value::Node(nodes[2])));
    }

    /// `min_hops == 0` adds the start node itself, with a null relationship.
    #[test]
    fn test_var_expand_zero_min_hops() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            Some("r"),
            Some(knows),
            RelDirection::Outgoing,
            0,
            1,
        );

        assert_eq!(results.len(), 2);

        let bound_to = |node: NodeId| {
            results
                .iter()
                .filter(|r| r.get("b") == Some(&Value::Node(node)))
                .collect::<Vec<_>>()
        };

        let zero_hop = bound_to(n0);
        assert_eq!(zero_hop.len(), 1);
        assert_eq!(zero_hop[0].get("r"), Some(&Value::Null));

        let one_hop = bound_to(n1);
        assert_eq!(one_hop.len(), 1);
    }

    /// `min_hops == 0` on an isolated node still returns the node itself.
    #[test]
    fn test_var_expand_zero_min_no_edges() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let n0 = new_node(&mut engine);

        let results = expand(
            &engine,
            vec![source_at(n0)],
            None,
            None,
            RelDirection::Outgoing,
            0,
            1,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("b"), Some(&Value::Node(n0)));
    }

    /// Each source record is expanded independently.
    #[test]
    fn test_var_expand_multiple_sources() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        let n2 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);
        link(&mut engine, n2, n1, knows);

        let results = expand(
            &engine,
            vec![source_at(n0), source_at(n2)],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            1,
        );

        assert_eq!(results.len(), 2);
    }

    /// Bindings already present on the source record survive the expansion.
    #[test]
    fn test_var_expand_preserves_source_vars() {
        let dir = tempdir().expect("tempdir");
        let mut engine = test_engine(dir.path());
        let knows = engine.get_or_create_rel_type("KNOWS");
        let n0 = new_node(&mut engine);
        let n1 = new_node(&mut engine);
        link(&mut engine, n0, n1, knows);

        let mut source = source_at(n0);
        source.insert("extra".to_string(), Value::Int64(42));

        let results = expand(
            &engine,
            vec![source],
            None,
            Some(knows),
            RelDirection::Outgoing,
            1,
            1,
        );

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].get("extra"), Some(&Value::Int64(42)));
    }
}