use super::{Operator, OperatorError, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::Direction;
use crate::graph::GraphStore;
use grafeo_common::types::{EdgeId, EpochId, LogicalType, NodeId, TransactionId};
use std::collections::VecDeque;
use std::rc::Rc;
use std::sync::Arc;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PathMode {
#[default]
Walk,
Trail,
Simple,
Acyclic,
}
#[allow(clippy::struct_excessive_bools)]
pub struct VariableLengthExpandOperator {
store: Arc<dyn GraphStore>,
input: Box<dyn Operator>,
source_column: usize,
direction: Direction,
edge_types: Vec<String>,
min_hops: u32,
max_hops: u32,
chunk_capacity: usize,
transaction_id: Option<TransactionId>,
viewing_epoch: Option<EpochId>,
read_only: bool,
input_rows: Option<Vec<InputRow>>,
current_input_idx: usize,
output_buffer: Vec<OutputRow>,
exhausted: bool,
output_path_length: bool,
output_path_detail: bool,
path_mode: PathMode,
}
struct InputRow {
columns: Vec<ColumnValue>,
source_node: NodeId,
}
#[derive(Clone)]
enum ColumnValue {
NodeId(NodeId),
EdgeId(EdgeId),
Value(grafeo_common::types::Value),
}
struct OutputRow {
input_idx: usize,
edge_id: Option<EdgeId>,
target_id: NodeId,
path_length: u32,
path_nodes: Option<Vec<NodeId>>,
path_edges: Option<Vec<EdgeId>>,
}
struct PathSegment {
node: NodeId,
edge: Option<EdgeId>,
parent: Option<Rc<PathSegment>>,
}
impl PathSegment {
fn collect_nodes(&self, depth: u32) -> Vec<NodeId> {
let mut nodes = Vec::with_capacity(depth as usize + 1);
self.collect_nodes_into(&mut nodes);
nodes
}
fn collect_nodes_into(&self, nodes: &mut Vec<NodeId>) {
if let Some(parent) = &self.parent {
parent.collect_nodes_into(nodes);
}
nodes.push(self.node);
}
fn collect_edges(&self, depth: u32) -> Vec<EdgeId> {
let mut edges = Vec::with_capacity(depth as usize);
self.collect_edges_into(&mut edges);
edges
}
fn collect_edges_into(&self, edges: &mut Vec<EdgeId>) {
if let Some(parent) = &self.parent {
parent.collect_edges_into(edges);
}
if let Some(edge) = self.edge {
edges.push(edge);
}
}
fn contains_node(&self, target: NodeId) -> bool {
if self.node == target {
return true;
}
if let Some(parent) = &self.parent {
return parent.contains_node(target);
}
false
}
fn contains_edge(&self, target: EdgeId) -> bool {
if self.edge == Some(target) {
return true;
}
if let Some(parent) = &self.parent {
return parent.contains_edge(target);
}
false
}
}
impl VariableLengthExpandOperator {
pub fn new(
store: Arc<dyn GraphStore>,
input: Box<dyn Operator>,
source_column: usize,
direction: Direction,
edge_types: Vec<String>,
min_hops: u32,
max_hops: u32,
) -> Self {
Self {
store,
input,
source_column,
direction,
edge_types,
min_hops,
max_hops: max_hops.max(min_hops), chunk_capacity: 2048,
transaction_id: None,
viewing_epoch: None,
read_only: false,
input_rows: None,
current_input_idx: 0,
output_buffer: Vec::new(),
exhausted: false,
output_path_length: false,
output_path_detail: false,
path_mode: PathMode::Walk,
}
}
pub fn with_path_mode(mut self, mode: PathMode) -> Self {
self.path_mode = mode;
self
}
pub fn with_path_length_output(mut self) -> Self {
self.output_path_length = true;
self
}
pub fn with_path_detail_output(mut self) -> Self {
self.output_path_detail = true;
self
}
pub fn with_chunk_capacity(mut self, capacity: usize) -> Self {
self.chunk_capacity = capacity;
self
}
pub fn with_transaction_context(
mut self,
epoch: EpochId,
transaction_id: Option<TransactionId>,
) -> Self {
self.viewing_epoch = Some(epoch);
self.transaction_id = transaction_id;
self
}
pub fn with_read_only(mut self, read_only: bool) -> Self {
self.read_only = read_only;
self
}
fn materialize_input(&mut self) -> Result<(), OperatorError> {
let mut rows = Vec::new();
while let Some(mut chunk) = self.input.next()? {
chunk.flatten();
for row_idx in 0..chunk.row_count() {
let col = chunk.column(self.source_column).ok_or_else(|| {
OperatorError::ColumnNotFound(format!(
"Column {} not found",
self.source_column
))
})?;
let source_node = col.get_node_id(row_idx).ok_or_else(|| {
OperatorError::Execution("Expected node ID in source column".into())
})?;
let mut columns = Vec::with_capacity(chunk.column_count());
for col_idx in 0..chunk.column_count() {
let col = chunk
.column(col_idx)
.expect("col_idx within column_count range");
let value = if let Some(node_id) = col.get_node_id(row_idx) {
ColumnValue::NodeId(node_id)
} else if let Some(edge_id) = col.get_edge_id(row_idx) {
ColumnValue::EdgeId(edge_id)
} else if let Some(val) = col.get_value(row_idx) {
ColumnValue::Value(val)
} else {
ColumnValue::Value(grafeo_common::types::Value::Null)
};
columns.push(value);
}
rows.push(InputRow {
columns,
source_node,
});
}
}
self.input_rows = Some(rows);
Ok(())
}
fn get_edges(&self, node_id: NodeId) -> Vec<(NodeId, EdgeId)> {
let epoch = self.viewing_epoch;
let transaction_id = self.transaction_id;
let use_versioned = !self.read_only;
self.store
.edges_from(node_id, self.direction)
.into_iter()
.filter(|(target_id, edge_id)| {
let type_matches = if self.edge_types.is_empty() {
true
} else if let Some(actual_type) = self.store.edge_type(*edge_id) {
self.edge_types
.iter()
.any(|t| actual_type.as_str().eq_ignore_ascii_case(t.as_str()))
} else {
false
};
if !type_matches {
return false;
}
if let Some(epoch) = epoch {
if use_versioned && let Some(tx) = transaction_id {
self.store.is_edge_visible_versioned(*edge_id, epoch, tx)
&& self.store.is_node_visible_versioned(*target_id, epoch, tx)
} else {
self.store.is_edge_visible_at_epoch(*edge_id, epoch)
&& self.store.is_node_visible_at_epoch(*target_id, epoch)
}
} else {
true
}
})
.collect()
}
fn is_expansion_allowed(
&self,
segment: &PathSegment,
target: NodeId,
edge_id: EdgeId,
source_node: NodeId,
) -> bool {
match self.path_mode {
PathMode::Walk => true,
PathMode::Trail => !segment.contains_edge(edge_id),
PathMode::Simple => {
target == source_node || !segment.contains_node(target)
}
PathMode::Acyclic => !segment.contains_node(target),
}
}
fn process_input_row(&self, input_idx: usize, source_node: NodeId) -> Vec<OutputRow> {
let mut results = Vec::new();
let needs_tracking = self.output_path_detail || self.path_mode != PathMode::Walk;
if self.min_hops == 0 {
results.push(OutputRow {
input_idx,
edge_id: None,
target_id: source_node,
path_length: 0,
path_nodes: if self.output_path_detail {
Some(vec![source_node])
} else {
None
},
path_edges: if self.output_path_detail {
Some(Vec::new())
} else {
None
},
});
}
if needs_tracking {
let mut frontier: VecDeque<(NodeId, u32, EdgeId, Rc<PathSegment>)> = VecDeque::new();
let root = Rc::new(PathSegment {
node: source_node,
edge: None,
parent: None,
});
for (target, edge_id) in self.get_edges(source_node) {
if !self.is_expansion_allowed(&root, target, edge_id, source_node) {
continue;
}
let segment = Rc::new(PathSegment {
node: target,
edge: Some(edge_id),
parent: Some(Rc::clone(&root)),
});
frontier.push_back((target, 1, edge_id, segment));
}
while let Some((current_node, depth, edge_id, segment)) = frontier.pop_front() {
if depth >= self.min_hops && depth <= self.max_hops {
results.push(OutputRow {
input_idx,
edge_id: Some(edge_id),
target_id: current_node,
path_length: depth,
path_nodes: if self.output_path_detail {
Some(segment.collect_nodes(depth))
} else {
None
},
path_edges: if self.output_path_detail {
Some(segment.collect_edges(depth))
} else {
None
},
});
}
if depth < self.max_hops {
for (target, next_edge_id) in self.get_edges(current_node) {
if !self.is_expansion_allowed(&segment, target, next_edge_id, source_node) {
continue;
}
let new_segment = Rc::new(PathSegment {
node: target,
edge: Some(next_edge_id),
parent: Some(Rc::clone(&segment)),
});
frontier.push_back((target, depth + 1, next_edge_id, new_segment));
}
}
}
} else {
let mut frontier: VecDeque<(NodeId, u32, EdgeId)> = VecDeque::new();
for (target, edge_id) in self.get_edges(source_node) {
frontier.push_back((target, 1, edge_id));
}
while let Some((current_node, depth, edge_id)) = frontier.pop_front() {
if depth >= self.min_hops && depth <= self.max_hops {
results.push(OutputRow {
input_idx,
edge_id: Some(edge_id),
target_id: current_node,
path_length: depth,
path_nodes: None,
path_edges: None,
});
}
if depth < self.max_hops {
for (target, next_edge_id) in self.get_edges(current_node) {
frontier.push_back((target, depth + 1, next_edge_id));
}
}
}
}
results
}
fn fill_output_buffer(&mut self) {
let Some(input_rows) = &self.input_rows else {
return;
};
while self.output_buffer.is_empty() && self.current_input_idx < input_rows.len() {
let source_node = input_rows[self.current_input_idx].source_node;
let results = self.process_input_row(self.current_input_idx, source_node);
self.output_buffer.extend(results);
self.current_input_idx += 1;
}
}
}
impl Operator for VariableLengthExpandOperator {
fn next(&mut self) -> OperatorResult {
if self.exhausted {
return Ok(None);
}
if self.input_rows.is_none() {
self.materialize_input()?;
if self.input_rows.as_ref().map_or(true, Vec::is_empty) {
self.exhausted = true;
return Ok(None);
}
}
self.fill_output_buffer();
if self.output_buffer.is_empty() {
self.exhausted = true;
return Ok(None);
}
let input_rows = self
.input_rows
.as_ref()
.expect("input_rows is Some: populated during BFS");
let num_input_cols = input_rows.first().map_or(0, |r| r.columns.len());
let extra_cols =
2 + usize::from(self.output_path_length) + usize::from(self.output_path_detail) * 3;
let mut schema: Vec<LogicalType> = Vec::with_capacity(num_input_cols + extra_cols);
if let Some(first_row) = input_rows.first() {
for col_val in &first_row.columns {
let ty = match col_val {
ColumnValue::NodeId(_) => LogicalType::Node,
ColumnValue::EdgeId(_) => LogicalType::Edge,
ColumnValue::Value(_) => LogicalType::Any,
};
schema.push(ty);
}
}
schema.push(LogicalType::Edge);
schema.push(LogicalType::Node);
if self.output_path_length {
schema.push(LogicalType::Int64);
}
if self.output_path_detail {
schema.push(LogicalType::Any); schema.push(LogicalType::Any); schema.push(LogicalType::Any); }
let mut chunk = DataChunk::with_capacity(&schema, self.chunk_capacity);
let take_count = self.output_buffer.len().min(self.chunk_capacity);
let to_output: Vec<_> = self.output_buffer.drain(..take_count).collect();
for out_row in &to_output {
let input_row = &input_rows[out_row.input_idx];
for (col_idx, col_val) in input_row.columns.iter().enumerate() {
if let Some(out_col) = chunk.column_mut(col_idx) {
match col_val {
ColumnValue::NodeId(id) => out_col.push_node_id(*id),
ColumnValue::EdgeId(id) => out_col.push_edge_id(*id),
ColumnValue::Value(v) => out_col.push_value(v.clone()),
}
}
}
if let Some(col) = chunk.column_mut(num_input_cols) {
if let Some(edge_id) = out_row.edge_id {
col.push_edge_id(edge_id);
} else {
col.push_value(grafeo_common::types::Value::Null);
}
}
if let Some(col) = chunk.column_mut(num_input_cols + 1) {
col.push_node_id(out_row.target_id);
}
if self.output_path_length
&& let Some(col) = chunk.column_mut(num_input_cols + 2)
{
col.push_value(grafeo_common::types::Value::Int64(i64::from(
out_row.path_length,
)));
}
if self.output_path_detail {
let base = num_input_cols + 2 + usize::from(self.output_path_length);
if let Some(col) = chunk.column_mut(base) {
let nodes_list: Vec<grafeo_common::types::Value> = out_row
.path_nodes
.as_deref()
.unwrap_or(&[])
.iter()
.map(|id| grafeo_common::types::Value::Int64(id.0 as i64))
.collect();
col.push_value(grafeo_common::types::Value::List(nodes_list.into()));
}
if let Some(col) = chunk.column_mut(base + 1) {
let edges_list: Vec<grafeo_common::types::Value> = out_row
.path_edges
.as_deref()
.unwrap_or(&[])
.iter()
.map(|id| grafeo_common::types::Value::Int64(id.0 as i64))
.collect();
col.push_value(grafeo_common::types::Value::List(edges_list.into()));
}
if let Some(col) = chunk.column_mut(base + 2) {
let nodes: Vec<grafeo_common::types::Value> = out_row
.path_nodes
.as_deref()
.unwrap_or(&[])
.iter()
.map(|id| grafeo_common::types::Value::Int64(id.0 as i64))
.collect();
let edges: Vec<grafeo_common::types::Value> = out_row
.path_edges
.as_deref()
.unwrap_or(&[])
.iter()
.map(|id| grafeo_common::types::Value::Int64(id.0 as i64))
.collect();
col.push_value(grafeo_common::types::Value::Path {
nodes: nodes.into(),
edges: edges.into(),
});
}
}
}
chunk.set_count(to_output.len());
Ok(Some(chunk))
}
fn reset(&mut self) {
self.input.reset();
self.input_rows = None;
self.current_input_idx = 0;
self.output_buffer.clear();
self.exhausted = false;
}
fn name(&self) -> &'static str {
"VariableLengthExpand"
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::operators::ScanOperator;
use crate::graph::lpg::LpgStore;
#[test]
fn test_variable_length_expand_chain() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
let c = store.create_node(&["Node"]);
let d = store.create_node(&["Node"]);
store.set_node_property(a, "name", "a".into());
store.set_node_property(b, "name", "b".into());
store.set_node_property(c, "name", "c".into());
store.set_node_property(d, "name", "d".into());
store.create_edge(a, b, "NEXT");
store.create_edge(b, c, "NEXT");
store.create_edge(c, d, "NEXT");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec!["NEXT".to_string()],
1,
3,
);
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_targets: Vec<NodeId> = results
.iter()
.filter(|(s, _)| *s == a)
.map(|(_, t)| *t)
.collect();
assert!(a_targets.contains(&b), "a should reach b");
assert!(a_targets.contains(&c), "a should reach c");
assert!(a_targets.contains(&d), "a should reach d");
assert_eq!(a_targets.len(), 3, "a should reach exactly 3 nodes");
}
#[test]
fn test_variable_length_expand_min_hops() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
let c = store.create_node(&["Node"]);
store.create_edge(a, b, "NEXT");
store.create_edge(b, c, "NEXT");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec!["NEXT".to_string()],
2, 3, );
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_targets: Vec<NodeId> = results
.iter()
.filter(|(s, _)| *s == a)
.map(|(_, t)| *t)
.collect();
assert!(
!a_targets.contains(&b),
"a should NOT reach b with min_hops=2"
);
assert!(a_targets.contains(&c), "a should reach c");
}
#[test]
fn test_variable_length_expand_diamond() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
let c = store.create_node(&["Node"]);
let d = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
store.create_edge(a, c, "EDGE");
store.create_edge(b, d, "EDGE");
store.create_edge(c, d, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
2,
);
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_targets: Vec<NodeId> = results
.iter()
.filter(|(s, _)| *s == a)
.map(|(_, t)| *t)
.collect();
assert!(a_targets.contains(&b));
assert!(a_targets.contains(&c));
assert!(a_targets.contains(&d));
assert_eq!(a_targets.iter().filter(|&&t| t == d).count(), 2);
}
#[test]
fn test_variable_length_expand_no_matching_edges() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
store.create_edge(a, b, "KNOWS");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec!["LIKES".to_string()],
1,
3,
);
let result = expand.next().unwrap();
assert!(result.is_none());
}
#[test]
fn test_variable_length_expand_single_hop() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
1,
);
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_results: Vec<_> = results.iter().filter(|(s, _)| *s == a).collect();
assert_eq!(a_results.len(), 1);
assert_eq!(a_results[0].1, b);
}
#[test]
fn test_variable_length_expand_with_path_length() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
let c = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
store.create_edge(b, c, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
2,
)
.with_path_length_output();
let mut found_path_lengths = false;
while let Ok(Some(chunk)) = expand.next() {
assert!(chunk.column_count() >= 4); found_path_lengths = true;
}
assert!(found_path_lengths);
}
#[test]
fn test_variable_length_expand_reset() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
1,
);
let mut count1 = 0;
while let Ok(Some(chunk)) = expand.next() {
count1 += chunk.row_count();
}
expand.reset();
let mut count2 = 0;
while let Ok(Some(chunk)) = expand.next() {
count2 += chunk.row_count();
}
assert_eq!(count1, count2);
}
#[test]
fn test_variable_length_expand_name() {
let store = Arc::new(LpgStore::new().unwrap());
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
3,
);
assert_eq!(expand.name(), "VariableLengthExpand");
}
#[test]
fn test_variable_length_expand_empty_input() {
let store = Arc::new(LpgStore::new().unwrap());
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Nonexistent",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
3,
);
let result = expand.next().unwrap();
assert!(result.is_none());
}
#[test]
fn test_variable_length_expand_with_chunk_capacity() {
let store = Arc::new(LpgStore::new().unwrap());
let center = store.create_node(&["Node"]);
for _ in 0..5 {
let outer = store.create_node(&["Node"]);
store.create_edge(center, outer, "EDGE");
}
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
1,
)
.with_chunk_capacity(2);
let mut total = 0;
let mut chunk_count = 0;
while let Ok(Some(chunk)) = expand.next() {
chunk_count += 1;
total += chunk.row_count();
}
assert_eq!(total, 5);
assert!(chunk_count >= 2);
}
#[test]
fn test_trail_mode_no_repeated_edges() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
store.create_edge(b, a, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
4,
)
.with_path_mode(PathMode::Trail);
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_results: Vec<_> = results.iter().filter(|(s, _)| *s == a).collect();
assert_eq!(a_results.len(), 2, "Trail from a: a->b and a->b->a only");
}
#[test]
fn test_acyclic_mode_no_repeated_nodes() {
let store = Arc::new(LpgStore::new().unwrap());
let a = store.create_node(&["Node"]);
let b = store.create_node(&["Node"]);
store.create_edge(a, b, "EDGE");
store.create_edge(b, a, "EDGE");
let scan = Box::new(ScanOperator::with_label(
Arc::clone(&store) as Arc<dyn GraphStore>,
"Node",
));
let mut expand = VariableLengthExpandOperator::new(
Arc::clone(&store) as Arc<dyn GraphStore>,
scan,
0,
Direction::Outgoing,
vec![],
1,
4,
)
.with_path_mode(PathMode::Acyclic);
let mut results = Vec::new();
while let Ok(Some(chunk)) = expand.next() {
for i in 0..chunk.row_count() {
let src = chunk.column(0).unwrap().get_node_id(i).unwrap();
let dst = chunk.column(2).unwrap().get_node_id(i).unwrap();
results.push((src, dst));
}
}
let a_results: Vec<_> = results.iter().filter(|(s, _)| *s == a).collect();
assert_eq!(a_results.len(), 1, "Acyclic from a: only a->b");
assert_eq!(a_results[0].1, b);
}
}