use crate::error::{Error, Result};
use crate::procedures::{ProcCursor, ProcRow};
use crate::reader::GraphReader;
use crate::value::Value;
use meshdb_core::{Edge, EdgeId, Node, NodeId, Property};
use std::collections::{HashMap, HashSet, VecDeque};
/// Orientation of a relationship relative to the node being expanded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    /// The relationship is followed away from the current node.
    Outgoing,
    /// The relationship is followed towards the current node.
    Incoming,
    /// Either orientation.
    Both,
}

impl Direction {
    /// Reports whether a filter configured with `self` admits `candidate`.
    ///
    /// `Both` admits every candidate; any other value admits only a
    /// candidate equal to itself.
    pub fn allows(self, candidate: Direction) -> bool {
        self == Direction::Both || self == candidate
    }
}
#[derive(Debug, Clone, Default)]
pub struct RelFilter {
by_type: std::collections::HashMap<String, Direction>,
wildcard: Option<Direction>,
}
impl RelFilter {
pub fn parse(s: &str) -> Result<Self> {
let mut filter = RelFilter::default();
let trimmed = s.trim();
if trimmed.is_empty() {
filter.wildcard = Some(Direction::Both);
return Ok(filter);
}
for raw in trimmed.split('|') {
let item = raw.trim();
if item.is_empty() {
continue;
}
let (dir, type_name) = if let Some(rest) = item.strip_prefix('>') {
(Direction::Outgoing, rest.trim())
} else if let Some(rest) = item.strip_prefix('<') {
(Direction::Incoming, rest.trim())
} else if let Some(rest) = item.strip_suffix('>') {
(Direction::Outgoing, rest.trim())
} else if let Some(rest) = item.strip_suffix('<') {
(Direction::Incoming, rest.trim())
} else {
(Direction::Both, item)
};
if type_name.is_empty() {
filter.wildcard = match filter.wildcard {
None => Some(dir),
Some(existing) if existing == dir => Some(existing),
_ => Some(Direction::Both),
};
} else {
let entry = filter.by_type.entry(type_name.to_string()).or_insert(dir);
if *entry != dir {
*entry = Direction::Both;
}
}
}
Ok(filter)
}
pub fn accepts(&self, edge_type: &str, direction: Direction) -> bool {
if let Some(allowed) = self.by_type.get(edge_type) {
allowed.allows(direction)
} else if let Some(wildcard) = self.wildcard {
wildcard.allows(direction)
} else {
false
}
}
pub fn allows_any_outgoing(&self) -> bool {
self.wildcard
.map(|d| d.allows(Direction::Outgoing))
.unwrap_or(false)
|| self.by_type.values().any(|d| d.allows(Direction::Outgoing))
}
pub fn allows_any_incoming(&self) -> bool {
self.wildcard
.map(|d| d.allows(Direction::Incoming))
.unwrap_or(false)
|| self.by_type.values().any(|d| d.allows(Direction::Incoming))
}
}
/// Label filter parsed from a `|`-separated specification string.
///
/// Labels are sorted into four sets according to their prefix character:
/// `+` (or no prefix) whitelist, `-` blacklist, `>` end nodes and `/`
/// terminators.
#[derive(Debug, Clone, Default)]
pub struct LabelFilter {
    whitelist: HashSet<String>,
    blacklist: HashSet<String>,
    end_nodes: HashSet<String>,
    terminators: HashSet<String>,
}

impl LabelFilter {
    /// Parses a label filter specification.
    ///
    /// Blank items are skipped, so an empty string yields a filter with all
    /// four sets empty.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when an item consists of a prefix character
    /// with no label after it.
    pub fn parse(s: &str) -> Result<Self> {
        let mut filter = LabelFilter::default();
        let items = s.trim().split('|').map(str::trim).filter(|item| !item.is_empty());
        for item in items {
            let (bucket, label) = if let Some(rest) = item.strip_prefix('+') {
                (&mut filter.whitelist, rest.trim())
            } else if let Some(rest) = item.strip_prefix('-') {
                (&mut filter.blacklist, rest.trim())
            } else if let Some(rest) = item.strip_prefix('>') {
                (&mut filter.end_nodes, rest.trim())
            } else if let Some(rest) = item.strip_prefix('/') {
                (&mut filter.terminators, rest.trim())
            } else {
                // An unprefixed label is treated as a whitelist entry.
                (&mut filter.whitelist, item)
            };
            if label.is_empty() {
                return Err(Error::Procedure(format!(
                    "labelFilter item '{item}' has no label after the prefix"
                )));
            }
            bucket.insert(label.to_string());
        }
        Ok(filter)
    }

    /// Test helper: `true` when none of the four sets has any entry.
    #[cfg(test)]
    pub fn is_permissive(&self) -> bool {
        [
            &self.whitelist,
            &self.blacklist,
            &self.end_nodes,
            &self.terminators,
        ]
        .iter()
        .all(|set| set.is_empty())
    }

    /// Returns `true` when a node carrying `labels` may be visited.
    ///
    /// Any blacklisted label rejects the node. Otherwise the node is admitted
    /// if the whitelist is empty or at least one label is whitelisted.
    pub fn is_visitable(&self, labels: &[String]) -> bool {
        let blocked = labels.iter().any(|label| self.blacklist.contains(label));
        let admitted =
            self.whitelist.is_empty() || labels.iter().any(|label| self.whitelist.contains(label));
        !blocked && admitted
    }

