use std::cell::Cell;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use crate::time::Instant;
use parking_lot::RwLock;
use crate::traversal::context::ExecutionContext;
use crate::traversal::step::{execute_traversal_from, Step};
use crate::traversal::{Traversal, Traverser};
use crate::value::{IntoValueMap, Value};
#[derive(Clone, Debug)]
pub struct StoreStep {
key: String,
}
impl StoreStep {
pub fn new(key: impl Into<String>) -> Self {
Self { key: key.into() }
}
#[inline]
pub fn key(&self) -> &str {
&self.key
}
}
impl Step for StoreStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let key = self.key.clone();
input.inspect(move |t| {
ctx.side_effects.store(&key, t.value.clone());
})
}
fn name(&self) -> &'static str {
"store"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
ctx.side_effects().store(&self.key, input.value.clone());
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Debug)]
pub struct AggregateStep {
key: String,
}
impl AggregateStep {
pub fn new(key: impl Into<String>) -> Self {
Self { key: key.into() }
}
#[inline]
pub fn key(&self) -> &str {
&self.key
}
}
impl Step for AggregateStep {
type Iter<'a>
= std::vec::IntoIter<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let traversers: Vec<Traverser> = input.collect();
for t in &traversers {
ctx.side_effects.store(&self.key, t.value.clone());
}
traversers.into_iter()
}
fn name(&self) -> &'static str {
"aggregate"
}
fn is_barrier(&self) -> bool {
true
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
ctx.side_effects().store(&self.key, input.value.clone());
Box::new(std::iter::once(input))
}
}
#[derive(Clone, Debug)]
pub struct CapStep {
keys: Vec<String>,
}
impl CapStep {
pub fn new(key: impl Into<String>) -> Self {
Self {
keys: vec![key.into()],
}
}
pub fn multi<I, S>(keys: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self {
keys: keys.into_iter().map(Into::into).collect(),
}
}
pub fn keys(&self) -> &[String] {
&self.keys
}
}
impl Step for CapStep {
type Iter<'a>
= std::iter::Once<Traverser>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
input.for_each(drop);
let result = if self.keys.len() == 1 {
let values = ctx.side_effects.get(&self.keys[0]).unwrap_or_default();
Value::List(values)
} else {
let mut map = HashMap::new();
for key in &self.keys {
let values = ctx.side_effects.get(key).unwrap_or_default();
map.insert(key.clone(), Value::List(values));
}
Value::Map(map.into_value_map())
};
std::iter::once(Traverser::new(result))
}
fn name(&self) -> &'static str {
"cap"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
_input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
let result = if self.keys.len() == 1 {
let values = ctx.side_effects().get(&self.keys[0]).unwrap_or_default();
Value::List(values)
} else {
let mut map = HashMap::new();
for key in &self.keys {
let values = ctx.side_effects().get(key).unwrap_or_default();
map.insert(key.clone(), Value::List(values));
}
Value::Map(map.into_value_map())
};
Box::new(std::iter::once(Traverser::new(result)))
}
}
#[derive(Clone)]
pub struct SideEffectStep {
side_traversal: Traversal<Value, Value>,
}
impl SideEffectStep {
pub fn new(side_traversal: Traversal<Value, Value>) -> Self {
Self { side_traversal }
}
}
impl std::fmt::Debug for SideEffectStep {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SideEffectStep")
.field(
"side_traversal",
&format!("<{} steps>", self.side_traversal.step_count()),
)
.finish()
}
}
impl Step for SideEffectStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let side_traversal = self.side_traversal.clone();
input.inspect(move |t| {
let side_input = Box::new(std::iter::once(t.clone()));
execute_traversal_from(ctx, &side_traversal, side_input).for_each(drop);
})
}
fn name(&self) -> &'static str {
"sideEffect"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
use crate::traversal::step::execute_traversal_streaming;
execute_traversal_streaming(&ctx, &self.side_traversal, input.clone()).for_each(drop);
Box::new(std::iter::once(input))
}
}
#[derive(Clone)]
pub struct ProfileStep {
key: Option<String>,
streaming_count: Arc<AtomicU64>,
streaming_start: Arc<RwLock<Option<Instant>>>,
}
impl ProfileStep {
pub fn new() -> Self {
Self {
key: None,
streaming_count: Arc::new(AtomicU64::new(0)),
streaming_start: Arc::new(RwLock::new(None)),
}
}
pub fn with_key(key: impl Into<String>) -> Self {
Self {
key: Some(key.into()),
streaming_count: Arc::new(AtomicU64::new(0)),
streaming_start: Arc::new(RwLock::new(None)),
}
}
pub fn key(&self) -> Option<&str> {
self.key.as_deref()
}
}
impl Default for ProfileStep {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for ProfileStep {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProfileStep")
.field("key", &self.key)
.field(
"streaming_count",
&self.streaming_count.load(Ordering::Relaxed),
)
.finish()
}
}
impl Step for ProfileStep {
type Iter<'a>
= ProfileIterator<'a, Box<dyn Iterator<Item = Traverser> + 'a>>
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let key = self.key.clone().unwrap_or_else(|| "profile".to_string());
ProfileIterator {
inner: input,
ctx,
key,
count: Cell::new(0),
start: Instant::now(),
finished: Cell::new(false),
}
}
fn name(&self) -> &'static str {
"profile"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
{
let mut start_guard = self.streaming_start.write();
if start_guard.is_none() {
*start_guard = Some(Instant::now());
}
}
let count = self.streaming_count.fetch_add(1, Ordering::Relaxed) + 1;
let elapsed_ms = {
let start_guard = self.streaming_start.read();
start_guard
.map(|start| start.elapsed().as_secs_f64() * 1000.0)
.unwrap_or(0.0)
};
let key = self.key.clone().unwrap_or_else(|| "profile".to_string());
let profile = Value::Map({
let mut m = crate::value::ValueMap::new();
m.insert("count".to_string(), Value::Int(count as i64));
m.insert("time_ms".to_string(), Value::Float(elapsed_ms));
m
});
ctx.side_effects()
.store(&format!("{}_streaming", key), profile);
Box::new(std::iter::once(input))
}
}
pub struct ProfileIterator<'a, I> {
inner: I,
ctx: &'a ExecutionContext<'a>,
key: String,
count: Cell<u64>,
start: Instant,
finished: Cell<bool>,
}
impl<'a, I: Iterator<Item = Traverser>> Iterator for ProfileIterator<'a, I> {
type Item = Traverser;
fn next(&mut self) -> Option<Self::Item> {
match self.inner.next() {
Some(t) => {
self.count.set(self.count.get() + 1);
Some(t)
}
None => {
if !self.finished.get() {
self.finished.set(true);
let elapsed = self.start.elapsed();
let profile = Value::Map({
let mut m = crate::value::ValueMap::new();
m.insert("count".to_string(), Value::Int(self.count.get() as i64));
m.insert(
"time_ms".to_string(),
Value::Float(elapsed.as_secs_f64() * 1000.0),
);
m
});
self.ctx.side_effects.store(&self.key, profile);
}
None
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::Graph;
use crate::traversal::SnapshotLike;
use crate::value::{Value, VertexId};
use std::collections::HashMap;
fn create_test_graph() -> Graph {
let graph = Graph::new();
graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Alice".to_string()));
props.insert("age".to_string(), Value::Int(30));
props
});
graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Bob".to_string()));
props.insert("age".to_string(), Value::Int(25));
props
});
graph.add_vertex("software", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Graph DB".to_string()));
props
});
graph
}
mod store_step_tests {
use super::*;
use crate::traversal::DynStep;
#[test]
fn store_step_new_creates_step_with_key() {
let step = StoreStep::new("test_key");
assert_eq!(step.key(), "test_key");
}
#[test]
fn store_step_new_accepts_string() {
let step = StoreStep::new(String::from("my_collection"));
assert_eq!(step.key(), "my_collection");
}
#[test]
fn store_step_name_is_store() {
let step = StoreStep::new("x");
assert_eq!(Step::name(&step), "store");
}
#[test]
fn store_step_is_cloneable() {
let step = StoreStep::new("test");
let cloned = step.clone();
assert_eq!(cloned.key(), "test");
}
#[test]
fn store_step_clone_box() {
let step = StoreStep::new("test");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "store");
}
#[test]
fn store_step_debug_output() {
let step = StoreStep::new("my_key");
let debug_str = format!("{:?}", step);
assert!(debug_str.contains("StoreStep"));
assert!(debug_str.contains("my_key"));
}
#[test]
fn store_step_stores_values_in_side_effects() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("collected");
let input: Vec<Traverser> = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let stored = ctx.side_effects.get("collected").unwrap();
assert_eq!(stored.len(), 3);
assert_eq!(stored[0], Value::Int(1));
assert_eq!(stored[1], Value::Int(2));
assert_eq!(stored[2], Value::Int(3));
assert_eq!(output.len(), 3);
}
#[test]
fn store_step_passes_traversers_through_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("x");
let mut t = Traverser::from_vertex(VertexId(42));
t.extend_path_labeled("start");
t.loops = 5;
t.bulk = 10;
let input = vec![t];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(42)));
assert!(output[0].path.has_label("start"));
assert_eq!(output[0].loops, 5);
assert_eq!(output[0].bulk, 10);
}
#[test]
fn store_step_handles_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("empty");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
assert!(ctx.side_effects.get("empty").is_none());
}
#[test]
fn store_step_stores_multiple_values_sequentially() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("items");
let input: Vec<Traverser> = vec![
Traverser::new(Value::String("first".to_string())),
Traverser::new(Value::String("second".to_string())),
Traverser::new(Value::String("third".to_string())),
];
let _output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let stored = ctx.side_effects.get("items").unwrap();
assert_eq!(stored.len(), 3);
assert_eq!(stored[0], Value::String("first".to_string()));
assert_eq!(stored[1], Value::String("second".to_string()));
assert_eq!(stored[2], Value::String("third".to_string()));
}
#[test]
fn store_step_stores_various_value_types() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("mixed");
let input: Vec<Traverser> = vec![
Traverser::new(Value::Int(42)),
Traverser::new(Value::String("hello".to_string())),
Traverser::new(Value::Bool(true)),
Traverser::new(Value::Float(3.15)),
Traverser::new(Value::Vertex(VertexId(1))),
Traverser::new(Value::Null),
];
let _output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let stored = ctx.side_effects.get("mixed").unwrap();
assert_eq!(stored.len(), 6);
assert_eq!(stored[0], Value::Int(42));
assert_eq!(stored[1], Value::String("hello".to_string()));
assert_eq!(stored[2], Value::Bool(true));
assert_eq!(stored[3], Value::Float(3.15));
assert_eq!(stored[4], Value::Vertex(VertexId(1)));
assert_eq!(stored[5], Value::Null);
}
#[test]
fn store_step_multiple_stores_to_same_key_append() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step1 = StoreStep::new("data");
let input1 = vec![Traverser::new(Value::Int(1)), Traverser::new(Value::Int(2))];
let _: Vec<_> = Step::apply(&step1, &ctx, Box::new(input1.into_iter())).collect();
let step2 = StoreStep::new("data");
let input2 = vec![Traverser::new(Value::Int(3)), Traverser::new(Value::Int(4))];
let _: Vec<_> = Step::apply(&step2, &ctx, Box::new(input2.into_iter())).collect();
let stored = ctx.side_effects.get("data").unwrap();
assert_eq!(stored.len(), 4);
assert_eq!(stored[0], Value::Int(1));
assert_eq!(stored[1], Value::Int(2));
assert_eq!(stored[2], Value::Int(3));
assert_eq!(stored[3], Value::Int(4));
}
#[test]
fn store_step_is_lazy_not_barrier() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("lazy_test");
let input = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let mut iter = Step::apply(&step, &ctx, Box::new(input.into_iter()));
let first = iter.next();
assert!(first.is_some());
let stored_after_first = ctx.side_effects.get("lazy_test").unwrap();
assert_eq!(stored_after_first.len(), 1);
assert_eq!(stored_after_first[0], Value::Int(1));
let _ = iter.next();
let stored_after_second = ctx.side_effects.get("lazy_test").unwrap();
assert_eq!(stored_after_second.len(), 2);
let _ = iter.collect::<Vec<_>>();
let stored_final = ctx.side_effects.get("lazy_test").unwrap();
assert_eq!(stored_final.len(), 3);
}
#[test]
fn store_step_can_be_used_as_dyn_step() {
let step: Box<dyn DynStep> = Box::new(StoreStep::new("test"));
assert_eq!(step.dyn_name(), "store");
}
#[test]
fn store_step_can_be_stored_in_vec_with_other_steps() {
use crate::traversal::step::IdentityStep;
let steps: Vec<Box<dyn DynStep>> = vec![
Box::new(IdentityStep::new()),
Box::new(StoreStep::new("collected")),
Box::new(IdentityStep::new()),
];
assert_eq!(steps.len(), 3);
assert_eq!(steps[0].dyn_name(), "identity");
assert_eq!(steps[1].dyn_name(), "store");
assert_eq!(steps[2].dyn_name(), "identity");
}
#[test]
fn store_step_different_keys_store_independently() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step_a = StoreStep::new("a");
let input_a = vec![Traverser::new(Value::Int(1))];
let _: Vec<_> = Step::apply(&step_a, &ctx, Box::new(input_a.into_iter())).collect();
let step_b = StoreStep::new("b");
let input_b = vec![Traverser::new(Value::Int(2))];
let _: Vec<_> = Step::apply(&step_b, &ctx, Box::new(input_b.into_iter())).collect();
let stored_a = ctx.side_effects.get("a").unwrap();
let stored_b = ctx.side_effects.get("b").unwrap();
assert_eq!(stored_a.len(), 1);
assert_eq!(stored_a[0], Value::Int(1));
assert_eq!(stored_b.len(), 1);
assert_eq!(stored_b[0], Value::Int(2));
}
}
mod aggregate_step_tests {
use super::*;
use crate::traversal::DynStep;
#[test]
fn aggregate_step_new_creates_step_with_key() {
let step = AggregateStep::new("test_key");
assert_eq!(step.key(), "test_key");
}
#[test]
fn aggregate_step_name_is_aggregate() {
let step = AggregateStep::new("x");
assert_eq!(Step::name(&step), "aggregate");
}
#[test]
fn aggregate_step_is_cloneable() {
let step = AggregateStep::new("test");
let cloned = step.clone();
assert_eq!(cloned.key(), "test");
}
#[test]
fn aggregate_step_clone_box() {
let step = AggregateStep::new("test");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "aggregate");
}
#[test]
fn aggregate_step_stores_all_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = AggregateStep::new("collected");
let input: Vec<Traverser> = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let stored = ctx.side_effects.get("collected").unwrap();
assert_eq!(stored.len(), 3);
assert_eq!(stored[0], Value::Int(1));
assert_eq!(stored[1], Value::Int(2));
assert_eq!(stored[2], Value::Int(3));
assert_eq!(output.len(), 3);
}
#[test]
fn aggregate_step_is_barrier() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = AggregateStep::new("barrier_test");
let input = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let mut iter = Step::apply(&step, &ctx, Box::new(input.into_iter()));
let stored_before = ctx.side_effects.get("barrier_test").unwrap();
assert_eq!(stored_before.len(), 3);
let _ = iter.next();
let _ = iter.next();
let _ = iter.next();
let stored_after = ctx.side_effects.get("barrier_test").unwrap();
assert_eq!(stored_after.len(), 3);
}
#[test]
fn aggregate_step_handles_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = AggregateStep::new("empty");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
assert!(ctx.side_effects.get("empty").is_none());
}
#[test]
fn aggregate_step_passes_traversers_through_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = AggregateStep::new("x");
let mut t = Traverser::from_vertex(VertexId(42));
t.extend_path_labeled("start");
t.loops = 5;
t.bulk = 10;
let input = vec![t];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(42)));
assert!(output[0].path.has_label("start"));
assert_eq!(output[0].loops, 5);
assert_eq!(output[0].bulk, 10);
}
}
mod cap_step_tests {
use super::*;
use crate::traversal::DynStep;
#[test]
fn cap_step_new_creates_single_key() {
let step = CapStep::new("test");
assert_eq!(step.keys(), &["test"]);
}
#[test]
fn cap_step_multi_creates_multiple_keys() {
let step = CapStep::multi(["a", "b", "c"]);
assert_eq!(step.keys(), &["a", "b", "c"]);
}
#[test]
fn cap_step_name_is_cap() {
let step = CapStep::new("x");
assert_eq!(Step::name(&step), "cap");
}
#[test]
fn cap_step_is_cloneable() {
let step = CapStep::new("test");
let cloned = step.clone();
assert_eq!(cloned.keys(), &["test"]);
}
#[test]
fn cap_step_clone_box() {
let step = CapStep::new("test");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "cap");
}
#[test]
fn cap_step_single_key_returns_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
ctx.side_effects.store("items", Value::Int(1));
ctx.side_effects.store("items", Value::Int(2));
ctx.side_effects.store("items", Value::Int(3));
let step = CapStep::new("items");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::List(values) => {
assert_eq!(values.len(), 3);
assert_eq!(values[0], Value::Int(1));
assert_eq!(values[1], Value::Int(2));
assert_eq!(values[2], Value::Int(3));
}
_ => panic!("Expected List"),
}
}
#[test]
fn cap_step_multiple_keys_returns_map() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
ctx.side_effects.store("vertices", Value::Int(1));
ctx.side_effects.store("vertices", Value::Int(2));
ctx.side_effects.store("edges", Value::Int(10));
let step = CapStep::multi(["vertices", "edges"]);
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::Map(map) => {
assert!(map.contains_key("vertices"));
assert!(map.contains_key("edges"));
if let Value::List(v) = map.get("vertices").unwrap() {
assert_eq!(v.len(), 2);
} else {
panic!("Expected List for vertices");
}
if let Value::List(e) = map.get("edges").unwrap() {
assert_eq!(e.len(), 1);
} else {
panic!("Expected List for edges");
}
}
_ => panic!("Expected Map"),
}
}
#[test]
fn cap_step_missing_key_returns_empty_list() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = CapStep::new("nonexistent");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::List(values) => {
assert!(values.is_empty());
}
_ => panic!("Expected List"),
}
}
#[test]
fn cap_step_consumes_input_stream() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let store_step = StoreStep::new("items");
let cap_step = CapStep::new("items");
let input = vec![Traverser::new(Value::Int(1)), Traverser::new(Value::Int(2))];
let after_store = Step::apply(&store_step, &ctx, Box::new(input.into_iter()));
let output: Vec<Traverser> =
Step::apply(&cap_step, &ctx, Box::new(after_store)).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::List(values) => {
assert_eq!(values.len(), 2);
}
_ => panic!("Expected List"),
}
}
#[test]
fn cap_step_with_empty_input_still_returns_result() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
ctx.side_effects.store("x", Value::Int(42));
let step = CapStep::new("x");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::List(values) => {
assert_eq!(values.len(), 1);
assert_eq!(values[0], Value::Int(42));
}
_ => panic!("Expected List"),
}
}
}
mod side_effect_step_tests {
use super::*;
use crate::traversal::DynStep;
#[test]
fn side_effect_step_name_is_side_effect() {
let step = SideEffectStep::new(Traversal::new());
assert_eq!(Step::name(&step), "sideEffect");
}
#[test]
fn side_effect_step_is_cloneable() {
let step = SideEffectStep::new(Traversal::new());
let _cloned = step.clone();
}
#[test]
fn side_effect_step_clone_box() {
let step = SideEffectStep::new(Traversal::new());
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "sideEffect");
}
#[test]
fn side_effect_step_passes_traversers_through() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SideEffectStep::new(Traversal::new());
let input = vec![Traverser::new(Value::Int(1)), Traverser::new(Value::Int(2))];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
assert_eq!(output[0].value, Value::Int(1));
assert_eq!(output[1].value, Value::Int(2));
}
#[test]
fn side_effect_step_executes_sub_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let side_traversal =
Traversal::<Value, Value>::new().add_step(StoreStep::new("side_stored"));
let step = SideEffectStep::new(side_traversal);
let input = vec![Traverser::new(Value::Int(1)), Traverser::new(Value::Int(2))];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 2);
assert_eq!(output[0].value, Value::Int(1));
assert_eq!(output[1].value, Value::Int(2));
let stored = ctx.side_effects.get("side_stored").unwrap();
assert_eq!(stored.len(), 2);
assert_eq!(stored[0], Value::Int(1));
assert_eq!(stored[1], Value::Int(2));
}
#[test]
fn side_effect_step_handles_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let side_traversal =
Traversal::<Value, Value>::new().add_step(StoreStep::new("empty_side"));
let step = SideEffectStep::new(side_traversal);
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
assert!(ctx.side_effects.get("empty_side").is_none());
}
#[test]
fn side_effect_step_preserves_traverser_metadata() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = SideEffectStep::new(Traversal::new());
let mut t = Traverser::from_vertex(VertexId(42));
t.extend_path_labeled("start");
t.loops = 5;
t.bulk = 10;
let input = vec![t];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(42)));
assert!(output[0].path.has_label("start"));
assert_eq!(output[0].loops, 5);
assert_eq!(output[0].bulk, 10);
}
#[test]
fn side_effect_step_debug_output() {
let step = SideEffectStep::new(Traversal::new());
let debug_str = format!("{:?}", step);
assert!(debug_str.contains("SideEffectStep"));
}
}
mod profile_step_tests {
use super::*;
use crate::traversal::DynStep;
#[test]
fn profile_step_new_uses_default_key() {
let step = ProfileStep::new();
assert_eq!(step.key(), None);
}
#[test]
fn profile_step_with_key_uses_custom_key() {
let step = ProfileStep::with_key("my_profile");
assert_eq!(step.key(), Some("my_profile"));
}
#[test]
fn profile_step_name_is_profile() {
let step = ProfileStep::new();
assert_eq!(Step::name(&step), "profile");
}
#[test]
fn profile_step_is_cloneable() {
let step = ProfileStep::with_key("test");
let cloned = step.clone();
assert_eq!(cloned.key(), Some("test"));
}
#[test]
fn profile_step_clone_box() {
let step = ProfileStep::new();
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "profile");
}
#[test]
fn profile_step_default_impl() {
let step = ProfileStep::default();
assert_eq!(step.key(), None);
}
#[test]
fn profile_step_records_count() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::with_key("count_test");
let input = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 3);
let profile = ctx.side_effects.get("count_test").unwrap();
assert_eq!(profile.len(), 1);
if let Value::Map(map) = &profile[0] {
assert_eq!(map.get("count"), Some(&Value::Int(3)));
} else {
panic!("Expected Map");
}
}
#[test]
fn profile_step_records_time_ms() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::with_key("time_test");
let input = vec![Traverser::new(Value::Int(1))];
let _: Vec<_> = Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let profile = ctx.side_effects.get("time_test").unwrap();
if let Value::Map(map) = &profile[0] {
if let Some(Value::Float(time_ms)) = map.get("time_ms") {
assert!(*time_ms >= 0.0);
} else {
panic!("Expected time_ms to be Float");
}
} else {
panic!("Expected Map");
}
}
#[test]
fn profile_step_uses_default_key_when_none_specified() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::new();
let input = vec![Traverser::new(Value::Int(1))];
let _: Vec<_> = Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
let profile = ctx.side_effects.get("profile").unwrap();
assert_eq!(profile.len(), 1);
}
#[test]
fn profile_step_handles_empty_input() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::with_key("empty_test");
let input: Vec<Traverser> = vec![];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert!(output.is_empty());
let profile = ctx.side_effects.get("empty_test").unwrap();
assert_eq!(profile.len(), 1);
if let Value::Map(map) = &profile[0] {
assert_eq!(map.get("count"), Some(&Value::Int(0)));
} else {
panic!("Expected Map");
}
}
#[test]
fn profile_step_passes_traversers_through_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::new();
let mut t = Traverser::from_vertex(VertexId(42));
t.extend_path_labeled("start");
t.loops = 5;
t.bulk = 10;
let input = vec![t];
let output: Vec<Traverser> =
Step::apply(&step, &ctx, Box::new(input.into_iter())).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(42)));
assert!(output[0].path.has_label("start"));
assert_eq!(output[0].loops, 5);
assert_eq!(output[0].bulk, 10);
}
#[test]
fn profile_step_only_records_once_on_exhaustion() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = ProfileStep::with_key("once_test");
let input = vec![Traverser::new(Value::Int(1))];
let mut iter = Step::apply(&step, &ctx, Box::new(input.into_iter()));
let _ = iter.next();
let _ = iter.next();
let _ = iter.next();
let _ = iter.next();
let profile = ctx.side_effects.get("once_test").unwrap();
assert_eq!(profile.len(), 1);
}
#[test]
fn profile_step_debug_output() {
let step = ProfileStep::with_key("debug_test");
let debug_str = format!("{:?}", step);
assert!(debug_str.contains("ProfileStep"));
assert!(debug_str.contains("debug_test"));
}
}
mod streaming_tests {
use super::*;
use crate::traversal::context::StreamingContext;
use crate::traversal::step::Step;
use crate::traversal::SnapshotLike;
fn create_test_graph() -> Graph {
let graph = Graph::new();
graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Alice".to_string()));
props.insert("age".to_string(), Value::Int(30));
props
});
graph.add_vertex("person", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Bob".to_string()));
props.insert("age".to_string(), Value::Int(25));
props
});
graph.add_vertex("software", {
let mut props = HashMap::new();
props.insert("name".to_string(), Value::String("Graph DB".to_string()));
props
});
graph
}
#[test]
fn store_step_streaming_stores_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = StoreStep::new("streamed");
let inputs = vec![
Traverser::new(Value::Int(1)),
Traverser::new(Value::Int(2)),
Traverser::new(Value::Int(3)),
];
let mut outputs = vec![];
for input in inputs {
let result: Vec<_> = Step::apply_streaming(&step, ctx.clone(), input).collect();
outputs.extend(result);
}
let stored = ctx.side_effects().get("streamed").unwrap();
assert_eq!(stored.len(), 3);
assert_eq!(stored[0], Value::Int(1));
assert_eq!(stored[1], Value::Int(2));
assert_eq!(stored[2], Value::Int(3));
assert_eq!(outputs.len(), 3);
}
#[test]
fn store_step_streaming_matches_eager() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let eager_ctx = ExecutionContext::new(snapshot.storage(), snapshot.interner());
let step = StoreStep::new("items");
let input = vec![Traverser::new(Value::Int(42))];
let _: Vec<_> = Step::apply(&step, &eager_ctx, Box::new(input.into_iter())).collect();
let eager_stored = eager_ctx.side_effects.get("items").unwrap();
let streaming_ctx =
StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = StoreStep::new("items");
let _: Vec<_> =
Step::apply_streaming(&step, streaming_ctx.clone(), Traverser::new(Value::Int(42)))
.collect();
let streaming_stored = streaming_ctx.side_effects().get("items").unwrap();
assert_eq!(eager_stored, streaming_stored);
}
#[test]
fn aggregate_step_streaming_stores_values() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = AggregateStep::new("collected");
let inputs = vec![
Traverser::new(Value::String("a".to_string())),
Traverser::new(Value::String("b".to_string())),
];
for input in inputs {
let _: Vec<_> = Step::apply_streaming(&step, ctx.clone(), input).collect();
}
let stored = ctx.side_effects().get("collected").unwrap();
assert_eq!(stored.len(), 2);
}
#[test]
fn side_effect_step_streaming_executes_sub_traversal() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let sub = Traversal::<Value, Value>::new().add_step(StoreStep::new("side_stored"));
let step = SideEffectStep::new(sub);
let input = Traverser::new(Value::Int(99));
let output: Vec<_> = Step::apply_streaming(&step, ctx.clone(), input).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Int(99));
let stored = ctx.side_effects().get("side_stored").unwrap();
assert_eq!(stored.len(), 1);
assert_eq!(stored[0], Value::Int(99));
}
#[test]
fn cap_step_streaming_retrieves_side_effects() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
ctx.side_effects().store("data", Value::Int(1));
ctx.side_effects().store("data", Value::Int(2));
let step = CapStep::new("data");
let output: Vec<_> =
Step::apply_streaming(&step, ctx.clone(), Traverser::new(Value::Null)).collect();
assert_eq!(output.len(), 1);
match &output[0].value {
Value::List(values) => {
assert_eq!(values.len(), 2);
assert_eq!(values[0], Value::Int(1));
assert_eq!(values[1], Value::Int(2));
}
_ => panic!("Expected List"),
}
}
#[test]
fn profile_step_streaming_tracks_count() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = ProfileStep::with_key("profile_test");
for i in 0..5 {
let _: Vec<_> =
Step::apply_streaming(&step, ctx.clone(), Traverser::new(Value::Int(i)))
.collect();
}
let profile = ctx.side_effects().get("profile_test_streaming").unwrap();
assert_eq!(profile.len(), 5);
if let Value::Map(map) = &profile[4] {
assert_eq!(map.get("count"), Some(&Value::Int(5)));
if let Some(Value::Float(time_ms)) = map.get("time_ms") {
assert!(*time_ms >= 0.0);
} else {
panic!("Expected time_ms to be Float");
}
} else {
panic!("Expected Map");
}
}
#[test]
fn profile_step_streaming_passes_through_unchanged() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = ProfileStep::new();
let mut t = Traverser::from_vertex(VertexId(42));
t.extend_path_labeled("test");
t.loops = 3;
let output: Vec<_> = Step::apply_streaming(&step, ctx.clone(), t).collect();
assert_eq!(output.len(), 1);
assert_eq!(output[0].value, Value::Vertex(VertexId(42)));
assert!(output[0].path.has_label("test"));
assert_eq!(output[0].loops, 3);
}
#[test]
fn streaming_context_shares_side_effects_across_clones() {
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let ctx_clone = ctx.clone();
ctx.side_effects().store("shared", Value::Int(1));
ctx_clone.side_effects().store("shared", Value::Int(2));
let original_view = ctx.side_effects().get("shared").unwrap();
let clone_view = ctx_clone.side_effects().get("shared").unwrap();
assert_eq!(original_view, clone_view);
assert_eq!(original_view.len(), 2);
}
#[test]
fn index_step_streaming_increments_counter() {
use crate::traversal::transform::metadata::IndexStep;
let graph = create_test_graph();
let snapshot = graph.snapshot();
let ctx = StreamingContext::new(snapshot.arc_streamable(), snapshot.arc_interner());
let step = IndexStep::new();
let mut results = vec![];
for i in 0..3 {
let output: Vec<_> = Step::apply_streaming(
&step,
ctx.clone(),
Traverser::new(Value::String(format!("item_{}", i))),
)
.collect();
results.extend(output);
}
assert_eq!(results.len(), 3);
for (i, result) in results.iter().enumerate() {
match &result.value {
Value::List(parts) => {
assert_eq!(parts.len(), 2);
assert_eq!(parts[1], Value::Int(i as i64));
}
_ => panic!("Expected List"),
}
}
}
}
}