use crate::traversal::context::ExecutionContext;
use crate::traversal::step::Step;
use crate::traversal::Traverser;
use crate::value::Value;
#[derive(Clone, Debug)]
pub struct OutStep {
labels: Vec<String>,
}
impl OutStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for OutStep {
fn default() -> Self {
Self::new()
}
}
impl Step for OutStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids = label_ids.clone();
Box::new(ctx.storage().out_edges(vertex_id).filter_map(move |edge| {
if !label_ids.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Vertex(edge.dst));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
})) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"out"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let neighbors = ctx
.streamable_storage()
.stream_out_neighbors(vertex_id, &label_ids);
Box::new(neighbors.map(move |dst| {
let mut new_t = input.split(Value::Vertex(dst));
if track_paths {
new_t.extend_path_unlabeled();
}
new_t
}))
}
}
#[derive(Clone, Debug)]
pub struct InStep {
labels: Vec<String>,
}
impl InStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for InStep {
fn default() -> Self {
Self::new()
}
}
impl Step for InStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids = label_ids.clone();
Box::new(ctx.storage().in_edges(vertex_id).filter_map(move |edge| {
if !label_ids.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Vertex(edge.src));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
})) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"in"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let neighbors = ctx
.streamable_storage()
.stream_in_neighbors(vertex_id, &label_ids);
Box::new(neighbors.map(move |src| {
let mut new_t = input.split(Value::Vertex(src));
if track_paths {
new_t.extend_path_unlabeled();
}
new_t
}))
}
}
#[derive(Clone, Debug)]
pub struct BothStep {
labels: Vec<String>,
}
impl BothStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for BothStep {
fn default() -> Self {
Self::new()
}
}
impl Step for BothStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids_out = label_ids.clone();
let label_ids_in = label_ids.clone();
let t_for_in = t.clone();
let out_iter = ctx.storage().out_edges(vertex_id).filter_map(move |edge| {
if !label_ids_out.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids_out.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Vertex(edge.dst));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
let in_iter = ctx.storage().in_edges(vertex_id).filter_map(move |edge| {
if !label_ids_in.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids_in.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t_for_in.split(Value::Vertex(edge.src));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
Box::new(out_iter.chain(in_iter)) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"both"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let neighbors = ctx
.streamable_storage()
.stream_both_neighbors(vertex_id, &label_ids);
Box::new(neighbors.map(move |neighbor| {
let mut new_t = input.split(Value::Vertex(neighbor));
if track_paths {
new_t.extend_path_unlabeled();
}
new_t
}))
}
}
#[derive(Clone, Debug)]
pub struct OutEStep {
labels: Vec<String>,
}
impl OutEStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for OutEStep {
fn default() -> Self {
Self::new()
}
}
impl Step for OutEStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids = label_ids.clone();
Box::new(ctx.storage().out_edges(vertex_id).filter_map(move |edge| {
if !label_ids.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Edge(edge.id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
})) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"outE"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let edge_ids = ctx.streamable_storage().stream_out_edges(vertex_id);
let storage = ctx.arc_streamable();
Box::new(edge_ids.filter_map(move |edge_id| {
if !label_ids.is_empty() {
if let Some(edge) = storage.get_edge(edge_id) {
let edge_label_id = storage.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
} else {
return None;
}
}
let mut new_t = input.split(Value::Edge(edge_id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
}))
}
}
#[derive(Clone, Debug)]
pub struct InEStep {
labels: Vec<String>,
}
impl InEStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for InEStep {
fn default() -> Self {
Self::new()
}
}
impl Step for InEStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids = label_ids.clone();
Box::new(ctx.storage().in_edges(vertex_id).filter_map(move |edge| {
if !label_ids.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Edge(edge.id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
})) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"inE"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let edge_ids = ctx.streamable_storage().stream_in_edges(vertex_id);
let storage = ctx.arc_streamable();
Box::new(edge_ids.filter_map(move |edge_id| {
if !label_ids.is_empty() {
if let Some(edge) = storage.get_edge(edge_id) {
let edge_label_id = storage.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
} else {
return None;
}
}
let mut new_t = input.split(Value::Edge(edge_id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
}))
}
}
#[derive(Clone, Debug)]
pub struct BothEStep {
labels: Vec<String>,
}
impl BothEStep {
pub fn new() -> Self {
Self { labels: Vec::new() }
}
pub fn with_labels(labels: Vec<String>) -> Self {
Self { labels }
}
}
impl Default for BothEStep {
fn default() -> Self {
Self::new()
}
}
impl Step for BothEStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
ctx.resolve_labels(&self.labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
Box::new(input.flat_map(move |t| {
let vertex_id = match t.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
let label_ids_out = label_ids.clone();
let label_ids_in = label_ids.clone();
let t_for_in = t.clone();
let out_iter = ctx.storage().out_edges(vertex_id).filter_map(move |edge| {
if !label_ids_out.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids_out.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t.split(Value::Edge(edge.id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
let in_iter = ctx.storage().in_edges(vertex_id).filter_map(move |edge| {
if !label_ids_in.is_empty() {
let edge_label_id = ctx.interner().lookup(&edge.label)?;
if !label_ids_in.contains(&edge_label_id) {
return None;
}
}
let mut new_t = t_for_in.split(Value::Edge(edge.id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
Box::new(out_iter.chain(in_iter)) as Box<dyn Iterator<Item = Traverser>>
}))
}
fn name(&self) -> &'static str {
"bothE"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn describe(&self) -> Option<String> {
if self.labels.is_empty() {
None
} else {
Some(self.labels.iter().map(|l| format!("\"{l}\"")).collect::<Vec<_>>().join(", "))
}
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let vertex_id = match input.as_vertex_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let label_ids: Vec<u32> = if self.labels.is_empty() {
Vec::new()
} else {
self.labels
.iter()
.filter_map(|label| ctx.interner().lookup(label))
.collect()
};
if !self.labels.is_empty() && label_ids.is_empty() {
return Box::new(std::iter::empty());
}
let track_paths = ctx.is_tracking_paths();
let label_ids_in = label_ids.clone();
let storage = ctx.arc_streamable();
let storage_in = storage.clone();
let out_edge_ids = ctx.streamable_storage().stream_out_edges(vertex_id);
let in_edge_ids = ctx.streamable_storage().stream_in_edges(vertex_id);
let input_out = input.clone();
let out_iter = out_edge_ids.filter_map(move |edge_id| {
if !label_ids.is_empty() {
if let Some(edge) = storage.get_edge(edge_id) {
let edge_label_id = storage.interner().lookup(&edge.label)?;
if !label_ids.contains(&edge_label_id) {
return None;
}
} else {
return None;
}
}
let mut new_t = input_out.split(Value::Edge(edge_id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
let in_iter = in_edge_ids.filter_map(move |edge_id| {
if !label_ids_in.is_empty() {
if let Some(edge) = storage_in.get_edge(edge_id) {
let edge_label_id = storage_in.interner().lookup(&edge.label)?;
if !label_ids_in.contains(&edge_label_id) {
return None;
}
} else {
return None;
}
}
let mut new_t = input.split(Value::Edge(edge_id));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
});
Box::new(out_iter.chain(in_iter))
}
}
#[derive(Clone, Copy, Debug)]
pub struct OutVStep;
impl OutVStep {
pub fn new() -> Self {
Self
}
}
impl Default for OutVStep {
fn default() -> Self {
Self::new()
}
}
impl Step for OutVStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
Box::new(input.filter_map(move |t| {
let edge_id = t.as_edge_id()?;
let edge = ctx.storage().get_edge(edge_id)?;
let mut new_t = t.split(Value::Vertex(edge.src));
if ctx.is_tracking_paths() {
new_t.extend_path_unlabeled();
}
Some(new_t)
}))
}
fn name(&self) -> &'static str {
"outV"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let edge_id = match input.as_edge_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let edge = match ctx.storage().get_edge(edge_id) {
Some(e) => e,
None => return Box::new(std::iter::empty()),
};
let track_paths = ctx.is_tracking_paths();
let mut new_t = input.split(Value::Vertex(edge.src));
if track_paths {
new_t.extend_path_unlabeled();
}
Box::new(std::iter::once(new_t))
}
}
#[derive(Clone, Copy, Debug)]
pub struct InVStep;
impl InVStep {
pub fn new() -> Self {
Self
}
}
impl Default for InVStep {
fn default() -> Self {
Self::new()
}
}
impl Step for InVStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
Box::new(input.filter_map(move |t| {
let edge_id = t.as_edge_id()?;
let edge = ctx.storage().get_edge(edge_id)?;
let mut new_t = t.split(Value::Vertex(edge.dst));
if ctx.is_tracking_paths() {
new_t.extend_path_unlabeled();
}
Some(new_t)
}))
}
fn name(&self) -> &'static str {
"inV"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let edge_id = match input.as_edge_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let edge = match ctx.storage().get_edge(edge_id) {
Some(e) => e,
None => return Box::new(std::iter::empty()),
};
let track_paths = ctx.is_tracking_paths();
let mut new_t = input.split(Value::Vertex(edge.dst));
if track_paths {
new_t.extend_path_unlabeled();
}
Box::new(std::iter::once(new_t))
}
}
#[derive(Clone, Copy, Debug)]
pub struct BothVStep;
impl BothVStep {
pub fn new() -> Self {
Self
}
}
impl Default for BothVStep {
fn default() -> Self {
Self::new()
}
}
impl Step for BothVStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
Box::new(input.flat_map(move |t| {
let edge_id = match t.as_edge_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
};
match ctx.storage().get_edge(edge_id) {
Some(edge) => {
let track_paths = ctx.is_tracking_paths();
let mut src = t.split(Value::Vertex(edge.src));
let mut dst = t.split(Value::Vertex(edge.dst));
if track_paths {
src.extend_path_unlabeled();
dst.extend_path_unlabeled();
}
Box::new([src, dst].into_iter()) as Box<dyn Iterator<Item = Traverser>>
}
None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Traverser>>,
}
}))
}
fn name(&self) -> &'static str {
"bothV"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let edge_id = match input.as_edge_id() {
Some(id) => id,
None => return Box::new(std::iter::empty()),
};
let edge = match ctx.storage().get_edge(edge_id) {
Some(e) => e,
None => return Box::new(std::iter::empty()),
};
let track_paths = ctx.is_tracking_paths();
let mut src = input.split(Value::Vertex(edge.src));
let mut dst = input.split(Value::Vertex(edge.dst));
if track_paths {
src.extend_path_unlabeled();
dst.extend_path_unlabeled();
}
Box::new([src, dst].into_iter())
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct OtherVStep;
impl OtherVStep {
pub fn new() -> Self {
Self
}
fn get_other_vertex(
&self,
ctx: &ExecutionContext<'_>,
traverser: &Traverser,
) -> Option<crate::value::VertexId> {
use crate::traversal::PathValue;
let edge_id = match &traverser.value {
Value::Edge(id) => *id,
_ => return None,
};
let edge = ctx.storage().get_edge(edge_id)?;
let path_len = traverser.path.len();
if path_len < 2 {
return Some(edge.dst);
}
let path_elements: Vec<_> = traverser.path.elements().collect();
let prev_element = &path_elements[path_len - 2];
match &prev_element.value {
PathValue::Vertex(prev_id) => {
if *prev_id == edge.src {
Some(edge.dst)
} else if *prev_id == edge.dst {
Some(edge.src)
} else {
None
}
}
_ => None,
}
}
}
impl Step for OtherVStep {
type Iter<'a>
= Box<dyn Iterator<Item = Traverser> + 'a>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
Box::new(input.filter_map(move |t| {
self.get_other_vertex(ctx, &t).map(|vid| {
let mut new_t = t.with_value(Value::Vertex(vid));
if ctx.is_tracking_paths() {
new_t.extend_path_unlabeled();
}
new_t
})
}))
}
fn name(&self) -> &'static str {
"otherV"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Navigation
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
use crate::traversal::PathValue;
let edge_id = match &input.value {
Value::Edge(id) => *id,
_ => return Box::new(std::iter::empty()),
};
let edge = match ctx.storage().get_edge(edge_id) {
Some(e) => e,
None => return Box::new(std::iter::empty()),
};
let path_len = input.path.len();
let other_vid = if path_len >= 2 {
let path_elements: Vec<_> = input.path.elements().collect();
let prev_element = &path_elements[path_len - 2];
match &prev_element.value {
PathValue::Vertex(prev_id) => {
if *prev_id == edge.src {
edge.dst
} else if *prev_id == edge.dst {
edge.src
} else {
edge.dst
}
}
_ => edge.dst, }
} else {
edge.dst
};
let track_paths = ctx.is_tracking_paths();
let mut new_t = input.with_value(Value::Vertex(other_vid));
if track_paths {
new_t.extend_path_unlabeled();
}
Box::new(std::iter::once(new_t))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Graph;
use crate::traversal::step::DynStep;
use crate::value::{EdgeId, VertexId};
use std::collections::HashMap;
fn create_test_graph() -> Graph {
let graph = Graph::new();
let alice = graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Alice".to_string()));
props
});
let bob = graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Bob".to_string()));
props
});
let charlie = graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Charlie".to_string()));
props
});
let graphdb = graph.add_vertex("software", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("GraphDB".to_string()));
props
});
graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();
graph
.add_edge(bob, charlie, "knows", HashMap::new())
.unwrap();
graph
.add_edge(alice, graphdb, "uses", HashMap::new())
.unwrap();
graph
.add_edge(bob, graphdb, "uses", HashMap::new())
.unwrap();
graph
}
mod out_step_tests {
use super::*;
#[test]
fn out_step_new() {
let step = OutStep::new();
assert!(step.labels.is_empty());
}
#[test]
fn out_step_with_labels() {
let step = OutStep::with_labels(vec!["knows".to_string()]);
assert_eq!(step.labels, vec!["knows"]);
}
#[test]
fn out_step_name() {
let step = OutStep::new();
assert_eq!(step.name(), "out");
}
#[test]
fn out_step_clone_box() {
let step = OutStep::with_labels(vec!["test".to_string()]);
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "out");
}
#[test]
fn out_traverses_all_outgoing_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out().to_list();
assert_eq!(results.len(), 2);
let vertex_ids: Vec<_> = results.iter().filter_map(|v| v.as_vertex_id()).collect();
assert!(vertex_ids.contains(&VertexId(1))); assert!(vertex_ids.contains(&VertexId(3))); }
#[test]
fn out_with_label_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out_labels(&["knows"]).to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(1)));
}
#[test]
fn out_from_non_vertex_produces_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.inject([42i64]).out().to_list();
assert!(results.is_empty());
}
#[test]
fn out_from_vertex_with_no_out_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(2)]).out().to_list();
assert!(results.is_empty());
}
#[test]
fn out_chained() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.out_labels(&["knows"])
.out_labels(&["knows"])
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(2))); }
#[test]
fn out_with_multiple_labels() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.out_labels(&["knows", "uses"])
.to_list();
assert_eq!(results.len(), 2);
}
#[test]
fn out_with_nonexistent_label() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out_labels(&["likes"]).to_list();
assert!(results.is_empty());
}
}
mod in_step_tests {
use super::*;
#[test]
fn in_step_new() {
let step = InStep::new();
assert!(step.labels.is_empty());
}
#[test]
fn in_step_name() {
let step = InStep::new();
assert_eq!(step.name(), "in");
}
#[test]
fn in_traverses_all_incoming_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(1)]).in_().to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(0))); }
#[test]
fn in_with_label_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(3)]).in_labels(&["uses"]).to_list();
assert_eq!(results.len(), 2);
}
#[test]
fn in_from_vertex_with_no_in_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).in_().to_list();
assert!(results.is_empty());
}
}
mod both_step_tests {
use super::*;
#[test]
fn both_step_name() {
let step = BothStep::new();
assert_eq!(step.name(), "both");
}
#[test]
fn both_traverses_all_directions() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(1)]).both().to_list();
assert_eq!(results.len(), 3);
}
#[test]
fn both_with_label_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(1)]).both_labels(&["knows"]).to_list();
assert_eq!(results.len(), 2);
}
}
mod out_e_step_tests {
use super::*;
#[test]
fn out_e_step_name() {
let step = OutEStep::new();
assert_eq!(step.name(), "outE");
}
#[test]
fn out_e_returns_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out_e().to_list();
assert_eq!(results.len(), 2);
for result in &results {
assert!(result.as_edge_id().is_some());
}
}
#[test]
fn out_e_with_label_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out_e_labels(&["knows"]).to_list();
assert_eq!(results.len(), 1);
}
}
mod in_e_step_tests {
use super::*;
#[test]
fn in_e_step_name() {
let step = InEStep::new();
assert_eq!(step.name(), "inE");
}
#[test]
fn in_e_returns_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(3)]).in_e().to_list();
assert_eq!(results.len(), 2);
for result in &results {
assert!(result.as_edge_id().is_some());
}
}
}
mod both_e_step_tests {
use super::*;
#[test]
fn both_e_step_name() {
let step = BothEStep::new();
assert_eq!(step.name(), "bothE");
}
#[test]
fn both_e_returns_all_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(1)]).both_e().to_list();
assert_eq!(results.len(), 3);
for result in &results {
assert!(result.as_edge_id().is_some());
}
}
}
mod out_v_step_tests {
use super::*;
#[test]
fn out_v_step_name() {
let step = OutVStep::new();
assert_eq!(step.name(), "outV");
}
#[test]
fn out_v_returns_source_vertex() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.e().out_v().to_list();
assert_eq!(results.len(), 4);
for result in &results {
assert!(result.as_vertex_id().is_some());
}
}
#[test]
fn out_v_from_non_edge_produces_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.inject([42i64]).out_v().to_list();
assert!(results.is_empty());
}
#[test]
fn out_e_in_v_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let via_out = g.v_ids([VertexId(0)]).out().to_list();
let via_edges = g.v_ids([VertexId(0)]).out_e().in_v().to_list();
assert_eq!(via_out.len(), via_edges.len());
}
}
mod in_v_step_tests {
use super::*;
#[test]
fn in_v_step_name() {
let step = InVStep::new();
assert_eq!(step.name(), "inV");
}
#[test]
fn in_v_returns_target_vertex() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.e().in_v().to_list();
assert_eq!(results.len(), 4);
for result in &results {
assert!(result.as_vertex_id().is_some());
}
}
}
mod both_v_step_tests {
use super::*;
#[test]
fn both_v_step_name() {
let step = BothVStep::new();
assert_eq!(step.name(), "bothV");
}
#[test]
fn both_v_returns_both_vertices() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.e_ids([EdgeId(0)]).both_v().to_list();
assert_eq!(results.len(), 2);
let vertex_ids: Vec<_> = results.iter().filter_map(|v| v.as_vertex_id()).collect();
assert!(vertex_ids.contains(&VertexId(0))); assert!(vertex_ids.contains(&VertexId(1))); }
#[test]
fn both_v_from_non_edge_produces_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).both_v().to_list();
assert!(results.is_empty());
}
}
mod other_v_step_tests {
use super::*;
#[test]
fn other_v_step_new() {
let step = OtherVStep::new();
assert_eq!(step.name(), "otherV");
}
#[test]
fn other_v_step_default() {
let step = OtherVStep;
assert_eq!(step.name(), "otherV");
}
#[test]
fn other_v_step_clone_box() {
let step = OtherVStep::new();
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "otherV");
}
#[test]
fn other_v_from_out_edge_returns_target() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.with_path()
.out_e()
.other_v()
.to_list();
assert_eq!(results.len(), 2);
let vertex_ids: Vec<_> = results.iter().filter_map(|v| v.as_vertex_id()).collect();
assert!(vertex_ids.contains(&VertexId(1))); assert!(vertex_ids.contains(&VertexId(3))); }
#[test]
fn other_v_from_in_edge_returns_source() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(1)])
.with_path()
.in_e_labels(&["knows"])
.other_v()
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(0))); }
#[test]
fn other_v_from_both_e_returns_opposite() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(1)])
.with_path()
.both_e()
.other_v()
.to_list();
assert_eq!(results.len(), 3);
let vertex_ids: Vec<_> = results.iter().filter_map(|v| v.as_vertex_id()).collect();
assert!(vertex_ids.contains(&VertexId(0))); assert!(vertex_ids.contains(&VertexId(2))); assert!(vertex_ids.contains(&VertexId(3))); }
#[test]
fn other_v_from_non_edge_produces_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).with_path().other_v().to_list();
assert!(results.is_empty());
}
#[test]
fn other_v_with_label_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.with_path()
.out_e_labels(&["knows"])
.other_v()
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(1))); }
#[test]
fn other_v_equivalent_to_in_v_after_out_e() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let via_other_v = g
.v_ids([VertexId(0)])
.with_path()
.out_e_labels(&["knows"])
.other_v()
.to_list();
let g2 = snapshot.gremlin();
let via_in_v = g2
.v_ids([VertexId(0)])
.out_e_labels(&["knows"])
.in_v()
.to_list();
assert_eq!(via_other_v, via_in_v);
}
#[test]
fn other_v_equivalent_to_out_v_after_in_e() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let via_other_v = g
.v_ids([VertexId(1)])
.with_path()
.in_e_labels(&["knows"])
.other_v()
.to_list();
let g2 = snapshot.gremlin();
let via_out_v = g2
.v_ids([VertexId(1)])
.in_e_labels(&["knows"])
.out_v()
.to_list();
assert_eq!(via_other_v, via_out_v);
}
#[test]
fn other_v_without_path_tracking_uses_fallback() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g.v_ids([VertexId(0)]).out_e().other_v().to_list();
assert!(!results.is_empty());
}
#[test]
fn other_v_chained_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.with_path()
.out_e_labels(&["knows"])
.other_v()
.out_e_labels(&["knows"])
.other_v()
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(2))); }
#[test]
fn other_v_with_injected_edge_no_path() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.inject([Value::Edge(EdgeId(0))])
.with_path()
.other_v()
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(1))); }
#[test]
fn other_v_values_property_access() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.with_path()
.out_e_labels(&["knows"])
.other_v()
.values("name")
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::String("Bob".to_string()));
}
}
mod integration_tests {
use super::*;
#[test]
fn complex_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(3)]) .in_labels(&["uses"])
.to_list();
assert_eq!(results.len(), 2);
let vertex_ids: Vec<_> = results.iter().filter_map(|v| v.as_vertex_id()).collect();
assert!(vertex_ids.contains(&VertexId(0))); assert!(vertex_ids.contains(&VertexId(1))); }
#[test]
fn multi_hop_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let results = g
.v_ids([VertexId(0)])
.out_labels(&["knows"])
.out_labels(&["knows"])
.to_list();
assert_eq!(results.len(), 1);
assert_eq!(results[0], Value::Vertex(VertexId(2))); }
#[test]
fn dedup_with_navigation() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let with_dups = g.v_ids([VertexId(0)]).out().to_list();
let without_dups = g.v_ids([VertexId(0)]).out().dedup().to_list();
assert_eq!(with_dups.len(), without_dups.len());
}
}
}