    /// Returns `true` when expansion may continue past a node carrying
    /// `labels`, i.e. when none of them is a terminator label.
    pub fn can_continue(&self, labels: &[String]) -> bool {
        labels.iter().all(|label| !self.terminators.contains(label))
    }

    /// Returns `true` when a node carrying `labels` qualifies as a path
    /// endpoint.
    ///
    /// With no end-node and no terminator labels configured every node
    /// qualifies; otherwise the node needs at least one label from either
    /// set.
    pub fn is_endpoint(&self, labels: &[String]) -> bool {
        let unrestricted = self.end_nodes.is_empty() && self.terminators.is_empty();
        unrestricted
            || labels
                .iter()
                .any(|label| self.end_nodes.contains(label) || self.terminators.contains(label))
    }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Uniqueness {
NodeGlobal,
NodeLevel,
NodePath,
RelationshipGlobal,
RelationshipLevel,
RelationshipPath,
None,
}
impl Uniqueness {
pub fn parse(s: &str) -> Result<Self> {
match s.trim().to_ascii_uppercase().as_str() {
"NODE_GLOBAL" => Ok(Self::NodeGlobal),
"NODE_LEVEL" => Ok(Self::NodeLevel),
"NODE_PATH" => Ok(Self::NodePath),
"RELATIONSHIP_GLOBAL" => Ok(Self::RelationshipGlobal),
"RELATIONSHIP_LEVEL" => Ok(Self::RelationshipLevel),
"RELATIONSHIP_PATH" => Ok(Self::RelationshipPath),
"NONE" => Ok(Self::None),
other => Err(Error::Procedure(format!(
"unknown uniqueness mode '{other}' — expected one of \
NODE_GLOBAL / NODE_LEVEL / NODE_PATH / \
RELATIONSHIP_GLOBAL / RELATIONSHIP_LEVEL / \
RELATIONSHIP_PATH / NONE"
))),
}
}
pub fn is_node_scoped(self) -> bool {
matches!(self, Self::NodeGlobal | Self::NodeLevel | Self::NodePath)
}
pub fn is_relationship_scoped(self) -> bool {
matches!(
self,
Self::RelationshipGlobal | Self::RelationshipLevel | Self::RelationshipPath
)
}
}
/// Bookkeeping for the visited sets required by a [`Uniqueness`] mode.
///
/// Global and per-level sets live here; per-path sets are owned by the
/// caller and passed in on every visit. A tracker built via `Default` has no
/// mode and admits everything.
#[derive(Debug, Clone, Default)]
pub struct UniquenessTracker {
    mode: Option<Uniqueness>,
    visited_nodes_global: HashSet<NodeId>,
    visited_edges_global: HashSet<EdgeId>,
    visited_nodes_level: HashSet<NodeId>,
    visited_edges_level: HashSet<EdgeId>,
}

impl UniquenessTracker {
    /// Creates a tracker for `mode` with every visited set empty.
    pub fn new(mode: Uniqueness) -> Self {
        let mut tracker = Self::default();
        tracker.mode = Some(mode);
        tracker
    }

    /// Forgets the per-level visited sets; global sets are untouched.
    pub fn advance_level(&mut self) {
        self.visited_edges_level.clear();
        self.visited_nodes_level.clear();
    }

    /// Records a visit to `node` and returns `true` when the visit is
    /// admitted.
    ///
    /// Under a node-scoped mode the node is inserted into the matching set
    /// (`path_visited` for `NodePath`) and the result is whether it was new.
    /// Every other mode admits the node without recording anything.
    pub fn try_visit_node(&mut self, node: NodeId, path_visited: &mut HashSet<NodeId>) -> bool {
        let seen = match self.mode {
            Some(Uniqueness::NodeGlobal) => &mut self.visited_nodes_global,
            Some(Uniqueness::NodeLevel) => &mut self.visited_nodes_level,
            Some(Uniqueness::NodePath) => path_visited,
            _ => return true,
        };
        seen.insert(node)
    }

