use crate::impl_flatmap_step;
use crate::traversal::{ExecutionContext, Traverser};
use crate::value::Value;
#[derive(Clone, Copy, Debug, Default)]
pub struct FoldStep;
impl FoldStep {
pub fn new() -> Self {
Self
}
}
impl crate::traversal::step::Step for FoldStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut values = Vec::new();
let mut last_path = None;
for t in input {
last_path = Some(t.path.clone());
for _ in 0..t.bulk {
values.push(t.value.clone());
}
}
let result = Traverser {
value: Value::List(values),
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
};
std::iter::once(result)
}
fn name(&self) -> &'static str {
"fold"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Transform
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct SumStep;
impl SumStep {
pub fn new() -> Self {
Self
}
}
impl crate::traversal::step::Step for SumStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut int_sum: i64 = 0;
let mut float_sum: f64 = 0.0;
let mut has_float = false;
let mut last_path = None;
for t in input {
last_path = Some(t.path.clone());
let multiplier = t.bulk as i64;
match &t.value {
Value::Int(n) => {
int_sum += n * multiplier;
}
Value::Float(f) => {
has_float = true;
float_sum += f * (t.bulk as f64);
}
_ => {} }
}
let result_value = if has_float {
Value::Float(float_sum + (int_sum as f64))
} else {
Value::Int(int_sum)
};
let result = Traverser {
value: result_value,
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
};
std::iter::once(result)
}
fn name(&self) -> &'static str {
"sum"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Transform
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct CountLocalStep;
impl CountLocalStep {
pub fn new() -> Self {
Self
}
}
impl crate::traversal::step::Step for CountLocalStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
input.map(|t| {
let count = match &t.value {
Value::List(items) => items.len() as i64,
Value::Map(map) => map.len() as i64,
Value::String(s) => s.len() as i64,
_ => 1, };
t.with_value(Value::Int(count))
})
}
fn name(&self) -> &'static str {
"count(local)"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Transform
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let count = match &input.value {
Value::List(items) => items.len() as i64,
Value::Map(map) => map.len() as i64,
Value::String(s) => s.len() as i64,
_ => 1,
};
Box::new(std::iter::once(input.with_value(Value::Int(count))))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct SumLocalStep;
impl SumLocalStep {
pub fn new() -> Self {
Self
}
}
impl crate::traversal::step::Step for SumLocalStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
input.map(|t| {
let sum = match &t.value {
Value::List(items) => {
let mut int_sum: i64 = 0;
let mut float_sum: f64 = 0.0;
let mut has_float = false;
for item in items {
match item {
Value::Int(n) => int_sum += n,
Value::Float(f) => {
has_float = true;
float_sum += f;
}
_ => {}
}
}
if has_float {
Value::Float(float_sum + (int_sum as f64))
} else {
Value::Int(int_sum)
}
}
Value::Int(n) => Value::Int(*n),
Value::Float(f) => Value::Float(*f),
_ => Value::Int(0),
};
t.with_value(sum)
})
}
fn name(&self) -> &'static str {
"sum(local)"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Transform
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let sum = match &input.value {
Value::List(items) => {
let mut int_sum: i64 = 0;
let mut float_sum: f64 = 0.0;
let mut has_float = false;
for item in items {
match item {
Value::Int(n) => int_sum += n,
Value::Float(f) => {
has_float = true;
float_sum += f;
}
_ => {}
}
}
if has_float {
Value::Float(float_sum + (int_sum as f64))
} else {
Value::Int(int_sum)
}
}
Value::Int(n) => Value::Int(*n),
Value::Float(f) => Value::Float(*f),
_ => Value::Int(0),
};
Box::new(std::iter::once(input.with_value(sum)))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct UnfoldStep;
impl UnfoldStep {
pub fn new() -> Self {
Self
}
fn expand<'a>(
&self,
_ctx: &'a ExecutionContext<'a>,
traverser: Traverser,
) -> impl Iterator<Item = Traverser> + 'a {
let values = match &traverser.value {
Value::List(items) => {
items.clone()
}
Value::Map(map) => {
map.iter()
.map(|(k, v)| {
let mut entry = std::collections::HashMap::new();
entry.insert(k.clone(), v.clone());
Value::Map(entry.into_iter().collect())
})
.collect()
}
other => vec![other.clone()],
};
values
.into_iter()
.map(move |value| traverser.split(value))
.collect::<Vec<_>>()
.into_iter()
}
fn expand_streaming(
&self,
_ctx: &crate::traversal::context::StreamingContext,
traverser: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let values: Vec<crate::value::Value> = match &traverser.value {
crate::value::Value::List(items) => items.clone(),
crate::value::Value::Map(map) => map
.iter()
.map(|(k, v)| {
let mut entry = std::collections::HashMap::new();
entry.insert(k.clone(), v.clone());
crate::value::Value::Map(entry.into_iter().collect())
})
.collect(),
other => vec![other.clone()],
};
Box::new(
values
.into_iter()
.map(move |value| traverser.split(value))
.collect::<Vec<_>>()
.into_iter(),
)
}
}
impl_flatmap_step!(UnfoldStep, "unfold", category = crate::traversal::explain::StepCategory::Transform);
#[derive(Clone, Copy, Debug, Default)]
pub struct MeanStep;
impl MeanStep {
pub fn new() -> Self {
Self
}
}
impl crate::traversal::step::Step for MeanStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
_ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let mut sum = 0.0_f64;
let mut count = 0_u64;
let mut last_path = None;
for t in input {
last_path = Some(t.path.clone());
match &t.value {
Value::Int(n) => {
sum += *n as f64;
count += 1;
}
Value::Float(f) => {
sum += *f;
count += 1;
}
_ => {} }
}
let result = if count == 0 {
None
} else {
let mean = sum / count as f64;
Some(Traverser {
value: Value::Float(mean),
path: last_path.unwrap_or_default(),
loops: 0,
sack: None,
bulk: 1,
})
};
result.into_iter()
}
fn name(&self) -> &'static str {
"mean"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::Transform
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Graph;
use crate::traversal::step::Step;
use crate::traversal::SnapshotLike;
use crate::value::{EdgeId, VertexId};
fn create_test_graph() -> Graph {
Graph::new()
}
mod unfold_step_list {
use super::*;
#[test]
fn unfolds_list_into_elements() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let list = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 3);
assert_eq!(output[0].value, Value::Int(1));
assert_eq!(output[1].value, Value::Int(2));
assert_eq!(output[2].value, Value::Int(3));
}
#[test]
fn unfolds_empty_list_into_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let list = Value::List(vec![]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
}
#[test]
fn unfolds_nested_lists_one_level() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let nested = Value::List(vec![
Value::List(vec![Value::Int(1)]),
Value::List(vec![Value::Int(2)]),
]);
let input = vec![Traverser::new(nested)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
assert_eq!(output[0].value, Value::List(vec![Value::Int(1)]));
assert_eq!(output[1].value, Value::List(vec![Value::Int(2)]));
}
}
mod unfold_step_map {
use super::*;
#[test]
fn unfolds_map_into_entries() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let mut map = crate::value::ValueMap::new();
map.insert("a".to_string(), Value::Int(1));
map.insert("b".to_string(), Value::Int(2));
let input = vec![Traverser::new(Value::Map(map))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
let values: Vec<Value> = output.into_iter().map(|t| t.value).collect();
for val in values {
if let Value::Map(m) = val {
assert_eq!(m.len(), 1);
if m.contains_key("a") {
assert_eq!(m.get("a"), Some(&Value::Int(1)));
} else if m.contains_key("b") {
assert_eq!(m.get("b"), Some(&Value::Int(2)));
} else {
panic!("Unexpected key in map");
}
} else {
panic!("Expected Value::Map");
}
}
}
#[test]
fn unfolds_empty_map_into_nothing() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let map = crate::value::ValueMap::new();
let input = vec![Traverser::new(Value::Map(map))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
}
}
mod unfold_step_non_collection {
use super::*;
#[test]
fn passes_single_value_through() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let input = vec![Traverser::new(Value::Int(42))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(42));
}
#[test]
fn passes_vertex_through() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let input = vec![Traverser::from_vertex(VertexId(1))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(1)));
}
}
mod unfold_step_metadata {
use super::*;
#[test]
fn preserves_path_for_each_unfolded_item() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let mut traverser = Traverser::new(Value::List(vec![Value::Int(1), Value::Int(2)]));
traverser.extend_path_labeled("start");
let input = vec![traverser];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
assert!(output[0].path.has_label("start"));
assert!(output[1].path.has_label("start"));
}
#[test]
fn preserves_other_metadata() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let mut traverser = Traverser::new(Value::List(vec![Value::Int(1)]));
traverser.loops = 5;
traverser.bulk = 10;
let input = vec![traverser];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].loops, 5);
assert_eq!(output[0].bulk, 10);
}
}
mod unfold_step_integration {
use super::*;
#[test]
fn handles_multiple_input_traversers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = UnfoldStep::new();
let input = vec![
Traverser::new(Value::List(vec![Value::Int(1), Value::Int(2)])),
Traverser::new(Value::Int(3)), Traverser::new(Value::List(vec![])), ];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 3);
assert_eq!(output[0].value, Value::Int(1));
assert_eq!(output[1].value, Value::Int(2));
assert_eq!(output[2].value, Value::Int(3));
}
}
mod mean_step_construction {
use super::*;
#[test]
fn test_new() {
let step = MeanStep::new();
assert_eq!(step.name(), "mean");
}
}
mod mean_step_numeric_tests {
use super::*;
#[test]
fn calculates_mean_of_integers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::Int(20)),
Traverser::new(Value::Int(30)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(20.0));
}
#[test]
fn calculates_mean_of_floats() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Float(1.5)),
Traverser::new(Value::Float(2.5)),
Traverser::new(Value::Float(3.5)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(2.5));
}
#[test]
fn calculates_mean_of_mixed_numbers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::Float(20.5)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(15.25));
}
#[test]
fn handles_negative_numbers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Int(-10)),
Traverser::new(Value::Int(10)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(0.0));
}
}
mod mean_step_non_numeric_tests {
use super::*;
#[test]
fn ignores_non_numeric_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::String("skip me".to_string())),
Traverser::new(Value::Bool(true)),
Traverser::new(Value::Int(20)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(15.0)); }
#[test]
fn ignores_vertices_and_edges() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::from_vertex(VertexId(1)),
Traverser::from_edge(EdgeId(1)),
Traverser::new(Value::Int(30)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(20.0));
}
}
mod mean_step_empty_tests {
use super::*;
#[test]
fn empty_input_returns_empty_output() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
}
#[test]
fn all_non_numeric_returns_empty_output() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input = vec![
Traverser::new(Value::String("a".to_string())),
Traverser::new(Value::Bool(false)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
}
}
mod mean_step_path_tests {
use super::*;
#[test]
fn preserves_path_from_last_element() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let mut t1 = Traverser::new(Value::Int(10));
t1.extend_path_labeled("p1");
let mut t2 = Traverser::new(Value::Int(20));
t2.extend_path_labeled("p2");
let input = vec![t1, t2];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert!(output[0].path.has_label("p2")); }
}
mod mean_step_traverser_fields {
use super::*;
#[test]
fn resets_loops_and_bulk() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let mut t1 = Traverser::new(Value::Int(10));
t1.loops = 5;
t1.bulk = 10;
let input = vec![t1];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].loops, 0); assert_eq!(output[0].bulk, 1); }
}
mod mean_step_integration {
use super::*;
#[test]
fn calculates_mean_of_large_set() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = MeanStep::new();
let input: Vec<Traverser> = (1..=100).map(|i| Traverser::new(Value::Int(i))).collect();
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(50.5));
}
}
mod fold_step_construction {
use super::*;
#[test]
fn test_new() {
let step = FoldStep::new();
assert_eq!(step.name(), "fold");
}
#[test]
fn is_barrier_returns_true() {
let step = FoldStep::new();
assert!(step.is_barrier());
}
#[test]
fn is_clonable() {
let step = FoldStep::new();
let _cloned = step.clone();
}
}
mod fold_step_basic {
use super::*;
#[test]
fn folds_integers_into_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert_eq!(list.len(), 3);
assert_eq!(list[0], Value::Int(1));
assert_eq!(list[1], Value::Int(2));
assert_eq!(list[2], Value::Int(3));
} else {
panic!("Expected Value::List");
}
}
#[test]
fn folds_mixed_types_into_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input = vec![
Traverser::new(Value::Int(42)),
Traverser::new(Value::String("hello".to_string())),
Traverser::new(Value::Bool(true)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert_eq!(list.len(), 3);
assert_eq!(list[0], Value::Int(42));
assert_eq!(list[1], Value::String("hello".to_string()));
assert_eq!(list[2], Value::Bool(true));
} else {
panic!("Expected Value::List");
}
}
#[test]
fn folds_empty_input_to_empty_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert!(list.is_empty());
} else {
panic!("Expected Value::List");
}
}
#[test]
fn folds_single_value_to_single_element_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input = vec![Traverser::new(Value::Int(42))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert_eq!(list.len(), 1);
assert_eq!(list[0], Value::Int(42));
} else {
panic!("Expected Value::List");
}
}
}
mod fold_step_vertices_edges {
use super::*;
#[test]
fn folds_vertices_into_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input = vec![
Traverser::from_vertex(VertexId(1)),
Traverser::from_vertex(VertexId(2)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert_eq!(list.len(), 2);
assert_eq!(list[0], Value::Vertex(VertexId(1)));
assert_eq!(list[1], Value::Vertex(VertexId(2)));
} else {
panic!("Expected Value::List");
}
}
#[test]
fn folds_edges_into_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = FoldStep::new();
let input = vec![
Traverser::from_edge(EdgeId(1)),
Traverser::from_edge(EdgeId(2)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
if let Value::List(list) = &output[0].value {
assert_eq!(list.len(), 2);
assert_eq!(list[0], Value::Edge(EdgeId(1)));
assert_eq!(list[1], Value::Edge(EdgeId(2)));
} else {
panic!("Expected Value::List");
}
}
}
mod sum_step_construction {
use super::*;
#[test]
fn test_new() {
let step = SumStep::new();
assert_eq!(step.name(), "sum");
}
#[test]
fn is_barrier_returns_true() {
let step = SumStep::new();
assert!(step.is_barrier());
}
#[test]
fn is_clonable() {
let step = SumStep::new();
let _cloned = step.clone();
}
}
mod sum_step_integers {
use super::*;
#[test]
fn sums_integers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::Int(20)),
Traverser::new(Value::Int(30)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(60));
}
#[test]
fn sums_negative_integers() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::Int(-10)),
Traverser::new(Value::Int(20)),
Traverser::new(Value::Int(-5)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(5));
}
}
mod sum_step_floats {
use super::*;
#[test]
fn sums_floats() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::Float(1.5)),
Traverser::new(Value::Float(2.5)),
Traverser::new(Value::Float(3.0)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(7.0));
}
}
mod sum_step_mixed {
use super::*;
#[test]
fn sums_mixed_int_and_float_returns_float() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::Float(5.5)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(15.5));
}
#[test]
fn ignores_non_numeric_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::Int(10)),
Traverser::new(Value::String("skip".to_string())),
Traverser::new(Value::Bool(true)),
Traverser::new(Value::Int(20)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(30));
}
}
mod sum_step_empty {
use super::*;
#[test]
fn empty_input_returns_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
#[test]
fn all_non_numeric_returns_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumStep::new();
let input = vec![
Traverser::new(Value::String("a".to_string())),
Traverser::new(Value::Bool(false)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
}
mod count_local_step_construction {
use super::*;
#[test]
fn test_new() {
let step = CountLocalStep::new();
assert_eq!(step.name(), "count(local)");
}
#[test]
fn is_not_barrier() {
let step = CountLocalStep::new();
assert!(!step.is_barrier());
}
#[test]
fn is_clonable() {
let step = CountLocalStep::new();
let _cloned = step.clone();
}
}
mod count_local_step_list {
use super::*;
#[test]
fn counts_list_elements() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let list = Value::List(vec![Value::Int(1), Value::Int(2), Value::Int(3)]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(3));
}
#[test]
fn counts_empty_list_as_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![Traverser::new(Value::List(vec![]))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
}
mod count_local_step_map {
use super::*;
#[test]
fn counts_map_entries() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let mut map = crate::value::ValueMap::new();
map.insert("a".to_string(), Value::Int(1));
map.insert("b".to_string(), Value::Int(2));
let input = vec![Traverser::new(Value::Map(map))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(2));
}
#[test]
fn counts_empty_map_as_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![Traverser::new(Value::Map(crate::value::ValueMap::new()))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
}
mod count_local_step_string {
use super::*;
#[test]
fn counts_string_length() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![Traverser::new(Value::String("hello".to_string()))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(5));
}
#[test]
fn counts_empty_string_as_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![Traverser::new(Value::String("".to_string()))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
}
mod count_local_step_other {
use super::*;
#[test]
fn counts_non_collection_as_one() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![
Traverser::new(Value::Int(42)),
Traverser::new(Value::Bool(true)),
Traverser::new(Value::Float(3.14)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 3);
assert_eq!(output[0].value, Value::Int(1));
assert_eq!(output[1].value, Value::Int(1));
assert_eq!(output[2].value, Value::Int(1));
}
#[test]
fn counts_vertex_as_one() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CountLocalStep::new();
let input = vec![Traverser::from_vertex(VertexId(1))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(1));
}
}
mod sum_local_step_construction {
use super::*;
#[test]
fn test_new() {
let step = SumLocalStep::new();
assert_eq!(step.name(), "sum(local)");
}
#[test]
fn is_not_barrier() {
let step = SumLocalStep::new();
assert!(!step.is_barrier());
}
#[test]
fn is_clonable() {
let step = SumLocalStep::new();
let _cloned = step.clone();
}
}
mod sum_local_step_list {
use super::*;
#[test]
fn sums_integer_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let list = Value::List(vec![Value::Int(10), Value::Int(20), Value::Int(30)]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(60));
}
#[test]
fn sums_float_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let list = Value::List(vec![Value::Float(1.5), Value::Float(2.5)]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(4.0));
}
#[test]
fn sums_mixed_list_returns_float() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let list = Value::List(vec![Value::Int(10), Value::Float(5.5)]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(15.5));
}
#[test]
fn sums_empty_list_returns_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let input = vec![Traverser::new(Value::List(vec![]))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(0));
}
#[test]
fn ignores_non_numeric_in_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let list = Value::List(vec![
Value::Int(10),
Value::String("skip".to_string()),
Value::Int(20),
]);
let input = vec![Traverser::new(list)];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(30));
}
}
mod sum_local_step_scalar {
use super::*;
#[test]
fn returns_int_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let input = vec![Traverser::new(Value::Int(42))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(42));
}
#[test]
fn returns_float_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let input = vec![Traverser::new(Value::Float(3.14))];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Float(3.14));
}
}
mod sum_local_step_other {
use super::*;
#[test]
fn non_numeric_non_list_returns_zero() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SumLocalStep::new();
let input = vec![
Traverser::new(Value::String("hello".to_string())),
Traverser::new(Value::Bool(true)),
];
let output: Vec<Traverser> = step.apply(&ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
assert_eq!(output[0].value, Value::Int(0));
assert_eq!(output[1].value, Value::Int(0));
}
}
}