#[macro_use]
extern crate proptest;
use proptest::prelude::{Strategy, prop};
/// Strategy producing candidate node names.
///
/// Names match `[A-Za-z][A-Za-z0-9_]{0,16}`: one ASCII letter followed by up
/// to sixteen ASCII letters, digits, or underscores. The names `Start`, `End`,
/// and `Root` are rejected by the filter.
///
/// # Panics
///
/// Panics if the regex cannot be turned into a strategy.
fn node_name_strategy() -> impl Strategy<Value = String> {
    prop::string::string_regex("[A-Za-z][A-Za-z0-9_]{0,16}")
        .unwrap()
        .prop_filter("exclude reserved and reserved root name", |candidate| {
            !matches!(candidate.as_str(), "Start" | "End" | "Root")
        })
}
proptest! {
    // Property: every name produced by `node_name_strategy` is non-empty and
    // starts with an ASCII letter.
    #[test]
    fn prop_node_name_non_empty(name in node_name_strategy()) {
        prop_assert!(!name.is_empty());
        // The emptiness check above has already passed, so a first char exists.
        let leading = name.chars().next().unwrap();
        prop_assert!(leading.is_ascii_alphabetic());
    }
}
mod common;
use common::*;
use proptest::prelude::any;
use rustc_hash::FxHashSet;
use std::sync::Arc;
use weavegraph::graphs::{EdgePredicate, GraphBuilder};
use weavegraph::runtimes::{AppRunner, CheckpointerType, SessionInit, StepOptions, StepResult};
use weavegraph::state::StateSnapshot;
use weavegraph::types::NodeKind;
/// Runs `fut` to completion on a freshly built current-thread Tokio runtime.
///
/// A new runtime is created on every call, which lets the synchronous
/// property-test bodies in this file drive async code.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
fn block_on<F>(fut: F)
where
    F: std::future::Future<Output = ()>,
{
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(fut);
}
proptest! {
    // Property: when the conditional edge's predicate names only registered
    // nodes (optionally plus "End"), the next frontier after one step contains
    // every named node, and every custom node in that frontier is registered.
    #[test]
    fn prop_valid_only_predicate_targets(
        mut names in prop::collection::vec(node_name_strategy(), 1..8),
        include_end in any::<bool>(),
    ) {
        // `dedup` only removes adjacent repeats, so sort first to make names unique.
        names.sort();
        names.dedup();
        block_on(async move {
            let root = || NodeKind::Custom("Root".into());

            // Fixed skeleton Start -> Root -> End, then one node per generated name.
            let skeleton = GraphBuilder::new()
                .add_node(root(), TestNode { name: "root" })
                .add_edge(NodeKind::Start, root())
                .add_edge(root(), NodeKind::End);
            let builder = names.iter().fold(skeleton, |acc, label| {
                acc.add_node(NodeKind::Custom(label.clone()), TestNode { name: "t" })
            });

            // The predicate ignores the snapshot and always returns the same targets.
            let routed: Vec<String> = names
                .iter()
                .cloned()
                .chain(include_end.then(|| "End".to_string()))
                .collect();
            let route_all: EdgePredicate = Arc::new(move |_snap| routed.clone());
            let app = builder
                .add_conditional_edge(root(), route_all)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;
            let init = runner
                .create_session("sess_valid".into(), state_with_user("seed"))
                .await
                .unwrap();
            if !matches!(init, SessionInit::Fresh) {
                panic!("expected fresh session");
            }

            let outcome = runner
                .run_step("sess_valid", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                panic!("expected completed")
            };

            let frontier: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            let allowed: FxHashSet<String> = names.iter().cloned().collect();

            // Every routed target must have been scheduled.
            for label in &names {
                assert!(frontier.contains(&NodeKind::Custom(label.clone())));
            }
            if include_end {
                assert!(frontier.contains(&NodeKind::End));
            }
            // No custom node outside the registered set may appear.
            for kind in frontier {
                let NodeKind::Custom(label) = kind else { continue };
                assert!(allowed.contains(&label));
            }
        });
    }
}
proptest! {
    // Property: when the predicate returns a mix of registered names,
    // unregistered names, and "End", the next frontier contains the registered
    // names and End but none of the unregistered names.
    #[test]
    fn prop_mixed_valid_invalid_targets(
        mut valid in prop::collection::vec(node_name_strategy(), 1..6),
        mut invalid in prop::collection::vec(node_name_strategy(), 1..6),
    ) {
        valid.sort();
        valid.dedup();
        invalid.sort();
        invalid.dedup();
        // Keep the two sets disjoint: a name that is registered is not "invalid".
        invalid.retain(|candidate| !valid.contains(candidate));
        prop_assume!(!valid.is_empty());
        prop_assume!(!invalid.is_empty());
        block_on(async move {
            let root = || NodeKind::Custom("Root".into());

            // Only the `valid` names are registered as nodes.
            let skeleton = GraphBuilder::new()
                .add_node(root(), TestNode { name: "root" })
                .add_edge(NodeKind::Start, root())
                .add_edge(root(), NodeKind::End);
            let builder = valid.iter().fold(skeleton, |acc, label| {
                acc.add_node(NodeKind::Custom(label.clone()), TestNode { name: "t" })
            });

            // Predicate output order: valid names, then invalid names, then "End".
            let routed: Vec<String> = valid
                .iter()
                .chain(invalid.iter())
                .cloned()
                .chain(std::iter::once("End".to_string()))
                .collect();
            let route_mixed: EdgePredicate = Arc::new(move |_snap| routed.clone());
            let app = builder
                .add_conditional_edge(root(), route_mixed)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;
            let init = runner
                .create_session("sess_mix".into(), state_with_user("x"))
                .await
                .unwrap();
            if !matches!(init, SessionInit::Fresh) {
                panic!("fresh");
            }

            let outcome = runner
                .run_step("sess_mix", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                unreachable!()
            };

            let frontier: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            for label in &valid {
                assert!(frontier.contains(&NodeKind::Custom(label.clone())));
            }
            assert!(frontier.contains(&NodeKind::End));
            for label in &invalid {
                assert!(!frontier.contains(&NodeKind::Custom(label.clone())));
            }
        });
    }
}
proptest! {
    // Property: even when the predicate returns the same registered node many
    // times, each pool node appears at most once in the next frontier.
    #[test]
    fn prop_stress_fan_out_dedup(
        mut pool in prop::collection::vec(node_name_strategy(), 2..16),
        fanout in 1usize..64,
    ) {
        pool.sort();
        pool.dedup();
        block_on(async move {
            let root = || NodeKind::Custom("Root".into());

            let skeleton = GraphBuilder::new()
                .add_node(root(), TestNode { name: "root" })
                .add_edge(NodeKind::Start, root())
                .add_edge(root(), NodeKind::End);
            let builder = pool.iter().fold(skeleton, |acc, label| {
                acc.add_node(NodeKind::Custom(label.clone()), TestNode { name: "t" })
            });

            // Walk the pool round-robin until `fanout` targets have been emitted;
            // this yields repeats whenever `fanout` exceeds the pool size.
            let mut routed: Vec<String> = pool.iter().cycle().take(fanout).cloned().collect();
            // Even fan-out values additionally route to End.
            if fanout % 2 == 0 {
                routed.push("End".into());
            }
            let route_many: EdgePredicate = Arc::new(move |_snap| routed.clone());
            let app = builder
                .add_conditional_edge(root(), route_many)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;
            let init = runner
                .create_session("sess_fan".into(), state_with_user("y"))
                .await
                .unwrap();
            if !matches!(init, SessionInit::Fresh) {
                panic!("fresh");
            }

            let outcome = runner
                .run_step("sess_fan", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                unreachable!()
            };

            // Count how many times each custom node shows up in the frontier.
            let mut occurrences = std::collections::HashMap::<String, usize>::new();
            for kind in rep.next_frontier {
                if let NodeKind::Custom(label) = kind {
                    *occurrences.entry(label).or_default() += 1;
                }
            }
            for label in &pool {
                assert!(occurrences.get(label).copied().unwrap_or(0) <= 1);
            }
        });
    }
}
/// Strategy producing keys for the state's `extra` map.
///
/// Keys match `[a-z][a-z0-9_]{0,8}`: one lowercase ASCII letter followed by up
/// to eight lowercase letters, digits, or underscores.
///
/// # Panics
///
/// Panics if the regex cannot be turned into a strategy.
fn extra_key_strategy() -> impl Strategy<Value = String> {
    let pattern = "[a-z][a-z0-9_]{0,8}";
    prop::string::string_regex(pattern).unwrap()
}
proptest! {
    // Property: a predicate that inspects the snapshot's `extra` map routes to
    // "HighPath" when the stored integer is at least `threshold`, and to
    // "LowPath" otherwise.
    #[test]
    fn prop_conditional_routing_based_on_extra_data(
        key in extra_key_strategy(),
        threshold in 0i64..100,
        value in 0i64..100,
    ) {
        block_on(async move {
            let custom = |label: &str| NodeKind::Custom(label.into());

            // A missing key or a non-integer value falls through to "LowPath".
            let lookup_key = key.clone();
            let predicate: EdgePredicate = Arc::new(move |snap: StateSnapshot| {
                let meets_threshold = snap
                    .extra
                    .get(&lookup_key)
                    .and_then(|raw| raw.as_i64())
                    .is_some_and(|num| num >= threshold);
                let branch = if meets_threshold { "HighPath" } else { "LowPath" };
                vec![branch.to_string()]
            });

            let app = GraphBuilder::new()
                .add_node(custom("Root"), TestNode { name: "root" })
                .add_node(custom("HighPath"), TestNode { name: "high" })
                .add_node(custom("LowPath"), TestNode { name: "low" })
                .add_edge(NodeKind::Start, custom("Root"))
                .add_edge(custom("Root"), NodeKind::End)
                .add_edge(custom("HighPath"), NodeKind::End)
                .add_edge(custom("LowPath"), NodeKind::End)
                .add_conditional_edge(custom("Root"), predicate)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;

            // Seed the initial state with the generated key/value pair.
            let mut state = state_with_user("test");
            state.extra.get_mut().insert(key, serde_json::json!(value));
            runner.create_session("sess_cond".into(), state).await.unwrap();

            let outcome = runner
                .run_step("sess_cond", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                panic!("expected completed")
            };

            let frontier: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            let took_high = frontier.contains(&custom("HighPath"));
            let took_low = frontier.contains(&custom("LowPath"));
            if value >= threshold {
                assert!(took_high,
                    "expected HighPath when value {} >= threshold {}", value, threshold);
                assert!(!took_low,
                    "unexpected LowPath when value {} >= threshold {}", value, threshold);
            } else {
                assert!(took_low,
                    "expected LowPath when value {} < threshold {}", value, threshold);
                assert!(!took_high,
                    "unexpected HighPath when value {} < threshold {}", value, threshold);
            }
        });
    }
}
proptest! {
    // Property: two conditional edges attached to the same source node both
    // contribute; the next frontier contains the targets of each predicate.
    #[test]
    fn prop_multiple_conditional_edges_same_source(
        mut targets_a in prop::collection::vec(node_name_strategy(), 1..4),
        mut targets_b in prop::collection::vec(node_name_strategy(), 1..4),
    ) {
        targets_a.sort();
        targets_a.dedup();
        targets_b.sort();
        targets_b.dedup();
        // Make the two target sets disjoint so no node is registered twice.
        targets_b.retain(|candidate| !targets_a.contains(candidate));
        prop_assume!(!targets_a.is_empty());
        prop_assume!(!targets_b.is_empty());
        block_on(async move {
            let root = || NodeKind::Custom("Root".into());

            let skeleton = GraphBuilder::new()
                .add_node(root(), TestNode { name: "root" })
                .add_edge(NodeKind::Start, root())
                .add_edge(root(), NodeKind::End);
            let builder = targets_a
                .iter()
                .chain(targets_b.iter())
                .fold(skeleton, |acc, label| {
                    acc.add_node(NodeKind::Custom(label.clone()), TestNode { name: "t" })
                });

            // Each predicate returns its own fixed target list.
            let routed_a = targets_a.clone();
            let first: EdgePredicate = Arc::new(move |_| routed_a.clone());
            let routed_b = targets_b.clone();
            let second: EdgePredicate = Arc::new(move |_| routed_b.clone());

            let app = builder
                .add_conditional_edge(root(), first)
                .add_conditional_edge(root(), second)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;
            runner.create_session("sess_multi".into(), state_with_user("x")).await.unwrap();

            let outcome = runner
                .run_step("sess_multi", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                panic!("expected completed")
            };

            let frontier: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            for n in &targets_a {
                assert!(frontier.contains(&NodeKind::Custom(n.clone())),
                    "expected target_a node {} in frontier", n);
            }
            for n in &targets_b {
                assert!(frontier.contains(&NodeKind::Custom(n.clone())),
                    "expected target_b node {} in frontier", n);
            }
        });
    }
}
proptest! {
#[test]
fn prop_empty_predicate_result_safe(
mut registered in prop::collection::vec(node_name_strategy(), 1..5),
) {
registered.sort(); registered.dedup();
prop_assume!(!registered.is_empty());
block_on(async move {
let mut gb = GraphBuilder::new()
.add_node(NodeKind::Custom("Root".into()), TestNode { name: "root" })
.add_edge(NodeKind::Start, NodeKind::Custom("Root".into()))
.add_edge(NodeKind::Custom("Root".into()), NodeKind::End);
for n in ®istered {
gb = gb.add_node(NodeKind::Custom(n.clone()), TestNode { name: "t" });
}
let pred: EdgePredicate = Arc::new(|_| Vec::new());
gb = gb.add_conditional_edge(NodeKind::Custom("Root".into()), pred);
let app = gb.compile().unwrap();
let mut runner = AppRunner::builder().app(app).checkpointer(CheckpointerType::InMemory).build().await;
runner.create_session("sess_empty".into(), state_with_user("x")).await.unwrap();
let rep = match runner.run_step("sess_empty", StepOptions::default()).await.unwrap() {
StepResult::Completed(rep) => rep,
_ => panic!("expected completed"),
};
let nf: FxHashSet<_> = rep.next_frontier.into_iter().collect();
assert!(nf.contains(&NodeKind::End), "End should be in frontier");
for n in ®istered {
assert!(!nf.contains(&NodeKind::Custom(n.clone())),
"registered node {} should not be in frontier with empty predicate", n);
}
});
}
}
proptest! {
    // Property: with no unconditional Root -> End edge, End appears in the
    // next frontier exactly when the predicate returns "End"; every other
    // predicate target must appear as well.
    #[test]
    fn prop_predicate_can_route_to_end(
        include_end in any::<bool>(),
        mut targets in prop::collection::vec(node_name_strategy(), 0..3),
    ) {
        targets.sort();
        targets.dedup();
        block_on(async move {
            let root = || NodeKind::Custom("Root".into());

            // Root's only outgoing routes come from the conditional edge below.
            let skeleton = GraphBuilder::new()
                .add_node(root(), TestNode { name: "root" })
                .add_edge(NodeKind::Start, root());
            // Each target node is registered and wired straight to End.
            let builder = targets.iter().fold(skeleton, |acc, label| {
                acc.add_node(NodeKind::Custom(label.clone()), TestNode { name: "t" })
                    .add_edge(NodeKind::Custom(label.clone()), NodeKind::End)
            });

            let routed: Vec<String> = targets
                .iter()
                .cloned()
                .chain(include_end.then(|| "End".to_string()))
                .collect();
            let pred: EdgePredicate = Arc::new(move |_| routed.clone());
            let app = builder
                .add_conditional_edge(root(), pred)
                .compile()
                .unwrap();

            let mut runner = AppRunner::builder()
                .app(app)
                .checkpointer(CheckpointerType::InMemory)
                .build()
                .await;
            runner.create_session("sess_end".into(), state_with_user("x")).await.unwrap();

            let outcome = runner
                .run_step("sess_end", StepOptions::default())
                .await
                .unwrap();
            let StepResult::Completed(rep) = outcome else {
                panic!("expected completed")
            };

            let frontier: FxHashSet<_> = rep.next_frontier.into_iter().collect();
            assert_eq!(frontier.contains(&NodeKind::End), include_end,
                "End presence in frontier should match include_end={}", include_end);
            for n in &targets {
                assert!(frontier.contains(&NodeKind::Custom(n.clone())),
                    "target {} should be in frontier", n);
            }
        });
    }
}