    /// Records a traversal of `edge` and returns `true` when it is admitted.
    ///
    /// Under a relationship-scoped mode the edge is inserted into the
    /// matching set (`path_visited` for `RelationshipPath`) and the result is
    /// whether it was new. Every other mode admits the edge without recording
    /// anything.
    pub fn try_visit_edge(&mut self, edge: EdgeId, path_visited: &mut HashSet<EdgeId>) -> bool {
        let seen = match self.mode {
            Some(Uniqueness::RelationshipGlobal) => &mut self.visited_edges_global,
            Some(Uniqueness::RelationshipLevel) => &mut self.visited_edges_level,
            Some(Uniqueness::RelationshipPath) => path_visited,
            _ => return true,
        };
        seen.insert(edge)
    }
}
/// Settings that drive a path expansion.
#[derive(Debug, Clone)]
pub struct ExpandConfig {
    /// Node the expansion starts from.
    pub start_node: NodeId,
    /// Minimum path length to emit; a negative value resolves to 1.
    pub min_level: i64,
    /// Maximum path length to explore; a negative value means unbounded.
    pub max_level: i64,
    /// Which relationship types/directions may be traversed.
    pub rel_filter: RelFilter,
    /// Which node labels may be visited, end a path, or stop expansion.
    pub label_filter: LabelFilter,
    /// Revisit policy for nodes or relationships.
    pub uniqueness: Uniqueness,
    /// When `true`, the label and node white/blacklists also apply to the
    /// start node.
    pub filter_start_node: bool,
    /// Maximum number of rows to emit; `None` means no limit.
    pub limit: Option<usize>,
    /// When set, only paths ending on one of these nodes are emitted.
    pub end_nodes: Option<HashSet<NodeId>>,
    /// When set, these nodes are never visited.
    pub blacklist_nodes: Option<HashSet<NodeId>>,
    /// When set, only these nodes may be visited.
    pub whitelist_nodes: Option<HashSet<NodeId>>,
}

impl ExpandConfig {
    /// Resolves the raw level settings into a `(min, max)` pair of depths.
    ///
    /// A negative `min_level` resolves to 1 and a negative `max_level` to
    /// `usize::MAX`. The returned maximum is never below the minimum.
    pub fn resolve_levels(&self) -> (usize, usize) {
        // Non-negative levels that do not fit in `usize` (possible where
        // `usize` is narrower than 64 bits) saturate instead of silently
        // wrapping the way an `as` cast would.
        fn saturate(level: i64) -> usize {
            <usize as std::convert::TryFrom<i64>>::try_from(level).unwrap_or(usize::MAX)
        }

        let min = if self.min_level < 0 {
            1
        } else {
            saturate(self.min_level)
        };
        let max = if self.max_level < 0 {
            usize::MAX
        } else {
            saturate(self.max_level)
        };
        (min, max.max(min))
    }
}
/// Breadth-first path-expansion cursor.
///
/// Candidate paths wait in a FIFO queue. Each call to `advance` either emits
/// the path at the head of the queue or pops that head and enqueues its
/// admissible one-hop extensions.
pub struct ExpandCursor {
    config: ExpandConfig,
    tracker: UniquenessTracker,
    queue: VecDeque<Frontier>,
    /// Queue entries still to be expanded on the level being processed.
    current_level_remaining: usize,
    /// Entries enqueued so far for the next level.
    next_level_count: usize,
    /// Number of rows returned so far.
    emitted: usize,
    output_column: String,
    /// Set once the start node has been enqueued. Without this flag an
    /// exhausted traversal that emitted nothing is indistinguishable from a
    /// fresh cursor and would be re-run on every later `advance` call.
    seeded: bool,
}

/// One queued path together with its per-path bookkeeping.
struct Frontier {
    /// Last node of the path.
    node_id: NodeId,
    /// Every node on the path, start node first.
    path_nodes: Vec<NodeId>,
    /// Every edge on the path, in traversal order.
    path_edges: Vec<EdgeId>,
    /// Per-path visited set handed to the tracker for node checks.
    path_visited_nodes: HashSet<NodeId>,
    /// Per-path visited set handed to the tracker for edge checks.
    path_visited_edges: HashSet<EdgeId>,
    /// `true` while the path still has to be returned as a row.
    pending_emit: bool,
}

impl ExpandCursor {
    /// Name of the single column in every emitted row.
    pub const OUTPUT_COLUMN: &'static str = "path";

    /// Creates a cursor for `config`. No graph access happens until the first
    /// call to `advance`.
    pub fn new(config: ExpandConfig) -> Self {
        let tracker = UniquenessTracker::new(config.uniqueness);
        Self {
            config,
            tracker,
            queue: VecDeque::new(),
            current_level_remaining: 0,
            next_level_count: 0,
            emitted: 0,
            output_column: Self::OUTPUT_COLUMN.to_string(),
            seeded: false,
        }
    }

    /// Enqueues the start node the first time it is called; later calls are
    /// no-ops.
    fn seed_if_empty(&mut self, reader: &dyn GraphReader) -> Result<()> {
        if self.seeded {
            return Ok(());
        }
        let start = self.config.start_node;
        let mut path_visited_nodes: HashSet<NodeId> = HashSet::new();
        // Registers the start node with the tracker. The result is ignored
        // on purpose: the start node is enqueued regardless.
        let _admitted = self.tracker.try_visit_node(start, &mut path_visited_nodes);
        let pending_emit = self.should_emit_start(reader)?;
        self.queue.push_back(Frontier {
            node_id: start,
            path_nodes: vec![start],
            path_edges: Vec::new(),
            path_visited_nodes,
            path_visited_edges: HashSet::new(),
            pending_emit,
        });
        self.current_level_remaining = 1;
        self.next_level_count = 0;
        self.seeded = true;
        Ok(())
    }

    /// Decides whether the zero-length path (the start node alone) is emitted.
    ///
    /// It is emitted only when the resolved minimum level is 0, the start
    /// node exists, it passes the visit filters if `filter_start_node` is
    /// set, and it qualifies as an endpoint for both the label filter and
    /// the optional `end_nodes` set.
    fn should_emit_start(&self, reader: &dyn GraphReader) -> Result<bool> {
        let (min_level, _) = self.config.resolve_levels();
        if min_level > 0 {
            return Ok(false);
        }
        let node = match reader.get_node(self.config.start_node)? {
            Some(n) => n,
            None => return Ok(false),
        };
        if self.config.filter_start_node {
            if !self.config.label_filter.is_visitable(&node.labels) {
                return Ok(false);
            }
            if let Some(wl) = &self.config.whitelist_nodes {
                if !wl.contains(&self.config.start_node) {
                    return Ok(false);
                }
            }
            if let Some(bl) = &self.config.blacklist_nodes {
                if bl.contains(&self.config.start_node) {
                    return Ok(false);
                }
            }
        }
        if !self.config.label_filter.is_endpoint(&node.labels) {
            return Ok(false);
        }
        if let Some(endset) = &self.config.end_nodes {
            if !endset.contains(&self.config.start_node) {
                return Ok(false);
            }
        }
        Ok(true)
    }

