use std::collections::HashMap;
use std::marker::PhantomData;
use crate::traversal::step::{execute_traversal_from, DynStep, Step};
use crate::traversal::{ExecutionContext, Traversal, Traverser};
use crate::value::Value;
#[inline]
fn value_to_map_key(value: &Value) -> String {
match value {
Value::String(s) => s.clone(),
Value::Int(n) => n.to_string(),
Value::Float(f) => f.to_string(),
Value::Bool(b) => b.to_string(),
Value::Vertex(id) => format!("v[{}]", id.0),
Value::Edge(id) => format!("e[{}]", id.0),
Value::Null => "null".to_string(),
Value::List(_) => "[list]".to_string(),
Value::Map(_) => "[map]".to_string(),
Value::Point(p) => p.to_string(),
Value::Polygon(p) => p.to_string(),
}
}
#[derive(Clone, Debug)]
pub enum GroupKey {
Label,
Property(String),
Traversal(Box<Traversal<crate::Value, crate::Value>>),
}
impl GroupKey {
#[inline]
pub fn by_label() -> Self {
GroupKey::Label
}
#[inline]
pub fn by_property(key: impl Into<String>) -> Self {
GroupKey::Property(key.into())
}
#[inline]
pub fn by_traversal(traversal: Traversal<crate::Value, crate::Value>) -> Self {
GroupKey::Traversal(Box::new(traversal))
}
}
#[derive(Clone, Debug)]
pub enum GroupValue {
Identity,
Property(String),
Traversal(Box<Traversal<crate::Value, crate::Value>>),
Fold(Box<Traversal<crate::Value, crate::Value>>),
}
impl GroupValue {
pub fn identity() -> Self {
GroupValue::Identity
}
pub fn by_property(key: impl Into<String>) -> Self {
GroupValue::Property(key.into())
}
pub fn by_traversal(traversal: Traversal<crate::Value, crate::Value>) -> Self {
GroupValue::Traversal(Box::new(traversal))
}
}
#[derive(Clone)]
pub struct GroupStep {
key_selector: GroupKey,
value_collector: GroupValue,
}
impl GroupStep {
pub fn new() -> Self {
Self {
key_selector: GroupKey::Label,
value_collector: GroupValue::Identity,
}
}
pub fn with_selectors(key_selector: GroupKey, value_collector: GroupValue) -> Self {
Self {
key_selector,
value_collector,
}
}
fn get_key(&self, ctx: &ExecutionContext, traverser: &Traverser) -> Option<Value> {
match &self.key_selector {
GroupKey::Label => {
match &traverser.value {
Value::Vertex(id) => ctx
.storage()
.get_vertex(*id)
.map(|v| Value::String(v.label.clone())),
Value::Edge(id) => ctx
.storage()
.get_edge(*id)
.map(|e| Value::String(e.label.clone())),
_ => None,
}
}
GroupKey::Property(key) => {
match &traverser.value {
Value::Vertex(id) => ctx
.storage()
.get_vertex(*id)
.and_then(|v| v.properties.get(key).cloned()),
Value::Edge(id) => ctx
.storage()
.get_edge(*id)
.and_then(|e| e.properties.get(key).cloned()),
_ => None,
}
}
GroupKey::Traversal(sub) => {
execute_traversal_from(ctx, sub, Box::new(std::iter::once(traverser.clone())))
.next()
.map(|t| t.value)
}
}
}
fn get_value(&self, ctx: &ExecutionContext, traverser: &Traverser) -> Option<Value> {
match &self.value_collector {
GroupValue::Identity => {
Some(traverser.value.clone())
}
GroupValue::Property(key) => {
match &traverser.value {
Value::Vertex(id) => ctx
.storage()
.get_vertex(*id)
.and_then(|v| v.properties.get(key).cloned()),
Value::Edge(id) => ctx
.storage()
.get_edge(*id)
.and_then(|e| e.properties.get(key).cloned()),
_ => None,
}
}
GroupValue::Traversal(sub) => {
let results: Vec<Value> =
execute_traversal_from(ctx, sub, Box::new(std::iter::once(traverser.clone())))
.map(|t| t.value)
.collect();
if results.is_empty() {
None
} else if results.len() == 1 {
Some(results.into_iter().next().unwrap())
} else {
Some(Value::List(results))
}
}
GroupValue::Fold(_) => {
Some(traverser.value.clone())
}
}
}
fn apply_per_element<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> std::iter::Once<Traverser> {
let mut groups: HashMap<Value, Vec<Value>> = HashMap::new();
let mut last_path = None;
for traverser in input {
last_path = Some(traverser.path.clone());
if let Some(key) = self.get_key(ctx, &traverser) {
if matches!(key, Value::List(_) | Value::Map(_)) {
continue;
}
if let Some(value) = self.get_value(ctx, &traverser) {
groups.entry(key).or_default().push(value);
}
}
}
let mut result_map: crate::value::ValueMap = crate::value::ValueMap::new();
for (key, values) in groups {
let key_str = value_to_map_key(&key);
result_map.insert(key_str, Value::List(values));
}
let result_value = Value::Map(result_map);
let result_traverser = Traverser {
value: result_value,
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
};
std::iter::once(result_traverser)
}
fn apply_with_fold<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> std::iter::Once<Traverser> {
let mut groups: HashMap<Value, Vec<Traverser>> = HashMap::new();
let mut last_path = None;
for traverser in input {
last_path = Some(traverser.path.clone());
if let Some(key) = self.get_key(ctx, &traverser) {
if matches!(key, Value::List(_) | Value::Map(_)) {
continue;
}
groups.entry(key).or_default().push(traverser);
}
}
let fold_traversal = match &self.value_collector {
GroupValue::Fold(t) => t.as_ref(),
_ => unreachable!("apply_with_fold called without Fold value collector"),
};
let mut result_map: crate::value::ValueMap = crate::value::ValueMap::new();
for (key, traversers) in groups {
let key_str = value_to_map_key(&key);
let fold_input = Box::new(traversers.into_iter());
let fold_results: Vec<Value> = execute_traversal_from(ctx, fold_traversal, fold_input)
.map(|t| t.value)
.collect();
let fold_result = if fold_results.is_empty() {
Value::Null
} else if fold_results.len() == 1 {
fold_results.into_iter().next().unwrap()
} else {
Value::List(fold_results)
};
result_map.insert(key_str, fold_result);
}
let result_value = Value::Map(result_map);
let result_traverser = Traverser {
value: result_value,
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
};
std::iter::once(result_traverser)
}
}
impl Default for GroupStep {
fn default() -> Self {
Self::new()
}
}
impl Step for GroupStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let is_fold = matches!(self.value_collector, GroupValue::Fold(_));
if is_fold {
self.apply_with_fold(ctx, input)
} else {
self.apply_per_element(ctx, input)
}
}
fn name(&self) -> &'static str {
"group"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Aggregation
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
pub struct GroupBuilder<In> {
steps: Vec<Box<dyn DynStep>>,
key_selector: Option<GroupKey>,
value_collector: Option<GroupValue>,
_phantom: PhantomData<In>,
}
impl<In> GroupBuilder<In> {
pub(crate) fn new(steps: Vec<Box<dyn DynStep>>) -> Self {
Self {
steps,
key_selector: None,
value_collector: None,
_phantom: PhantomData,
}
}
pub fn by_label(mut self) -> Self {
self.key_selector = Some(GroupKey::Label);
self
}
pub fn by_key(mut self, key: &str) -> Self {
self.key_selector = Some(GroupKey::Property(key.to_string()));
self
}
pub fn by_traversal(mut self, t: Traversal<Value, Value>) -> Self {
self.key_selector = Some(GroupKey::Traversal(Box::new(t)));
self
}
pub fn by_value(mut self) -> Self {
self.value_collector = Some(GroupValue::Identity);
self
}
pub fn by_value_key(mut self, key: &str) -> Self {
self.value_collector = Some(GroupValue::Property(key.to_string()));
self
}
pub fn by_value_traversal(mut self, t: Traversal<Value, Value>) -> Self {
self.value_collector = Some(GroupValue::Traversal(Box::new(t)));
self
}
pub fn by_value_fold(mut self, t: Traversal<Value, Value>) -> Self {
self.value_collector = Some(GroupValue::Fold(Box::new(t)));
self
}
pub fn build(mut self) -> Traversal<In, Value> {
let key_selector = self.key_selector.unwrap_or(GroupKey::Label);
let value_collector = self.value_collector.unwrap_or(GroupValue::Identity);
let group_step = GroupStep::with_selectors(key_selector, value_collector);
self.steps.push(Box::new(group_step));
Traversal {
steps: self.steps,
source: None,
_phantom: PhantomData,
}
}
}
pub struct BoundGroupBuilder<'g, In> {
snapshot: &'g dyn crate::traversal::SnapshotLike,
source: Option<crate::traversal::TraversalSource>,
steps: Vec<Box<dyn DynStep>>,
key_selector: Option<GroupKey>,
value_collector: Option<GroupValue>,
track_paths: bool,
_phantom: PhantomData<In>,
}
impl<'g, In> BoundGroupBuilder<'g, In> {
pub(crate) fn new(
snapshot: &'g dyn crate::traversal::SnapshotLike,
source: Option<crate::traversal::TraversalSource>,
steps: Vec<Box<dyn DynStep>>,
track_paths: bool,
) -> Self {
Self {
snapshot,
source,
steps,
key_selector: None,
value_collector: None,
track_paths,
_phantom: PhantomData,
}
}
pub fn by_label(mut self) -> Self {
self.key_selector = Some(GroupKey::Label);
self
}
pub fn by_key(mut self, key: &str) -> Self {
self.key_selector = Some(GroupKey::Property(key.to_string()));
self
}
pub fn by_traversal(mut self, t: Traversal<Value, Value>) -> Self {
self.key_selector = Some(GroupKey::Traversal(Box::new(t)));
self
}
pub fn by_value(mut self) -> Self {
self.value_collector = Some(GroupValue::Identity);
self
}
pub fn by_value_key(mut self, key: &str) -> Self {
self.value_collector = Some(GroupValue::Property(key.to_string()));
self
}
pub fn by_value_traversal(mut self, t: Traversal<Value, Value>) -> Self {
self.value_collector = Some(GroupValue::Traversal(Box::new(t)));
self
}
pub fn by_value_fold(mut self, t: Traversal<Value, Value>) -> Self {
self.value_collector = Some(GroupValue::Fold(Box::new(t)));
self
}
pub fn build(mut self) -> crate::traversal::source::BoundTraversal<'g, In, Value> {
let key_selector = self.key_selector.unwrap_or(GroupKey::Label);
let value_collector = self.value_collector.unwrap_or(GroupValue::Identity);
let group_step = GroupStep::with_selectors(key_selector, value_collector);
self.steps.push(Box::new(group_step));
let traversal = Traversal {
steps: self.steps,
source: self.source,
_phantom: PhantomData,
};
let mut bound = crate::traversal::source::BoundTraversal::new(self.snapshot, traversal);
if self.track_paths {
bound = bound.with_path();
}
bound
}
}
#[derive(Clone, Debug)]
pub struct GroupCountStep {
key_selector: GroupKey,
}
impl GroupCountStep {
pub fn new(key_selector: GroupKey) -> Self {
GroupCountStep { key_selector }
}
fn get_key(&self, ctx: &ExecutionContext, traverser: &Traverser) -> Option<Value> {
match &self.key_selector {
GroupKey::Label => match &traverser.value {
Value::Vertex(id) => ctx
.storage()
.get_vertex(*id)
.map(|v| Value::String(v.label.clone())),
Value::Edge(id) => ctx
.storage()
.get_edge(*id)
.map(|e| Value::String(e.label.clone())),
_ => None,
},
GroupKey::Property(key) => match &traverser.value {
Value::Vertex(id) => ctx
.storage()
.get_vertex(*id)
.and_then(|v| v.properties.get(key).cloned()),
Value::Edge(id) => ctx
.storage()
.get_edge(*id)
.and_then(|e| e.properties.get(key).cloned()),
_ => None,
},
GroupKey::Traversal(t) => {
let results =
execute_traversal_from(ctx, t, Box::new(std::iter::once(traverser.clone())));
results.into_iter().next().map(|t| t.value)
}
}
}
}
impl Step for GroupCountStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut counts: HashMap<Value, i64> = HashMap::new();
let mut last_path = None;
for traverser in input {
last_path = Some(traverser.path.clone());
if let Some(key_value) = self.get_key(ctx, &traverser) {
if matches!(key_value, Value::List(_) | Value::Map(_)) {
continue;
}
*counts.entry(key_value).or_insert(0) += traverser.bulk as i64;
}
}
let mut result_map: crate::value::ValueMap = crate::value::ValueMap::new();
for (key, count) in counts {
let key_str = value_to_map_key(&key);
result_map.insert(key_str, Value::Int(count));
}
let result_value = Value::Map(result_map);
let result_traverser = Traverser {
value: result_value,
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
};
std::iter::once(result_traverser)
}
fn name(&self) -> &'static str {
"groupCount"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Aggregation
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
pub struct GroupCountBuilder<In> {
steps: Vec<Box<dyn DynStep>>,
key_selector: Option<GroupKey>,
_phantom: PhantomData<In>,
}
impl<In> GroupCountBuilder<In> {
pub(crate) fn new(steps: Vec<Box<dyn DynStep>>) -> Self {
GroupCountBuilder {
steps,
key_selector: None,
_phantom: PhantomData,
}
}
pub fn by_label(mut self) -> Self {
self.key_selector = Some(GroupKey::Label);
self
}
pub fn by_key(mut self, key: &str) -> Self {
self.key_selector = Some(GroupKey::Property(key.to_string()));
self
}
pub fn by_traversal(mut self, traversal: Traversal<Value, Value>) -> Self {
self.key_selector = Some(GroupKey::Traversal(Box::new(traversal)));
self
}
pub fn build(self) -> Traversal<In, Value> {
let key_selector = self.key_selector.unwrap_or(GroupKey::Label);
let mut steps = self.steps;
steps.push(Box::new(GroupCountStep::new(key_selector)));
Traversal {
steps,
source: None,
_phantom: PhantomData,
}
}
}
pub struct BoundGroupCountBuilder<'g, In> {
snapshot: &'g dyn crate::traversal::SnapshotLike,
source: Option<crate::traversal::TraversalSource>,
steps: Vec<Box<dyn DynStep>>,
key_selector: Option<GroupKey>,
track_paths: bool,
_phantom: PhantomData<In>,
}
impl<'g, In> BoundGroupCountBuilder<'g, In> {
pub(crate) fn new(
snapshot: &'g dyn crate::traversal::SnapshotLike,
source: Option<crate::traversal::TraversalSource>,
steps: Vec<Box<dyn DynStep>>,
track_paths: bool,
) -> Self {
BoundGroupCountBuilder {
snapshot,
source,
steps,
key_selector: None,
track_paths,
_phantom: PhantomData,
}
}
pub fn by_label(mut self) -> Self {
self.key_selector = Some(GroupKey::Label);
self
}
pub fn by_key(mut self, key: &str) -> Self {
self.key_selector = Some(GroupKey::Property(key.to_string()));
self
}
pub fn by_traversal(mut self, traversal: Traversal<Value, Value>) -> Self {
self.key_selector = Some(GroupKey::Traversal(Box::new(traversal)));
self
}
pub fn build(self) -> crate::traversal::BoundTraversal<'g, In, Value> {
let key_selector = self.key_selector.unwrap_or(GroupKey::Label);
let mut steps = self.steps;
steps.push(Box::new(GroupCountStep::new(key_selector)));
let traversal = Traversal {
steps,
source: self.source,
_phantom: PhantomData,
};
let mut bound = crate::traversal::source::BoundTraversal::new(self.snapshot, traversal);
if self.track_paths {
bound = bound.with_path();
}
bound
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct CountStep;
impl CountStep {
#[inline]
pub fn new() -> Self {
Self
}
}
impl Step for CountStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let count: i64 = input.map(|t| t.bulk as i64).sum();
std::iter::once(Traverser::new(Value::Int(count)))
}
fn name(&self) -> &'static str {
"count"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Aggregation
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct MinStep;
impl MinStep {
#[inline]
pub fn new() -> Self {
Self
}
fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
use std::cmp::Ordering;
match (a, b) {
(Value::Int(x), Value::Int(y)) => Some(x.cmp(y)),
(Value::Float(x), Value::Float(y)) => x.partial_cmp(y),
(Value::String(x), Value::String(y)) => Some(x.cmp(y)),
(Value::Int(x), Value::Float(y)) => (*x as f64).partial_cmp(y),
(Value::Float(x), Value::Int(y)) => x.partial_cmp(&(*y as f64)),
(Value::Int(_), Value::String(_)) => Some(Ordering::Less),
(Value::String(_), Value::Int(_)) => Some(Ordering::Greater),
(Value::Float(_), Value::String(_)) => Some(Ordering::Less),
(Value::String(_), Value::Float(_)) => Some(Ordering::Greater),
_ => None,
}
}
}
impl Step for MinStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut min_value: Option<Value> = None;
let mut last_path = None;
for t in input {
last_path = Some(t.path.clone());
if !matches!(t.value, Value::Int(_) | Value::Float(_) | Value::String(_)) {
continue;
}
min_value = match min_value {
None => Some(t.value),
Some(current) => {
if let Some(ord) = Self::compare_values(&t.value, ¤t) {
if ord == std::cmp::Ordering::Less {
Some(t.value)
} else {
Some(current)
}
} else {
Some(current)
}
}
};
}
let result = min_value.unwrap_or(Value::Null);
let mut traverser = Traverser::new(result);
if let Some(path) = last_path {
traverser.path = path;
}
std::iter::once(traverser)
}
fn name(&self) -> &'static str {
"min"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Aggregation
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct MaxStep;
impl MaxStep {
#[inline]
pub fn new() -> Self {
Self
}
}
impl Step for MaxStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut max_value: Option<Value> = None;
let mut last_path = None;
for t in input {
last_path = Some(t.path.clone());
if !matches!(t.value, Value::Int(_) | Value::Float(_) | Value::String(_)) {
continue;
}
max_value = match max_value {
None => Some(t.value),
Some(current) => {
if let Some(ord) = MinStep::compare_values(&t.value, ¤t) {
if ord == std::cmp::Ordering::Greater {
Some(t.value)
} else {
Some(current)
}
} else {
Some(current)
}
}
};
}
let result = max_value.unwrap_or(Value::Null);
let mut traverser = Traverser::new(result);
if let Some(path) = last_path {
traverser.path = path;
}
std::iter::once(traverser)
}
fn name(&self) -> &'static str {
"max"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Aggregation
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Graph;
use crate::traversal::SnapshotLike;
use std::collections::HashMap as StdHashMap;
fn create_test_graph() -> Graph {
let graph = Graph::new();
let mut props1 = StdHashMap::new();
props1.insert("name".to_string(), Value::String("Alice".to_string()));
props1.insert("age".to_string(), Value::Int(29));
graph.add_vertex("person", props1);
let mut props2 = StdHashMap::new();
props2.insert("name".to_string(), Value::String("Bob".to_string()));
props2.insert("age".to_string(), Value::Int(29));
graph.add_vertex("person", props2);
let mut props3 = StdHashMap::new();
props3.insert("name".to_string(), Value::String("Charlie".to_string()));
props3.insert("age".to_string(), Value::Int(30));
graph.add_vertex("person", props3);
let mut props4 = StdHashMap::new();
props4.insert("name".to_string(), Value::String("lop".to_string()));
graph.add_vertex("software", props4);
graph
}
#[test]
fn test_group_by_label_identity() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.v().group().by_label().by_value().build().next();
assert!(result.is_some());
let result = result.unwrap();
if let Value::Map(map) = result {
assert!(map.contains_key("person"));
assert!(map.contains_key("software"));
if let Some(Value::List(persons)) = map.get("person") {
assert_eq!(persons.len(), 3);
for val in persons {
assert!(matches!(val, Value::Vertex(_)));
}
} else {
panic!("Expected person group to be a list");
}
if let Some(Value::List(softwares)) = map.get("software") {
assert_eq!(softwares.len(), 1);
assert!(matches!(softwares[0], Value::Vertex(_)));
} else {
panic!("Expected software group to be a list");
}
} else {
panic!("Expected Map value, got: {:?}", result);
}
}
#[test]
fn test_group_by_property_collect_property() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("person")
.group()
.by_key("age")
.by_value_key("name")
.build()
.next();
assert!(result.is_some());
let result = result.unwrap();
if let Value::Map(map) = result {
assert!(map.contains_key("29") || map.contains_key("30"));
if let Some(Value::List(names)) = map.get("29") {
assert_eq!(names.len(), 2);
assert!(names.contains(&Value::String("Alice".to_string())));
assert!(names.contains(&Value::String("Bob".to_string())));
}
if let Some(Value::List(names)) = map.get("30") {
assert_eq!(names.len(), 1);
assert!(names.contains(&Value::String("Charlie".to_string())));
}
} else {
panic!("Expected Map value");
}
}
#[test]
fn test_group_default_selectors() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.v().group().build().next();
assert!(result.is_some());
let result = result.unwrap();
if let Value::Map(map) = result {
assert!(map.contains_key("person"));
assert!(map.contains_key("software"));
} else {
panic!("Expected Map value");
}
}
#[test]
fn test_group_builder_fluent_api() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("person")
.group()
.by_key("age")
.by_value()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
for (_, value) in map {
if let Value::List(values) = value {
for val in values {
assert!(matches!(val, Value::Vertex(_)));
}
}
}
}
}
#[test]
fn test_group_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("nonexistent")
.group()
.by_label()
.by_value()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert!(map.is_empty());
}
}
#[test]
fn test_group_count_by_label() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.v().group_count().by_label().build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert_eq!(map.get("person"), Some(&Value::Int(3))); assert_eq!(map.get("software"), Some(&Value::Int(1))); } else {
panic!("Expected Value::Map, got {:?}", result);
}
}
#[test]
fn test_group_count_by_property() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("person")
.group_count()
.by_key("age")
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert_eq!(map.get("29"), Some(&Value::Int(2))); assert_eq!(map.get("30"), Some(&Value::Int(1))); } else {
panic!("Expected Value::Map, got {:?}", result);
}
}
#[test]
fn test_group_count_default_selector() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.v().group_count().build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert_eq!(map.get("person"), Some(&Value::Int(3)));
assert_eq!(map.get("software"), Some(&Value::Int(1)));
} else {
panic!("Expected Value::Map, got {:?}", result);
}
}
#[test]
fn test_group_count_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("nonexistent")
.group_count()
.by_label()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert!(map.is_empty());
} else {
panic!("Expected Value::Map, got {:?}", result);
}
}
#[test]
fn test_group_count_respects_bulk() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = GroupCountStep::new(GroupKey::Label);
let mut t1 = Traverser::from_vertex(crate::value::VertexId(0)); t1.bulk = 5;
let mut t2 = Traverser::from_vertex(crate::value::VertexId(1)); t2.bulk = 3;
let mut t3 = Traverser::from_vertex(crate::value::VertexId(3)); t3.bulk = 2;
let input = vec![t1, t2, t3];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
if let Value::Map(map) = &result[0].value {
assert_eq!(map.get("person"), Some(&Value::Int(8)));
assert_eq!(map.get("software"), Some(&Value::Int(2)));
} else {
panic!("Expected Value::Map, got {:?}", result[0].value);
}
}
#[test]
fn test_group_by_traversal_key() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let key_traversal = crate::traversal::__.values("name");
let result = g
.v()
.has_label("person")
.group()
.by_traversal(key_traversal)
.by_value()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert!(!map.is_empty());
for (_key, value) in map {
if let Value::List(vertices) = value {
for v in vertices {
assert!(matches!(v, Value::Vertex(_)));
}
}
}
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_by_value_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let value_traversal = crate::traversal::__.values("age");
let result = g
.v()
.has_label("person")
.group()
.by_label()
.by_value_traversal(value_traversal)
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
if let Some(Value::List(ages)) = map.get("person") {
assert_eq!(ages.len(), 3);
for age in ages {
assert!(matches!(age, Value::Int(_)));
}
} else {
panic!("Expected person group with list of ages");
}
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_edges_by_label() {
let graph = Graph::new();
let mut props1 = StdHashMap::new();
props1.insert("name".to_string(), Value::String("v1".to_string()));
let v1 = graph.add_vertex("person", props1);
let mut props2 = StdHashMap::new();
props2.insert("name".to_string(), Value::String("v2".to_string()));
let v2 = graph.add_vertex("person", props2);
let mut props3 = StdHashMap::new();
props3.insert("name".to_string(), Value::String("v3".to_string()));
let v3 = graph.add_vertex("software", props3);
graph.add_edge(v1, v2, "knows", StdHashMap::new()).unwrap();
graph
.add_edge(v1, v3, "created", StdHashMap::new())
.unwrap();
graph
.add_edge(v2, v3, "created", StdHashMap::new())
.unwrap();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.e().group().by_label().by_value().build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
if let Some(Value::List(knows_edges)) = map.get("knows") {
assert_eq!(knows_edges.len(), 1);
assert!(matches!(knows_edges[0], Value::Edge(_)));
} else {
panic!("Expected knows edges");
}
if let Some(Value::List(created_edges)) = map.get("created") {
assert_eq!(created_edges.len(), 2);
for edge in created_edges {
assert!(matches!(edge, Value::Edge(_)));
}
} else {
panic!("Expected created edges");
}
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_edges_by_property() {
let graph = Graph::new();
let v0 = graph.add_vertex("person", StdHashMap::new());
let v1 = graph.add_vertex("person", StdHashMap::new());
let mut edge1_props = StdHashMap::new();
edge1_props.insert("weight".to_string(), Value::Float(0.5));
graph.add_edge(v0, v1, "knows", edge1_props).unwrap();
let mut edge2_props = StdHashMap::new();
edge2_props.insert("weight".to_string(), Value::Float(0.8));
graph.add_edge(v1, v0, "knows", edge2_props).unwrap();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.e().group().by_key("weight").by_value().build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert!(map.contains_key("0.5") || map.contains_key("0.8"));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_preserves_path() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("person")
.with_path()
.group()
.by_label()
.by_value()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(_map)) = result {
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_with_bulk_traversers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = GroupStep::with_selectors(GroupKey::Label, GroupValue::Identity);
let mut t1 = Traverser::from_vertex(crate::value::VertexId(0)); t1.bulk = 3;
let mut t2 = Traverser::from_vertex(crate::value::VertexId(1)); t2.bulk = 2;
let input = vec![t1, t2];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
if let Value::Map(map) = &result[0].value {
if let Some(Value::List(persons)) = map.get("person") {
assert_eq!(persons.len(), 2);
} else {
panic!("Expected person group");
}
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_by_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let key_traversal = crate::traversal::__.values("name");
let result = g
.v()
.has_label("person")
.group_count()
.by_traversal(key_traversal)
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 3);
assert_eq!(map.get("Alice"), Some(&Value::Int(1)));
assert_eq!(map.get("Bob"), Some(&Value::Int(1)));
assert_eq!(map.get("Charlie"), Some(&Value::Int(1)));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_edges_by_label() {
let graph = Graph::new();
let v0 = graph.add_vertex("person", StdHashMap::new());
let v1 = graph.add_vertex("person", StdHashMap::new());
let v2 = graph.add_vertex("software", StdHashMap::new());
graph.add_edge(v0, v1, "knows", StdHashMap::new()).unwrap();
graph
.add_edge(v0, v2, "created", StdHashMap::new())
.unwrap();
graph
.add_edge(v1, v2, "created", StdHashMap::new())
.unwrap();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.e().group_count().by_label().build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert_eq!(map.get("knows"), Some(&Value::Int(1)));
assert_eq!(map.get("created"), Some(&Value::Int(2)));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_edges_by_property() {
let graph = Graph::new();
let v0 = graph.add_vertex("person", StdHashMap::new());
let v1 = graph.add_vertex("person", StdHashMap::new());
let mut edge1_props = StdHashMap::new();
edge1_props.insert("weight".to_string(), Value::Float(0.5));
graph.add_edge(v0, v1, "knows", edge1_props).unwrap();
let mut edge2_props = StdHashMap::new();
edge2_props.insert("weight".to_string(), Value::Float(0.5));
graph.add_edge(v1, v0, "knows", edge2_props).unwrap();
let mut edge3_props = StdHashMap::new();
edge3_props.insert("weight".to_string(), Value::Float(0.8));
graph.add_edge(v0, v1, "likes", edge3_props).unwrap();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.e().group_count().by_key("weight").build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.len(), 2);
assert_eq!(map.get("0.5"), Some(&Value::Int(2)));
assert_eq!(map.get("0.8"), Some(&Value::Int(1)));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_preserves_path() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.has_label("person")
.with_path()
.group_count()
.by_label()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert_eq!(map.get("person"), Some(&Value::Int(3)));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_multiple_bulk_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = GroupCountStep::new(GroupKey::Property("age".to_string()));
let mut t1 = Traverser::from_vertex(crate::value::VertexId(0)); t1.bulk = 10;
let mut t2 = Traverser::from_vertex(crate::value::VertexId(1)); t2.bulk = 5;
let mut t3 = Traverser::from_vertex(crate::value::VertexId(2)); t3.bulk = 3;
let input = vec![t1, t2, t3];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
if let Value::Map(map) = &result[0].value {
assert_eq!(map.get("29"), Some(&Value::Int(15)));
assert_eq!(map.get("30"), Some(&Value::Int(3)));
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_count_with_missing_property() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g.v().group_count().by_key("nonexistent").build().next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert!(map.is_empty());
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_with_missing_property() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let result = g
.v()
.group()
.by_key("nonexistent")
.by_value()
.build()
.next();
assert!(result.is_some());
if let Some(Value::Map(map)) = result {
assert!(map.is_empty());
} else {
panic!("Expected Value::Map");
}
}
#[test]
fn test_group_step_construction() {
let step = GroupStep::new();
assert_eq!(step.name(), "group");
let step2 = GroupStep::with_selectors(
GroupKey::Property("age".to_string()),
GroupValue::Property("name".to_string()),
);
assert_eq!(step2.name(), "group");
}
#[test]
fn test_group_count_step_construction() {
let step = GroupCountStep::new(GroupKey::Label);
assert_eq!(step.name(), "groupCount");
let step2 = GroupCountStep::new(GroupKey::Property("age".to_string()));
assert_eq!(step2.name(), "groupCount");
}
#[test]
fn test_count_step_construction() {
let step = CountStep::new();
assert_eq!(step.name(), "count");
let step_default = CountStep::default();
assert_eq!(step_default.name(), "count");
}
#[test]
fn test_count_step_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountStep::new();
let input: Vec<Traverser> = vec![];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
assert_eq!(result[0].value, Value::Int(0));
}
#[test]
fn test_count_step_single_traverser() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountStep::new();
let input = vec![Traverser::from_vertex(crate::value::VertexId(0))];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
assert_eq!(result[0].value, Value::Int(1));
}
#[test]
fn test_count_step_multiple_traversers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountStep::new();
let input = vec![
Traverser::from_vertex(crate::value::VertexId(0)),
Traverser::from_vertex(crate::value::VertexId(1)),
Traverser::from_vertex(crate::value::VertexId(2)),
Traverser::from_vertex(crate::value::VertexId(3)),
];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
assert_eq!(result[0].value, Value::Int(4));
}
#[test]
fn test_count_step_respects_bulk() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountStep::new();
let mut t1 = Traverser::from_vertex(crate::value::VertexId(0));
t1.bulk = 5;
let mut t2 = Traverser::from_vertex(crate::value::VertexId(1));
t2.bulk = 3;
let mut t3 = Traverser::from_vertex(crate::value::VertexId(2));
t3.bulk = 2;
let input = vec![t1, t2, t3];
let result: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(result.len(), 1);
assert_eq!(result[0].value, Value::Int(10));
}
#[test]
fn test_count_step_clone_box() {
let step = CountStep::new();
let cloned = step.clone_box();
assert_eq!(cloned.dyn_name(), "count");
}
#[test]
fn test_count_step_is_copy() {
let step = CountStep;
let copied = step; assert_eq!(step.name(), "count");
assert_eq!(copied.name(), "count");
}
#[test]
fn test_count_via_bound_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let count = g.v().count();
assert_eq!(count, 4);
}
#[test]
fn test_count_with_filter() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let count = g.v().has_label("person").count();
assert_eq!(count, 3);
let count = g.v().has_label("software").count();
assert_eq!(count, 1);
}
#[test]
fn test_count_empty_result() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let count = g.v().has_label("nonexistent").count();
assert_eq!(count, 0);
}
#[test]
fn test_count_edges() {
let graph = Graph::new();
let v0 = graph.add_vertex("person", StdHashMap::new());
let v1 = graph.add_vertex("person", StdHashMap::new());
let v2 = graph.add_vertex("software", StdHashMap::new());
graph.add_edge(v0, v1, "knows", StdHashMap::new()).unwrap();
graph
.add_edge(v0, v2, "created", StdHashMap::new())
.unwrap();
graph
.add_edge(v1, v2, "created", StdHashMap::new())
.unwrap();
let snapshot = graph.snapshot();
let g = snapshot.gremlin();
let count = g.e().count();
assert_eq!(count, 3);
let count = g.e().has_label("knows").count();
assert_eq!(count, 1);
let count = g.e().has_label("created").count();
assert_eq!(count, 2);
}
}