    /// Enqueues every admissible one-hop extension of `entry_snapshot` and
    /// returns how many were enqueued.
    ///
    /// Nothing is enqueued when the node no longer exists, carries a
    /// terminator label, or its children would exceed the maximum level.
    fn expand(&mut self, entry_snapshot: Frontier, reader: &dyn GraphReader) -> Result<usize> {
        let node = match reader.get_node(entry_snapshot.node_id)? {
            Some(n) => n,
            None => return Ok(0),
        };
        if !self.config.label_filter.can_continue(&node.labels) {
            return Ok(0);
        }
        // `path_nodes` holds one more entry than the path has edges, so its
        // length equals the depth the children would have.
        let new_level = entry_snapshot.path_nodes.len();
        let (_, max_level) = self.config.resolve_levels();
        if new_level > max_level {
            return Ok(0);
        }
        let mut pushed = 0usize;
        if self.config.rel_filter.allows_any_outgoing() {
            for (edge_id, neighbor_id) in reader.outgoing(entry_snapshot.node_id)? {
                if self.try_push(
                    &entry_snapshot,
                    edge_id,
                    neighbor_id,
                    Direction::Outgoing,
                    reader,
                )? {
                    pushed += 1;
                }
            }
        }
        if self.config.rel_filter.allows_any_incoming() {
            for (edge_id, neighbor_id) in reader.incoming(entry_snapshot.node_id)? {
                if self.try_push(
                    &entry_snapshot,
                    edge_id,
                    neighbor_id,
                    Direction::Incoming,
                    reader,
                )? {
                    pushed += 1;
                }
            }
        }
        Ok(pushed)
    }

    /// Tries to extend `entry` over `edge_id` to `neighbor_id`.
    ///
    /// Returns `Ok(true)` when the extended path was enqueued. The extension
    /// is rejected when the edge or neighbor no longer exists, when the
    /// relationship filter, node black/whitelist or label filter refuses it,
    /// or when the uniqueness tracker has already seen the node/edge.
    fn try_push(
        &mut self,
        entry: &Frontier,
        edge_id: EdgeId,
        neighbor_id: NodeId,
        direction: Direction,
        reader: &dyn GraphReader,
    ) -> Result<bool> {
        let edge = match reader.get_edge(edge_id)? {
            Some(e) => e,
            None => return Ok(false),
        };
        if !self.config.rel_filter.accepts(&edge.edge_type, direction) {
            return Ok(false);
        }
        if let Some(bl) = &self.config.blacklist_nodes {
            if bl.contains(&neighbor_id) {
                return Ok(false);
            }
        }
        if let Some(wl) = &self.config.whitelist_nodes {
            if !wl.contains(&neighbor_id) {
                return Ok(false);
            }
        }
        let neighbor = match reader.get_node(neighbor_id)? {
            Some(n) => n,
            None => return Ok(false),
        };
        if !self.config.label_filter.is_visitable(&neighbor.labels) {
            return Ok(false);
        }
        // Per-path sets are copied so sibling extensions do not see each
        // other's visits.
        let mut new_path_nodes_set = entry.path_visited_nodes.clone();
        let mut new_path_edges_set = entry.path_visited_edges.clone();
        if self.config.uniqueness.is_node_scoped()
            && !self
                .tracker
                .try_visit_node(neighbor_id, &mut new_path_nodes_set)
        {
            return Ok(false);
        }
        if self.config.uniqueness.is_relationship_scoped()
            && !self
                .tracker
                .try_visit_edge(edge_id, &mut new_path_edges_set)
        {
            return Ok(false);
        }
        let mut new_nodes = entry.path_nodes.clone();
        new_nodes.push(neighbor_id);
        let mut new_edges = entry.path_edges.clone();
        new_edges.push(edge_id);
        // Depth of the extended path = number of edges on it.
        let new_level = new_nodes.len() - 1;
        let (min_level, max_level) = self.config.resolve_levels();
        let within_bounds = new_level >= min_level && new_level <= max_level;
        let is_endpoint = self.config.label_filter.is_endpoint(&neighbor.labels);
        let passes_end_nodes = self
            .config
            .end_nodes
            .as_ref()
            .map(|s| s.contains(&neighbor_id))
            .unwrap_or(true);
        let should_emit = within_bounds && is_endpoint && passes_end_nodes;
        // The path is enqueued even when it is not emitted, so that longer
        // paths through this node can still be explored.
        self.queue.push_back(Frontier {
            node_id: neighbor_id,
            path_nodes: new_nodes,
            path_edges: new_edges,
            path_visited_nodes: new_path_nodes_set,
            path_visited_edges: new_path_edges_set,
            pending_emit: should_emit,
        });
        self.next_level_count += 1;
        Ok(true)
    }

    /// Loads every node and edge on `entry`'s path and wraps them in a
    /// `Value::Path`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Procedure` when a node or edge on the path can no
    /// longer be found, and propagates reader errors.
    fn materialize(&self, entry: &Frontier, reader: &dyn GraphReader) -> Result<Value> {
        let mut nodes: Vec<Node> = Vec::with_capacity(entry.path_nodes.len());
        for nid in &entry.path_nodes {
            match reader.get_node(*nid)? {
                Some(n) => nodes.push(n),
                None => {
                    return Err(Error::Procedure(format!(
                        "apoc.path.expand: node {nid:?} vanished during traversal"
                    )))
                }
            }
        }
        let mut edges: Vec<Edge> = Vec::with_capacity(entry.path_edges.len());
        for eid in &entry.path_edges {
            match reader.get_edge(*eid)? {
                Some(e) => edges.push(e),
                None => {
                    return Err(Error::Procedure(format!(
                        "apoc.path.expand: edge {eid:?} vanished during traversal"
                    )))
                }
            }
        }
        Ok(Value::Path { nodes, edges })
    }
}

impl ProcCursor for ExpandCursor {
    /// Returns the next path row, or `None` when the traversal is exhausted
    /// or the configured limit has been reached.
    fn advance(&mut self, reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
        self.seed_if_empty(reader)?;
        // `emitted` only changes immediately before a row is returned, so
        // checking the limit once per call is sufficient.
        if let Some(limit) = self.config.limit {
            if self.emitted >= limit {
                return Ok(None);
            }
        }
        loop {
            // Clear the head's emit flag first; the head stays queued so the
            // next call can expand it.
            let ready = match self.queue.front_mut() {
                Some(head) => std::mem::replace(&mut head.pending_emit, false),
                None => return Ok(None),
            };
            if ready {
                let head = self.queue.front().expect("queue non-empty above");
                let path = self.materialize(head, reader)?;
                let mut row: ProcRow = HashMap::new();
                row.insert(self.output_column.clone(), path);
                self.emitted += 1;
                return Ok(Some(row));
            }
            let entry = self.queue.pop_front().expect("queue non-empty above");
            self.current_level_remaining = self.current_level_remaining.saturating_sub(1);
            self.expand(entry, reader)?;
            if self.current_level_remaining == 0 {
                // The whole level has been expanded: everything enqueued
                // meanwhile forms the next level.
                self.current_level_remaining = std::mem::replace(&mut self.next_level_count, 0);
                self.tracker.advance_level();
            }
        }
    }
}
pub struct SubgraphNodesCursor {
inner: ExpandCursor,
}
impl SubgraphNodesCursor {
pub const OUTPUT_COLUMN: &'static str = "node";
pub fn new(mut config: ExpandConfig) -> Self {
config.min_level = 0;
config.uniqueness = Uniqueness::NodeGlobal;
Self {
inner: ExpandCursor::new(config),
}
}
}
impl ProcCursor for SubgraphNodesCursor {
fn advance(&mut self, reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
loop {
match self.inner.advance(reader)? {
Some(mut row) => match row.remove(ExpandCursor::OUTPUT_COLUMN) {
Some(Value::Path { nodes, .. }) => {
let last = match nodes.last() {
Some(n) => n.clone(),
None => continue,
};
let mut out: ProcRow = HashMap::new();
out.insert(Self::OUTPUT_COLUMN.to_string(), Value::Node(last));
return Ok(Some(out));
}
_ => continue,
},
None => return Ok(None),
}
}
}
}
pub struct SpanningTreeCursor(ExpandCursor);
impl SpanningTreeCursor {
pub fn new(mut config: ExpandConfig) -> Self {
config.min_level = 0;
config.uniqueness = Uniqueness::NodeGlobal;
Self(ExpandCursor::new(config))
}
}
impl ProcCursor for SpanningTreeCursor {
fn advance(&mut self, reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
self.0.advance(reader)
}
}
pub struct SubgraphAllCursor {
inner: ExpandCursor,
drained: bool,
emitted: bool,
nodes: Vec<Node>,
edges: Vec<Edge>,
seen_nodes: HashSet<NodeId>,
seen_edges: HashSet<EdgeId>,
}
impl SubgraphAllCursor {
pub const NODES_COLUMN: &'static str = "nodes";
pub const RELATIONSHIPS_COLUMN: &'static str = "relationships";
pub fn new(mut config: ExpandConfig) -> Self {
config.min_level = 0;
config.uniqueness = Uniqueness::NodeGlobal;
Self {
inner: ExpandCursor::new(config),
drained: false,
emitted: false,
nodes: Vec::new(),
edges: Vec::new(),
seen_nodes: HashSet::new(),
seen_edges: HashSet::new(),
}
}
fn drain(&mut self, reader: &dyn GraphReader) -> Result<()> {
while let Some(mut row) = self.inner.advance(reader)? {
let path = row.remove(ExpandCursor::OUTPUT_COLUMN);
if let Some(Value::Path { nodes, edges }) = path {
for n in nodes {
if self.seen_nodes.insert(n.id) {
self.nodes.push(n);
}
}
for e in edges {
if self.seen_edges.insert(e.id) {
self.edges.push(e);
}
}
}
}
self.drained = true;
Ok(())
}
}
impl ProcCursor for SubgraphAllCursor {
fn advance(&mut self, reader: &dyn GraphReader) -> Result<Option<ProcRow>> {
if self.emitted {
return Ok(None);
}
if !self.drained {
self.drain(reader)?;
}
let nodes = std::mem::take(&mut self.nodes);
let edges = std::mem::take(&mut self.edges);
let nodes_value = Value::List(nodes.into_iter().map(Value::Node).collect());
let edges_value = Value::List(edges.into_iter().map(Value::Edge).collect());
let mut out: ProcRow = HashMap::new();
out.insert(Self::NODES_COLUMN.to_string(), nodes_value);
out.insert(Self::RELATIONSHIPS_COLUMN.to_string(), edges_value);
self.emitted = true;
Ok(Some(out))
}
}
pub fn expect_start_node(v: &Value, position: &str) -> Result<NodeId> {
match v {
Value::Node(n) => Ok(n.id),
Value::Null | Value::Property(Property::Null) => Err(Error::Procedure(format!(
"apoc.path.*: {position} must be a node, got null"
))),
other => Err(Error::Procedure(format!(
"apoc.path.*: {position} must be a node, got {other:?}"
))),
}
}
pub fn expect_int_or_default(v: &Value, default: i64, position: &str) -> Result<i64> {
match v {
Value::Null | Value::Property(Property::Null) => Ok(default),
Value::Property(Property::Int64(n)) => Ok(*n),
other => Err(Error::Procedure(format!(
"apoc.path.*: {position} must be an integer, got {other:?}"
))),
}
}
pub fn expect_string_or_default(v: &Value, default: &str, position: &str) -> Result<String> {
match v {
Value::Null | Value::Property(Property::Null) => Ok(default.to_string()),
Value::Property(Property::String(s)) => Ok(s.clone()),
other => Err(Error::Procedure(format!(
"apoc.path.*: {position} must be a string, got {other:?}"
))),
}
}
/// Builds an [`ExpandConfig`] from the five positional arguments of
/// `apoc.path.expand`: start node, relationship filter, label filter,
/// minimum level and maximum level.
///
/// Null filters default to the empty string and null levels to -1.
/// Uniqueness is fixed to `NodeGlobal`; the start node is not filtered and
/// no limit or node sets are applied.
///
/// # Errors
///
/// Returns `Error::Procedure` when the argument count is not 5, when an
/// argument has the wrong type, or when a filter string fails to parse.
pub fn config_from_expand_args(args: &[Value]) -> Result<ExpandConfig> {
    let (start_arg, rel_arg, label_arg, min_arg, max_arg) = match args {
        [start, rel, label, min, max] => (start, rel, label, min, max),
        _ => {
            return Err(Error::Procedure(format!(
                "apoc.path.expand expects 5 arguments, got {}",
                args.len()
            )))
        }
    };

    // Arguments are validated left to right so the first offending one is
    // the one reported.
    let start_node = expect_start_node(start_arg, "first argument (startNode)")?;
    let rel_spec = expect_string_or_default(rel_arg, "", "second argument (relationshipFilter)")?;
    let label_spec = expect_string_or_default(label_arg, "", "third argument (labelFilter)")?;
    let min_level = expect_int_or_default(min_arg, -1, "fourth argument (minLevel)")?;
    let max_level = expect_int_or_default(max_arg, -1, "fifth argument (maxLevel)")?;

    let rel_filter = RelFilter::parse(&rel_spec)?;
    let label_filter = LabelFilter::parse(&label_spec)?;
    Ok(ExpandConfig {
        start_node,
        min_level,
        max_level,
        rel_filter,
        label_filter,
        uniqueness: Uniqueness::NodeGlobal,
        filter_start_node: false,
        limit: None,
        end_nodes: None,
        blacklist_nodes: None,
        whitelist_nodes: None,
    })
}
/// Builds an [`ExpandConfig`] from the two arguments of
/// `apoc.path.expandConfig`: the start node and a configuration map.
///
/// Recognised scalar keys are `minLevel`, `maxLevel`, `relationshipFilter`,
/// `labelFilter`, `uniqueness`, `filterStartNode` and `limit`; missing or
/// null entries fall back to their defaults. The node-set keys `endNodes`,
/// `blacklistNodes` and `whitelistNodes` are only read when the map is a
/// `Value::Map`.
///
/// # Errors
///
/// Returns `Error::Procedure` when the argument count is not 2, when the
/// second argument is not a map or null, when an entry has the wrong type,
/// or when a filter/uniqueness string fails to parse.
pub fn config_from_expand_config_args(args: &[Value]) -> Result<ExpandConfig> {
    if args.len() != 2 {
        return Err(Error::Procedure(format!(
            "apoc.path.expandConfig expects 2 arguments, got {}",
            args.len()
        )));
    }
    let start_node = expect_start_node(&args[0], "first argument (startNode)")?;
    // Scalar settings are looked up in a property-only view of the map.
    let config_map = match &args[1] {
        Value::Map(m) => map_from_value_map(m)?,
        Value::Property(Property::Map(m)) => m.clone(),
        Value::Null | Value::Property(Property::Null) => HashMap::new(),
        other => {
            return Err(Error::Procedure(format!(
                "apoc.path.expandConfig: second argument must be a map, got {other:?}"
            )));
        }
    };
    let get_str = |key: &str, default: &str| -> Result<String> {
        match config_map.get(key) {
            Some(Property::String(s)) => Ok(s.clone()),
            Some(Property::Null) | None => Ok(default.to_string()),
            Some(other) => Err(Error::Procedure(format!(
                "apoc.path.expandConfig: config['{key}'] must be a string, got {other:?}"
            ))),
        }
    };
    let get_int = |key: &str, default: i64| -> Result<i64> {
        match config_map.get(key) {
            Some(Property::Int64(n)) => Ok(*n),
            Some(Property::Null) | None => Ok(default),
            Some(other) => Err(Error::Procedure(format!(
                "apoc.path.expandConfig: config['{key}'] must be an integer, got {other:?}"
            ))),
        }
    };
    let get_bool = |key: &str, default: bool| -> Result<bool> {
        match config_map.get(key) {
            Some(Property::Bool(b)) => Ok(*b),
            Some(Property::Null) | None => Ok(default),
            Some(other) => Err(Error::Procedure(format!(
                "apoc.path.expandConfig: config['{key}'] must be a boolean, got {other:?}"
            ))),
        }
    };
    let min_level = get_int("minLevel", -1)?;
    let max_level = get_int("maxLevel", -1)?;
    let rel_str = get_str("relationshipFilter", "")?;
    let label_str = get_str("labelFilter", "")?;
    let uniq_str = get_str("uniqueness", "NODE_GLOBAL")?;
    let filter_start_node = get_bool("filterStartNode", false)?;
    let limit_raw = get_int("limit", -1)?;
    // A negative limit means "no limit". The checked conversion also maps a
    // value too large for `usize` to `None` instead of silently truncating
    // it the way an `as` cast would; such a limit could never be reached.
    let limit = <usize as std::convert::TryFrom<i64>>::try_from(limit_raw).ok();
    // Node sets hold node values, which the property-only view above drops,
    // so they are read from the original map.
    let (end_nodes, blacklist_nodes, whitelist_nodes) = match &args[1] {
        Value::Map(m) => (
            extract_node_id_set(m.get("endNodes"), "endNodes")?,
            extract_node_id_set(m.get("blacklistNodes"), "blacklistNodes")?,
            extract_node_id_set(m.get("whitelistNodes"), "whitelistNodes")?,
        ),
        _ => (None, None, None),
    };
    Ok(ExpandConfig {
        start_node,
        min_level,
        max_level,
        rel_filter: RelFilter::parse(&rel_str)?,
        label_filter: LabelFilter::parse(&label_str)?,
        uniqueness: Uniqueness::parse(&uniq_str)?,
        filter_start_node,
        limit,
        end_nodes,
        blacklist_nodes,
        whitelist_nodes,
    })
}
/// Copies the `Value::Property` entries of `m` into a property map.
///
/// Entries holding any other kind of value are dropped. The current
/// implementation never returns `Err`.
fn map_from_value_map(m: &HashMap<String, Value>) -> Result<HashMap<String, Property>> {
    let properties = m
        .iter()
        .filter_map(|(key, value)| match value {
            Value::Property(p) => Some((key.clone(), p.clone())),
            _ => None,
        })
        .collect();
    Ok(properties)
}
fn extract_node_id_set(v: Option<&Value>, key: &str) -> Result<Option<HashSet<NodeId>>> {
match v {
None | Some(Value::Null | Value::Property(Property::Null)) => Ok(None),
Some(Value::Node(n)) => {
let mut s = HashSet::new();
s.insert(n.id);
Ok(Some(s))
}
Some(Value::List(items)) => {
let mut s = HashSet::new();
for item in items {
match item {
Value::Node(n) => {
s.insert(n.id);
}
other => {
return Err(Error::Procedure(format!(
"apoc.path.expandConfig: config['{key}'] must contain nodes, got {other:?}"
)));
}
}
}
Ok(Some(s))
}
Some(other) => Err(Error::Procedure(format!(
"apoc.path.expandConfig: config['{key}'] must be a list of nodes, got {other:?}"
))),
}
}
// Unit tests for the filter parsers, the uniqueness-mode parser and the
// uniqueness tracker. None of them needs a graph reader.
#[cfg(test)]
mod tests {
use super::*;
// An empty relationship filter accepts every type in both directions.
#[test]
fn rel_filter_empty_is_permissive() {
let f = RelFilter::parse("").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(f.accepts("KNOWS", Direction::Incoming));
assert!(f.accepts("ANY", Direction::Outgoing));
}
// A leading '>' restricts the named type to outgoing traversal; types that
// are not listed are rejected.
#[test]
fn rel_filter_prefix_direction() {
let f = RelFilter::parse(">KNOWS").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(!f.accepts("KNOWS", Direction::Incoming));
assert!(!f.accepts("LIKES", Direction::Outgoing));
}
// A trailing '>' is equivalent to a leading one.
#[test]
fn rel_filter_suffix_direction() {
let f = RelFilter::parse("KNOWS>").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(!f.accepts("KNOWS", Direction::Incoming));
}
// Each item of a '|'-separated list keeps its own direction; an unmarked
// type is accepted both ways.
#[test]
fn rel_filter_mixed_list() {
let f = RelFilter::parse(">KNOWS|<LIKES|FRIENDS").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(!f.accepts("KNOWS", Direction::Incoming));
assert!(f.accepts("LIKES", Direction::Incoming));
assert!(!f.accepts("LIKES", Direction::Outgoing));
assert!(f.accepts("FRIENDS", Direction::Outgoing));
assert!(f.accepts("FRIENDS", Direction::Incoming));
assert!(!f.accepts("OTHER", Direction::Outgoing));
}
// A bare direction marker acts as a wildcard over all types.
#[test]
fn rel_filter_bare_direction_wildcard() {
let f = RelFilter::parse(">").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(f.accepts("LIKES", Direction::Outgoing));
assert!(!f.accepts("KNOWS", Direction::Incoming));
}
// Listing the same type with both directions widens it to Both.
#[test]
fn rel_filter_same_type_both_directions_widens() {
let f = RelFilter::parse("KNOWS>|KNOWS<").unwrap();
assert!(f.accepts("KNOWS", Direction::Outgoing));
assert!(f.accepts("KNOWS", Direction::Incoming));
}
// An empty label filter visits, continues past and ends on any node.
#[test]
fn label_filter_empty_permissive() {
let f = LabelFilter::parse("").unwrap();
assert!(f.is_permissive());
assert!(f.is_visitable(&["A".into()]));
assert!(f.is_endpoint(&["A".into()]));
assert!(f.can_continue(&["A".into()]));
}
// '-' blacklists a label.
#[test]
fn label_filter_blacklist_rejects() {
let f = LabelFilter::parse("-Secret").unwrap();
assert!(!f.is_visitable(&["Secret".into()]));
assert!(f.is_visitable(&["Public".into()]));
}
// '+' whitelists a label; with a whitelist present other labels are refused.
#[test]
fn label_filter_whitelist_admits_only_matching() {
let f = LabelFilter::parse("+Person").unwrap();
assert!(f.is_visitable(&["Person".into()]));
assert!(!f.is_visitable(&["Company".into()]));
}
// A label without a prefix behaves like a whitelist entry.
#[test]
fn label_filter_bare_is_whitelist() {
let f = LabelFilter::parse("Person").unwrap();
assert!(f.is_visitable(&["Person".into()]));
assert!(!f.is_visitable(&["Company".into()]));
}
// '>' marks end-node labels: they restrict endpoints but neither visiting
// nor continuing.
#[test]
fn label_filter_end_node_marker() {
let f = LabelFilter::parse(">Admin").unwrap();
assert!(f.is_visitable(&["Admin".into()]));
assert!(f.is_visitable(&["User".into()]));
assert!(f.is_endpoint(&["Admin".into()]));
assert!(!f.is_endpoint(&["User".into()]));
assert!(f.can_continue(&["Admin".into()]));
}
// '/' marks terminator labels: they are endpoints and stop expansion.
#[test]
fn label_filter_terminator_stops_expansion() {
let f = LabelFilter::parse("/Leaf").unwrap();
assert!(f.is_visitable(&["Leaf".into()]));
assert!(f.is_visitable(&["Other".into()]));
assert!(f.is_endpoint(&["Leaf".into()]));
assert!(!f.is_endpoint(&["Other".into()]));
assert!(!f.can_continue(&["Leaf".into()]));
assert!(f.can_continue(&["Other".into()]));
}
// All four prefixes combined. With a whitelist present, end-node and
// terminator labels alone do not make a node visitable.
#[test]
fn label_filter_mixed() {
let f = LabelFilter::parse("+Person|-Spam|>Admin|/Sink").unwrap();
assert!(f.is_visitable(&["Person".into()]));
assert!(!f.is_visitable(&["Admin".into()]));
assert!(!f.is_visitable(&["Sink".into()]));
assert!(!f.is_visitable(&["Person".into(), "Spam".into()]));
assert!(!f.is_visitable(&["Other".into()]));
assert!(f.can_continue(&["Person".into()]));
assert!(!f.can_continue(&["Person".into(), "Sink".into()]));
assert!(!f.is_endpoint(&["Person".into()]));
assert!(f.is_endpoint(&["Admin".into()]));
assert!(f.is_endpoint(&["Sink".into()]));
}
// Mode names parse case-insensitively; unknown names are errors.
#[test]
fn uniqueness_parse_roundtrip() {
assert_eq!(
Uniqueness::parse("NODE_GLOBAL").unwrap(),
Uniqueness::NodeGlobal
);
assert_eq!(
Uniqueness::parse("relationship_path").unwrap(),
Uniqueness::RelationshipPath
);
assert_eq!(Uniqueness::parse("NONE").unwrap(), Uniqueness::None);
assert!(Uniqueness::parse("bogus").is_err());
}
// NodeGlobal admits a node once for the whole traversal.
#[test]
fn uniqueness_tracker_node_global() {
let mut t = UniquenessTracker::new(Uniqueness::NodeGlobal);
let a = NodeId::new();
let mut path = HashSet::new();
assert!(t.try_visit_node(a, &mut path));
assert!(!t.try_visit_node(a, &mut path));
}
// NodeLevel admits a node once per level; advance_level resets the set.
#[test]
fn uniqueness_tracker_node_level_resets() {
let mut t = UniquenessTracker::new(Uniqueness::NodeLevel);
let a = NodeId::new();
let mut path = HashSet::new();
assert!(t.try_visit_node(a, &mut path));
assert!(!t.try_visit_node(a, &mut path));
t.advance_level();
assert!(t.try_visit_node(a, &mut path));
}
// NodePath tracks visits in the caller-supplied per-path set, so a second
// path may visit the same node.
#[test]
fn uniqueness_tracker_node_path_scoped_to_path() {
let mut t = UniquenessTracker::new(Uniqueness::NodePath);
let a = NodeId::new();
let mut path_one = HashSet::new();
let mut path_two = HashSet::new();
assert!(t.try_visit_node(a, &mut path_one));
assert!(!t.try_visit_node(a, &mut path_one));
assert!(t.try_visit_node(a, &mut path_two));
}
// Uniqueness::None never rejects a repeat visit.
#[test]
fn uniqueness_tracker_none_admits_everything() {
let mut t = UniquenessTracker::new(Uniqueness::None);
let a = NodeId::new();
let mut path = HashSet::new();
assert!(t.try_visit_node(a, &mut path));
assert!(t.try_visit_node(a, &mut path));
}
}