use lellm_graph::{
BarrierDecision, BarrierDefaultAction, BarrierNode, BuildError, BuildErrors, Diagnostic,
DiagnosticCategory, GraphBuilder, GraphError, GraphEvent, GraphExecution, NodeContext,
NodeKind, SK_COUNT, SK_STEPS, SimpleExecutor, State, StateExt, StateKey, StateMutation,
TaskNode, TerminalError, TraceId,
};
use std::sync::Arc;
use std::time::Duration;
/// Creates a [`GraphBuilder`] called `name`, hands it to `f` for configuration,
/// and returns whatever `GraphBuilder::build` produces.
///
/// The `Result` returned by `f` itself is discarded; only the outcome of
/// `build()` is reported to the caller.
fn build_graph<F>(name: &str, f: F) -> Result<lellm_graph::Graph, BuildErrors>
where
    F: FnOnce(&mut GraphBuilder) -> Result<(), BuildError>,
{
    let mut builder = GraphBuilder::new(name);
    // The closure's own result is ignored here.
    let _ = f(&mut builder);
    builder.build()
}
/// Runs the chain `a -> b -> c`, where every task overwrites the `"step"` key,
/// and checks the final value plus the number of execution-log entries.
#[tokio::test]
async fn test_linear_pipeline() {
    let pipeline = build_graph("linear", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                node_ctx.record(StateMutation::Put("step".into(), serde_json::json!("a")));
                Ok(())
            })),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |node_ctx: &mut NodeContext<'_>| {
                node_ctx.record(StateMutation::Put("step".into(), serde_json::json!("b")));
                Ok(())
            })),
        );
        let _ = builder.node(
            "c",
            NodeKind::Task(TaskNode::new("c", |node_ctx: &mut NodeContext<'_>| {
                node_ctx.record(StateMutation::Put("step".into(), serde_json::json!("c")));
                Ok(())
            })),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "c");
        let _ = builder.end("c");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(pipeline), State::new())
        .await
        .expect("execution should succeed");

    // The last task to run ("c") must have the final say on "step".
    let final_step = outcome.state.get("step").unwrap();
    assert_eq!(final_step, &serde_json::json!("c"));
    assert_eq!(outcome.execution_log.len(), 3);
}
/// Seeds `flag = true` and expects the condition node `check` to send the run
/// through the `yes` task, which writes `result = "yes"`.
#[tokio::test]
async fn test_condition_branching() {
    let branching = build_graph("condition", |builder| {
        let _ = builder.start("check");
        let _ = builder.node(
            "check",
            NodeKind::Condition(
                lellm_graph::ConditionNode::builder("check")
                    // "yes" only when the state holds a boolean `flag` set to true.
                    .branch("yes", |state: &State| {
                        matches!(
                            state.get("flag").and_then(|value| value.as_bool()),
                            Some(true)
                        )
                    })
                    // Catch-all branch.
                    .branch("no", |_| true)
                    .build(),
            ),
        );
        let _ = builder.node(
            "yes",
            NodeKind::Task(TaskNode::new("yes", |node_ctx: &mut NodeContext<'_>| {
                node_ctx.record(StateMutation::Put(
                    "result".into(),
                    serde_json::json!("yes"),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "no",
            NodeKind::Task(TaskNode::new("no", |node_ctx: &mut NodeContext<'_>| {
                node_ctx.record(StateMutation::Put("result".into(), serde_json::json!("no")));
                Ok(())
            })),
        );
        let _ = builder.edge("check", "yes");
        let _ = builder.edge("check", "no");
        let _ = builder.edge("yes", "yes_end");
        let _ = builder.edge("no", "no_end");
        let _ = builder.node(
            "yes_end",
            NodeKind::Task(TaskNode::new(
                "yes_end",
                |_ctx: &mut NodeContext<'_>| Ok(()),
            )),
        );
        let _ = builder.node(
            "no_end",
            NodeKind::Task(TaskNode::new("no_end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.end("yes_end");
        Ok(())
    })
    .expect("build should succeed");

    let mut seeded = State::new();
    seeded.insert("flag".into(), serde_json::json!(true));

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(branching), seeded)
        .await
        .expect("execution should succeed");

    let chosen = outcome.state.get("result").unwrap();
    assert_eq!(chosen, &serde_json::json!("yes"));
}
#[tokio::test]
async fn test_task_node_error() {
let graph = build_graph("error", |g| {
let _ = g.start("fail");
let _ = g.node(
"fail",
NodeKind::Task(TaskNode::new("fail", |_ctx: &mut NodeContext<'_>| {
Err(GraphError::Terminal(TerminalError::StateError(
"boom".into(),
)))
})),
);
let _ = g.end("fail");
Ok(())
})
.expect("build should succeed");
let result = SimpleExecutor::default()
.execute(Arc::new(graph), State::new())
.await;
assert!(result.is_err());
match result.unwrap_err() {
GraphError::Terminal(TerminalError::StateError(msg)) => assert_eq!(msg, "boom"),
other => panic!("expected StateError, got: {other}"),
}
}
/// The builder must accept a graph containing the cycle `a -> b -> a`.
#[test]
fn test_cyclic_graph_allowed() {
    let built = build_graph("cycle", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        // Back edge that closes the cycle.
        let _ = builder.edge("b", "a");
        let _ = builder.end("b");
        Ok(())
    });

    assert!(built.is_ok(), "cyclic graph should be allowed to build");
}
/// Runs an `a <-> b` cycle that has no edge leading to the end node, using an
/// executor created with `SimpleExecutor::new(5)`, and expects
/// `TerminalError::StepsExceeded { limit: 5 }`.
#[tokio::test]
async fn test_cyclic_graph_steps_exceeded() {
    let endless = build_graph("infinite_cycle", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                // Bump the "count" entry (treated as 0 when absent or not a u64).
                let visits = node_ctx
                    .state()
                    .get("count")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0);
                node_ctx.record(StateMutation::Put(
                    "count".into(),
                    serde_json::json!(visits + 1),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        // "done" is the declared end node, but no edge points at it.
        let _ = builder.node(
            "done",
            NodeKind::Task(TaskNode::new("done", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "a");
        let _ = builder.end("done");
        Ok(())
    })
    .expect("cyclic graph should build");

    let limited = SimpleExecutor::new(5);
    let outcome = limited.execute(Arc::new(endless), State::new()).await;

    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    match failure {
        GraphError::Terminal(TerminalError::StepsExceeded { limit }) => assert_eq!(limit, 5),
        other => panic!("expected StepsExceeded, got: {other}"),
    }
}
/// Loops `a -> b -> a` through a conditional back edge that holds while
/// `count < 3`, then leaves through `b -> end`. Expects `count == 3` and seven
/// execution-log entries.
#[tokio::test]
async fn test_cyclic_graph_with_edge_if_exit() {
    let looping = build_graph("cyclic_with_exit", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                let visits = node_ctx
                    .state()
                    .get("count")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0);
                node_ctx.record(StateMutation::Put(
                    "count".into(),
                    serde_json::json!(visits + 1),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        // Back edge: taken while the counter is below 3 (or not readable as u64).
        let _ = builder.edge_if("b", "a", |state: &State| {
            state
                .get("count")
                .and_then(|value| value.as_u64())
                .map_or(true, |current| current < 3)
        });
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(looping), State::new())
        .await
        .expect("execution should succeed");

    let final_count = outcome.state.get("count").unwrap();
    assert_eq!(final_count, &serde_json::json!(3));
    assert_eq!(outcome.execution_log.len(), 7);
}
/// Uses a condition node `route` to jump back to `a` while `count < 2`, then
/// to `end`. Expects the counter to finish at 2.
#[tokio::test]
async fn test_condition_node_back_jump() {
    let looping = build_graph("cond_back_jump", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                let visits = node_ctx
                    .state()
                    .get("count")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0);
                node_ctx.record(StateMutation::Put(
                    "count".into(),
                    serde_json::json!(visits + 1),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "route",
            NodeKind::Condition(
                lellm_graph::ConditionNode::builder("route")
                    // Jump back to "a" while the counter is below 2.
                    .branch("a", |state: &State| {
                        state
                            .get("count")
                            .and_then(|value| value.as_u64())
                            .map_or(true, |current| current < 2)
                    })
                    .branch("end", |_| true)
                    .build(),
            ),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "route");
        let _ = builder.edge("route", "a");
        let _ = builder.edge("route", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(looping), State::new())
        .await
        .expect("execution should succeed");

    let final_count = outcome.state.get("count").unwrap();
    assert_eq!(final_count, &serde_json::json!(2));
}
/// Building must fail when start, edge and end refer to nodes that were never
/// registered.
#[test]
fn test_missing_node() {
    let built = build_graph("missing", |builder| {
        // No `node(...)` call at all: both names are undefined.
        let _ = builder.start("a");
        let _ = builder.edge("a", "nonexistent");
        let _ = builder.end("nonexistent");
        Ok(())
    });

    assert!(built.is_err());
}
/// Building must fail when no start node has been declared.
#[test]
fn test_missing_start() {
    let built = build_graph("no_start", |builder| {
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        // Only an end node is declared; `start` is never called.
        let _ = builder.end("a");
        Ok(())
    });

    assert!(built.is_err());
}
/// Building must fail when no end node has been declared.
#[test]
fn test_missing_end() {
    let built = build_graph("no_end", |builder| {
        // Only a start node is declared; `end` is never called.
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        Ok(())
    });

    assert!(built.is_err());
}
/// Runs `a -> b` and checks that the execution log has two successful entries
/// and that a non-zero duration was recorded.
#[tokio::test]
async fn test_execution_log() {
    let two_step = build_graph("log", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.end("b");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(two_step), State::new())
        .await
        .expect("execution should succeed");

    assert_eq!(outcome.execution_log.len(), 2);
    assert!(outcome.execution_log.iter().all(|entry| entry.success));
    assert!(outcome.duration.as_nanos() > 0);
}
/// Executes a graph whose only node is a barrier through the blocking
/// `execute` API, where nobody answers the barrier, and expects `Ok`.
#[tokio::test]
async fn test_barrier_blocked_mode_default_reject() {
    let gated = build_graph("barrier_blocked", |builder| {
        let _ = builder.start("barrier");
        let _ = builder.node("barrier", NodeKind::Barrier(BarrierNode::new("review")));
        let _ = builder.end("barrier");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(gated), State::new())
        .await;

    assert!(
        outcome.is_ok(),
        "should complete with default reject decision"
    );
}
#[tokio::test]
async fn test_barrier_approve() {
let graph = build_graph("approve_flow", |g| {
let _ = g.start("barrier");
let _ = g.node("barrier", NodeKind::Barrier(BarrierNode::new("review")));
let _ = g.node(
"after",
NodeKind::Task(TaskNode::new("after", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.edge("barrier", "after");
let _ = g.end("after");
Ok(())
})
.expect("build should succeed");
let GraphExecution { mut stream, handle } =
SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::BarrierWaiting {
node_name,
barrier_id,
..
} => {
assert_eq!(node_name, "barrier");
let _ = handle.decide(barrier_id, BarrierDecision::Approve).await;
}
GraphEvent::GraphComplete { .. } => {
break;
}
GraphEvent::GraphError { error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
}
#[tokio::test]
async fn test_barrier_reject_with_back_jump() {
let graph = build_graph("reject_flow", |g| {
let _ = g.start("task");
let _ = g.node(
"task",
NodeKind::Task(TaskNode::new("task", |ctx: &mut NodeContext<'_>| {
let count = ctx
.state()
.get("count")
.and_then(|v| v.as_u64())
.unwrap_or(0);
ctx.record(StateMutation::Put(
"count".into(),
serde_json::json!(count + 1),
));
Ok(())
})),
);
let _ = g.node("review", NodeKind::Barrier(BarrierNode::new("review")));
let _ = g.edge("task", "review");
let _ = g.edge_if("review", "task", |s: &State| {
s.get("review.reject_reason").is_some()
});
let _ = g.node(
"done",
NodeKind::Task(TaskNode::new("done", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.edge("review", "done");
let _ = g.end("done");
Ok(())
})
.expect("build should succeed");
let GraphExecution { mut stream, handle } =
SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
let mut reject_count = 0;
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::BarrierWaiting {
node_name,
barrier_id,
..
} => {
assert_eq!(node_name, "review");
reject_count += 1;
if reject_count == 1 {
let _ = handle
.decide(
barrier_id,
BarrierDecision::Reject {
reason: "需要改进".into(),
},
)
.await;
} else {
let _ = handle.decide(barrier_id, BarrierDecision::Approve).await;
}
}
GraphEvent::GraphComplete { .. } => {
break;
}
GraphEvent::GraphError { error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
}
#[tokio::test]
async fn test_barrier_modify() {
let graph = build_graph("modify_flow", |g| {
let _ = g.start("barrier");
let _ = g.node("barrier", NodeKind::Barrier(BarrierNode::new("input")));
let _ = g.node(
"after",
NodeKind::Task(TaskNode::new("after", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.edge("barrier", "after");
let _ = g.end("after");
Ok(())
})
.expect("build should succeed");
let GraphExecution { mut stream, handle } =
SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::BarrierWaiting { barrier_id, .. } => {
let _ = handle
.decide(
barrier_id,
BarrierDecision::Modify {
key: "user_input".into(),
value: serde_json::json!("人工补充的数据"),
},
)
.await;
}
GraphEvent::GraphComplete { .. } => {
break;
}
GraphEvent::GraphError { error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
}
#[tokio::test]
async fn test_barrier_timeout() {
let graph = build_graph("timeout_flow", |g| {
let _ = g.start("barrier");
let _ = g.node(
"barrier",
NodeKind::Barrier(
BarrierNode::new("review")
.timeout(Duration::from_millis(100))
.default_action(BarrierDefaultAction::Reject),
),
);
let _ = g.node(
"after",
NodeKind::Task(TaskNode::new("after", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.edge("barrier", "after");
let _ = g.end("after");
Ok(())
})
.expect("build should succeed");
let GraphExecution {
mut stream,
handle: _handle,
} = SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::BarrierWaiting { .. } => {
}
GraphEvent::GraphComplete { .. } => {
break;
}
GraphEvent::GraphError { ref error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
}
#[tokio::test]
async fn test_barrier_reroute() {
let graph = build_graph("reroute_flow", |g| {
let _ = g.start("barrier");
let _ = g.node("barrier", NodeKind::Barrier(BarrierNode::new("route")));
let _ = g.node(
"path_a",
NodeKind::Task(TaskNode::new("path_a", |ctx: &mut NodeContext<'_>| {
ctx.record(StateMutation::Put("path".into(), serde_json::json!("A")));
Ok(())
})),
);
let _ = g.node(
"path_b",
NodeKind::Task(TaskNode::new("path_b", |ctx: &mut NodeContext<'_>| {
ctx.record(StateMutation::Put("path".into(), serde_json::json!("B")));
Ok(())
})),
);
let _ = g.edge("barrier", "path_a");
let _ = g.edge("barrier", "path_b");
let _ = g.edge("path_a", "end");
let _ = g.edge("path_b", "end");
let _ = g.node(
"end",
NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.end("end");
Ok(())
})
.expect("build should succeed");
let GraphExecution { mut stream, handle } =
SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::BarrierWaiting { barrier_id, .. } => {
let _ = handle
.decide(
barrier_id,
BarrierDecision::Reroute {
target: "path_b".into(),
},
)
.await;
}
GraphEvent::GraphComplete { .. } => {
break;
}
GraphEvent::GraphError { error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
}
/// Streams `before_a -> barrier_a -> between -> barrier_b -> after_b`,
/// approving every barrier wait, and expects the run to reach `GraphComplete`.
#[tokio::test]
async fn test_double_barrier_sequential() {
    let flow = build_graph("double_barrier", |builder| {
        let _ = builder.start("before_a");
        let _ = builder.node(
            "before_a",
            NodeKind::Task(TaskNode::new("before_a", |node_ctx: &mut NodeContext<'_>| {
                // Start with an empty "steps" list.
                node_ctx.record(StateMutation::Put(
                    "steps".into(),
                    serde_json::json!(Vec::<String>::new()),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "barrier_a",
            NodeKind::Barrier(BarrierNode::new("barrier_a")),
        );
        let _ = builder.node(
            "between",
            NodeKind::Task(TaskNode::new("between", |node_ctx: &mut NodeContext<'_>| {
                // Read the list back (empty if missing/undecodable) and append a marker.
                let mut trail: Vec<String> = node_ctx
                    .state()
                    .get("steps")
                    .and_then(|value| serde_json::from_value(value.clone()).ok())
                    .unwrap_or_default();
                trail.push("passed_a".into());
                node_ctx.record(StateMutation::Put(
                    "steps".into(),
                    serde_json::to_value(trail).unwrap(),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "barrier_b",
            NodeKind::Barrier(BarrierNode::new("barrier_b")),
        );
        let _ = builder.node(
            "after_b",
            NodeKind::Task(TaskNode::new("after_b", |node_ctx: &mut NodeContext<'_>| {
                let mut trail: Vec<String> = node_ctx
                    .state()
                    .get("steps")
                    .and_then(|value| serde_json::from_value(value.clone()).ok())
                    .unwrap_or_default();
                trail.push("passed_b".into());
                node_ctx.record(StateMutation::Put(
                    "steps".into(),
                    serde_json::to_value(trail).unwrap(),
                ));
                Ok(())
            })),
        );
        let _ = builder.edge("before_a", "barrier_a");
        let _ = builder.edge("barrier_a", "between");
        let _ = builder.edge("between", "barrier_b");
        let _ = builder.edge("barrier_b", "after_b");
        let _ = builder.end("after_b");
        Ok(())
    })
    .expect("build should succeed");

    let GraphExecution {
        stream: mut events,
        handle: controller,
    } = SimpleExecutor::default().execute_stream(Arc::new(flow), State::new());

    loop {
        match events.recv().await.expect("stream should not close") {
            GraphEvent::BarrierWaiting { barrier_id, .. } => {
                let _ = controller
                    .decide(barrier_id, BarrierDecision::Approve)
                    .await;
            }
            GraphEvent::GraphComplete { .. } => break,
            GraphEvent::GraphError { error, .. } => {
                panic!("unexpected graph error: {error:?}")
            }
            _ => {}
        }
    }
}
/// Checks the typed getters (`get_str`, `get_u64`, `get_bool`, `get_f64`) and
/// `contains` from `StateExt` on a state populated via `insert`.
#[test]
fn test_state_ext_getters() {
    use lellm_graph::StateExt;

    let mut state = State::new();
    state.insert("name".into(), serde_json::json!("hello"));
    state.insert("count".into(), serde_json::json!(42));
    state.insert("enabled".into(), serde_json::json!(true));
    // 2.5 is exactly representable as f64 and, unlike 3.14, does not trip
    // Clippy's deny-by-default `approx_constant` lint (approximation of PI).
    state.insert("score".into(), serde_json::json!(2.5));

    assert_eq!(state.get_str("name"), Some("hello"));
    assert_eq!(state.get_u64("count"), Some(42));
    assert_eq!(state.get_bool("enabled"), Some(true));
    assert_eq!(state.get_f64("score"), Some(2.5));

    // Absent keys yield `None` / `false`.
    assert_eq!(state.get_str("missing"), None);
    assert!(state.contains("name"));
    assert!(!state.contains("missing"));
}
/// Values written with `StateExt::set` must be readable through the matching
/// typed getters.
#[test]
fn test_state_ext_set() {
    use lellm_graph::StateExt;

    let mut store = State::new();
    store.set("count", 42u64);
    store.set("name", "hello");
    store.set("enabled", true);

    assert_eq!(store.get_u64("count"), Some(42));
    assert_eq!(store.get_str("name"), Some("hello"));
    assert_eq!(store.get_bool("enabled"), Some(true));
}
/// Removing an existing key returns `Some(..)` and the key is gone afterwards.
#[test]
fn test_state_ext_remove() {
    use lellm_graph::StateExt;

    let mut store = State::new();
    store.insert("key".into(), serde_json::json!("value"));

    let taken = store.remove("key");

    assert!(taken.is_some());
    assert!(!store.contains("key"));
}
/// `get_json` returns a stored nested JSON value and yields an error for a
/// missing key.
#[test]
fn test_state_ext_get_json() {
    use lellm_graph::StateExt;

    let mut store = State::new();
    store.set("config", serde_json::json!({"nested": {"key": "value"}}));

    let loaded: serde_json::Value = store.get_json("config").unwrap();
    assert_eq!(loaded["nested"]["key"], "value");

    let absent = store.get_json::<String>("missing");
    assert!(absent.is_err());
}
/// Two `append_array` calls on an initially absent key must produce the
/// concatenation of both arrays.
#[test]
fn test_state_ext_append_array() {
    use lellm_graph::StateExt;

    let mut store = State::new();
    store
        .append_array("items", serde_json::json!([1, 2]))
        .unwrap();
    store
        .append_array("items", serde_json::json!([3, 4]))
        .unwrap();

    let merged = store.get("items").unwrap();
    assert_eq!(merged, &serde_json::json!([1, 2, 3, 4]));
}
/// `append_array` on a key that already holds an array (set via `insert`)
/// must extend that array.
#[test]
fn test_state_ext_reduce() {
    use lellm_graph::StateExt;

    let mut store = State::new();
    store.insert("items".into(), serde_json::json!([1, 2]));
    store
        .append_array("items", serde_json::json!([3, 4]))
        .unwrap();

    let merged = store.get("items").unwrap();
    assert_eq!(merged, &serde_json::json!([1, 2, 3, 4]));
}
/// Builds a cycle whose conditional back edge carries `max_visits(5)` and
/// checks that `analyze_cycles` reports cycles, all of them protected.
/// The graph is only analyzed here, never executed.
#[tokio::test]
async fn test_edge_analysis_no_runtime_interference() {
    let looping = build_graph("edge_analysis_ok", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                let visits = node_ctx
                    .state()
                    .get("count")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0);
                node_ctx.record(StateMutation::Put(
                    "count".into(),
                    serde_json::json!(visits + 1),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        // Always-true back edge, bounded by a visit limit.
        let _ = builder.edge_if("b", "a", |_| true).max_visits(5);
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let cycle_info = looping.analyze_cycles();
    assert!(cycle_info.has_cycles);
    assert!(cycle_info.all_protected());
}
/// For the acyclic graph `a -> b`, `analyze_cycles` must report no cycles, an
/// empty cycle list, and `all_protected() == true`.
#[test]
fn test_analyze_cycles_dag() {
    let acyclic = build_graph("dag", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.end("b");
        Ok(())
    })
    .expect("build should succeed");

    let cycle_info = acyclic.analyze_cycles();
    assert!(!cycle_info.has_cycles);
    assert!(cycle_info.cycles.is_empty());
    assert!(cycle_info.all_protected());
}
/// For the three-node ring `a -> b -> c -> a`, `analyze_cycles` must report
/// cycles and a non-empty cycle list.
#[test]
fn test_analyze_cycles_detected() {
    let ring = build_graph("cycle", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "c",
            NodeKind::Task(TaskNode::new("c", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "c");
        let _ = builder.edge("c", "a");
        let _ = builder.end("a");
        Ok(())
    })
    .expect("build should succeed");

    let cycle_info = ring.analyze_cycles();
    assert!(cycle_info.has_cycles);
    assert!(!cycle_info.cycles.is_empty());
}
/// A cycle whose back edge is created with `max_visits(5)` must be reported by
/// `analyze_cycles` as present and fully protected.
#[test]
fn test_analyze_cycles_protected() {
    let guarded = build_graph("protected_cycle", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        // Unconditional back edge with a visit limit.
        let _ = builder.edge("b", "a").max_visits(5);
        let _ = builder.edge("b", "end");
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let cycle_info = guarded.analyze_cycles();
    assert!(cycle_info.has_cycles);
    assert!(cycle_info.all_protected());
}
/// The textual report of a cyclic graph's analysis must contain the strings
/// "Cycle Analysis" and "cycle".
#[test]
fn test_analyze_cycles_report() {
    let cyclic = build_graph("report_test", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "a");
        let _ = builder.end("a");
        Ok(())
    })
    .expect("build should succeed");

    let summary = cyclic.analyze_cycles().report();
    assert!(summary.contains("Cycle Analysis"));
    assert!(summary.contains("cycle"));
}
/// Two freshly created `TraceId`s must render to different strings.
#[test]
fn test_trace_id_uniqueness() {
    let first = TraceId::new();
    let second = TraceId::new();

    assert_ne!(first.to_string(), second.to_string());
}
/// Streams `a -> b` and collects the `span_id` of every `NodeStart` and
/// `NodeEnd` event; at least four must have been seen by `GraphComplete`.
#[tokio::test]
async fn test_stream_has_span_id() {
    let two_step = build_graph("trace_test", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.end("b");
        Ok(())
    })
    .expect("build should succeed");

    // The handle is bound (not dropped) for the whole test but never used.
    let GraphExecution {
        stream: mut events,
        handle: _handle,
    } = SimpleExecutor::default().execute_stream(Arc::new(two_step), State::new());

    let mut seen_spans = Vec::new();
    loop {
        match events.recv().await.expect("stream should not close") {
            GraphEvent::NodeStart { span_id, .. } | GraphEvent::NodeEnd { span_id, .. } => {
                seen_spans.push(span_id);
            }
            GraphEvent::GraphComplete { .. } => break,
            _ => {}
        }
    }

    // Two nodes, each with a start and an end event.
    assert!(seen_spans.len() >= 4);
}
/// Same loop as the condition back-jump test, but the `route -> a` edge is
/// registered through `edge_if` with an always-true predicate. Expects the
/// counter to finish at 2.
#[tokio::test]
async fn test_goto_edge_with_analysis() {
    let looping = build_graph("goto_analysis", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |node_ctx: &mut NodeContext<'_>| {
                let visits = node_ctx
                    .state()
                    .get("count")
                    .and_then(|value| value.as_u64())
                    .unwrap_or(0);
                node_ctx.record(StateMutation::Put(
                    "count".into(),
                    serde_json::json!(visits + 1),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "route",
            NodeKind::Condition(
                lellm_graph::ConditionNode::builder("route")
                    // Jump back to "a" while the counter is below 2.
                    .branch("a", |state: &State| {
                        state
                            .get("count")
                            .and_then(|value| value.as_u64())
                            .map_or(true, |current| current < 2)
                    })
                    .branch("end", |_| true)
                    .build(),
            ),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "route");
        let _ = builder.edge_if("route", "a", |_| true);
        let _ = builder.edge("route", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(looping), State::new())
        .await
        .expect("execution should succeed");

    let final_count = outcome.state.get("count").unwrap();
    assert_eq!(final_count, &serde_json::json!(2));
}
/// A condition node that branches to the unregistered name `"nonexistent"`
/// must make execution fail with either `NodeNotFound("nonexistent")` or
/// `MissingEdge { from: "route", to: "nonexistent" }`.
#[tokio::test]
async fn test_goto_missing_edge_error() {
    let broken = build_graph("missing_edge", |builder| {
        let _ = builder.start("route");
        let _ = builder.node(
            "route",
            NodeKind::Condition(
                lellm_graph::ConditionNode::builder("route")
                    .branch("nonexistent", |_| true)
                    .build(),
            ),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("route", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(broken), State::new())
        .await;

    assert!(outcome.is_err());
    let failure = outcome.unwrap_err();
    match failure {
        GraphError::Terminal(TerminalError::NodeNotFound(name)) => {
            assert_eq!(name, "nonexistent");
        }
        GraphError::Terminal(TerminalError::MissingEdge { from, to }) => {
            assert_eq!(from, "route");
            assert_eq!(to, "nonexistent");
        }
        other => panic!("expected NodeNotFound or MissingEdge, got: {other}"),
    }
}
/// A value written through `set_sk` must come back from both `get_sk` and
/// `require_sk`.
#[test]
fn test_statekey_basic_read_write() {
    use lellm_graph::StateKeyExt;

    let mut store = State::new();
    store.set_sk(&SK_COUNT, 42u64);

    assert_eq!(store.get_sk(&SK_COUNT), Some(42u64));
    assert_eq!(store.require_sk(&SK_COUNT).unwrap(), 42u64);
}
/// Reading a `u64` stored via `SK_COUNT` through a `StateKey<String>` declared
/// for the name `"count"` must give `None` from `get_sk` and a
/// `StateError::Deserialize` from `require_sk`.
#[test]
fn test_statekey_type_mismatch() {
    use lellm_graph::StateKeyExt;

    // String-typed key declared for the name "count".
    const SK_COUNT_AS_STRING: StateKey<String> =
        StateKey::new("count", lellm_graph::Reducer::Replace);

    let mut store = State::new();
    store.set_sk(&SK_COUNT, 42u64);

    assert_eq!(store.get_sk::<String>(&SK_COUNT_AS_STRING), None);

    let mismatch = store.require_sk::<String>(&SK_COUNT_AS_STRING);
    assert!(matches!(
        mismatch,
        Err(lellm_graph::StateError::Deserialize(_, _))
    ));
}
/// `require_sk` on an empty state must fail with `StateError::MissingKey`.
#[test]
fn test_statekey_missing_key() {
    use lellm_graph::StateKeyExt;

    let empty = State::new();
    let lookup = empty.require_sk(&SK_COUNT);

    assert!(matches!(lookup, Err(lellm_graph::StateError::MissingKey(_))));
}
/// `contains_sk` reflects which typed keys are present, and `remove_sk`
/// returns the stored value while deleting the key.
#[test]
fn test_statekey_contains_remove() {
    use lellm_graph::StateKeyExt;

    let mut store = State::new();
    store.set_sk(&SK_STEPS, vec!["step1".to_string()]);

    assert!(store.contains_sk(&SK_STEPS));
    assert!(!store.contains_sk(&SK_COUNT));

    let taken = store.remove_sk(&SK_STEPS);
    assert!(taken.is_some());
    assert!(!store.contains_sk(&SK_STEPS));
}
/// Typed-key access (`StateKeyExt`) and string-key access (`StateExt`) can be
/// mixed on the same state.
#[test]
fn test_statekey_coexist_with_stateext() {
    use lellm_graph::{StateExt, StateKeyExt};

    let mut store = State::new();
    store.set_sk(&SK_COUNT, 100u64);
    store.set("legacy_flag", true);

    assert_eq!(store.get_sk(&SK_COUNT), Some(100u64));
    assert_eq!(store.get_bool("legacy_flag"), Some(true));
}
/// Runs `set -> increment -> check` where `check` loops back to `increment`
/// while `SK_COUNT` reads below 3. Nodes write through plain string keys; the
/// condition and the final assertions read through typed state keys.
#[tokio::test]
async fn test_statekey_in_graph_execution() {
    use lellm_graph::StateKeyExt;

    const SK_RESULT: StateKey<String> = StateKey::new("result", lellm_graph::Reducer::Replace);

    let counting = build_graph("statekey_graph", |builder| {
        let _ = builder.start("set");
        let _ = builder.node(
            "set",
            NodeKind::Task(TaskNode::new("set", |node_ctx: &mut NodeContext<'_>| {
                // Initial values: count = 0, result = "pending".
                node_ctx.record(StateMutation::Put("count".into(), serde_json::json!(0u64)));
                node_ctx.record(StateMutation::Put(
                    "result".into(),
                    serde_json::json!("pending"),
                ));
                Ok(())
            })),
        );
        let _ = builder.node(
            "increment",
            NodeKind::Task(TaskNode::new(
                "increment",
                |node_ctx: &mut NodeContext<'_>| {
                    let current: u64 = node_ctx
                        .state()
                        .get("count")
                        .and_then(|value| value.as_u64())
                        .unwrap_or(0);
                    let next = current + 1;
                    node_ctx.record(StateMutation::Put(
                        "count".into(),
                        serde_json::json!(next),
                    ));
                    // Mark the run as done once the counter reaches 3.
                    if next >= 3 {
                        node_ctx.record(StateMutation::Put(
                            "result".into(),
                            serde_json::json!("done"),
                        ));
                    }
                    Ok(())
                },
            )),
        );
        let _ = builder.node(
            "check",
            NodeKind::Condition(
                lellm_graph::ConditionNode::builder("check")
                    .branch("increment", |state: &State| {
                        state.get_sk(&SK_COUNT).unwrap_or(0) < 3
                    })
                    .branch("end", |_| true)
                    .build(),
            ),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |node_ctx: &mut NodeContext<'_>| {
                let status = node_ctx
                    .state()
                    .get("result")
                    .and_then(|value| value.as_str())
                    .unwrap()
                    .to_string();
                assert_eq!(status, "done");
                Ok(())
            })),
        );
        let _ = builder.edge("set", "increment");
        let _ = builder.edge("increment", "check");
        let _ = builder.edge("check", "increment");
        let _ = builder.edge("check", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(counting), State::new())
        .await
        .expect("execution should succeed");

    assert_eq!(outcome.state.get_sk(&SK_COUNT).unwrap(), 3u64);
    assert_eq!(outcome.state.get_sk(&SK_RESULT).unwrap(), "done".to_string());
}
#[tokio::test]
async fn test_trace_id_full_lifecycle() {
let graph = build_graph("trace_lifecycle", |g| {
let _ = g.start("a");
let _ = g.node(
"a",
NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.node(
"b",
NodeKind::Task(TaskNode::new("b", |_ctx: &mut NodeContext<'_>| Ok(()))),
);
let _ = g.edge("a", "b");
let _ = g.end("b");
Ok(())
})
.expect("build should succeed");
let GraphExecution {
mut stream,
handle: _handle,
} = SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());
let mut trace_id_from_start = None;
let mut trace_ids_from_nodes = Vec::new();
let mut node_count = 0;
loop {
let event = stream.recv().await.expect("stream should not close");
match event {
GraphEvent::GraphStart { trace_id } => {
trace_id_from_start = Some(trace_id);
}
GraphEvent::NodeStart { trace_id, .. } => {
trace_ids_from_nodes.push(trace_id);
node_count += 1;
}
GraphEvent::NodeEnd { trace_id, .. } => {
trace_ids_from_nodes.push(trace_id);
}
GraphEvent::GraphComplete { result } => {
assert_eq!(result.trace_id, trace_id_from_start.unwrap());
break;
}
GraphEvent::GraphError { error, .. } => {
panic!("unexpected error: {error}");
}
_ => {}
}
}
let start_trace = trace_id_from_start.unwrap();
for node_trace in trace_ids_from_nodes {
assert_eq!(
node_trace, start_trace,
"all node events should share the same trace_id"
);
}
assert!(node_count >= 2);
}
/// The result of a blocking `execute` must carry a non-empty trace id whose
/// string form contains exactly four dashes.
#[tokio::test]
async fn test_trace_id_blocking_mode() {
    let single = build_graph("trace_blocking", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.end("a");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(single), State::new())
        .await
        .expect("execution should succeed");

    let rendered = outcome.trace_id.to_string();
    assert!(!rendered.is_empty(), "trace_id should not be empty");

    let dash_count = rendered.matches('-').count();
    assert_eq!(dash_count, 4, "trace_id should be UUID format");
}
/// Streams a graph whose start node is an external node that always fails and
/// has an `edge_fallback` to `fallback_target`. The test expects a
/// `GraphError` mentioning "fallback_node" and no `GraphComplete`.
#[tokio::test]
async fn test_fallback_control_flow() {
    use async_trait::async_trait;
    use std::sync::Arc;

    /// External node whose `execute` always returns `NodeExecutionFailed`.
    struct FallbackNode;

    #[async_trait]
    impl lellm_graph::FlowNode for FallbackNode {
        async fn execute(&self, _ctx: &mut NodeContext<'_>) -> Result<(), GraphError> {
            Err(GraphError::Terminal(TerminalError::NodeExecutionFailed {
                node: "fallback_node".into(),
                source: "node failed".into(),
            }))
        }
    }

    let flow = build_graph("fallback_flow", |builder| {
        let _ = builder.start("fallback_node");
        let _ = builder.node("fallback_node", NodeKind::External(Arc::new(FallbackNode)));
        let _ = builder.node(
            "fallback_target",
            NodeKind::Task(TaskNode::new(
                "fallback_target",
                |node_ctx: &mut NodeContext<'_>| {
                    node_ctx.record(StateMutation::Put(
                        "recovered".into(),
                        serde_json::json!(true),
                    ));
                    Ok(())
                },
            )),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge_fallback("fallback_node", "fallback_target");
        let _ = builder.edge("fallback_target", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let GraphExecution {
        stream: mut events,
        handle: controller,
    } = SimpleExecutor::default().execute_stream(Arc::new(flow), State::new());
    // The handle is released right away; events are drained until the stream closes.
    drop(controller);

    let mut saw_error = false;
    while let Some(event) = events.recv().await {
        match &event {
            GraphEvent::GraphError { error, .. } => {
                assert!(
                    error.to_string().contains("fallback_node"),
                    "error should mention fallback_node: {}",
                    error
                );
                saw_error = true;
            }
            GraphEvent::GraphComplete { .. } => {
                panic!("should not complete when node fails");
            }
            _ => {}
        }
    }

    assert!(saw_error, "should receive GraphError");
}
/// Streams a graph whose start node is an external node that always fails and
/// has only a normal edge to `end`. The test expects a `GraphError` mentioning
/// "failing_node" and no `GraphComplete`.
#[tokio::test]
async fn test_fallback_no_edge() {
    use async_trait::async_trait;
    use std::sync::Arc;

    /// External node whose `execute` always returns `NodeExecutionFailed`.
    struct FailingNode;

    #[async_trait]
    impl lellm_graph::FlowNode for FailingNode {
        async fn execute(&self, _ctx: &mut NodeContext<'_>) -> Result<(), GraphError> {
            Err(GraphError::Terminal(TerminalError::NodeExecutionFailed {
                node: "failing_node".into(),
                source: "intentional failure".into(),
            }))
        }
    }

    let flow = build_graph("no_fallback", |builder| {
        let _ = builder.start("failing_node");
        let _ = builder.node("failing_node", NodeKind::External(Arc::new(FailingNode)));
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |_ctx: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("failing_node", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let GraphExecution {
        stream: mut events,
        handle: controller,
    } = SimpleExecutor::default().execute_stream(Arc::new(flow), State::new());
    // The handle is released right away; events are drained until the stream closes.
    drop(controller);

    let mut saw_error = false;
    while let Some(event) = events.recv().await {
        match &event {
            GraphEvent::GraphError { error, .. } => {
                assert!(
                    error.to_string().contains("failing_node"),
                    "error should mention failing_node: {}",
                    error
                );
                saw_error = true;
            }
            GraphEvent::GraphComplete { .. } => {
                panic!("should not complete when node fails");
            }
            _ => {}
        }
    }

    assert!(saw_error, "should receive GraphError");
}
/// Cancelling through the handle while a barrier is waiting must end the run.
///
/// Phase one reads events until `BarrierWaiting` arrives and then calls
/// `handle.cancel()`. Phase two keeps reading until a terminal event shows up.
/// A `BarrierCancelled` terminal error is accepted in either phase, any other
/// error panics, and a `GraphComplete` after the cancel is tolerated.
#[tokio::test]
async fn test_graph_cancel() {
    let graph = build_graph("cancel_test", |builder| {
        let _ = builder.start("barrier");
        // 60 s timeout on the barrier node; the test cancels long before that.
        let review_barrier =
            lellm_graph::BarrierNode::new("review").timeout(std::time::Duration::from_secs(60));
        let _ = builder.node("barrier", NodeKind::Barrier(review_barrier));
        let _ = builder.end("barrier");
        Ok(())
    })
    .expect("build should succeed");

    let GraphExecution {
        stream: mut events,
        handle,
    } = SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());

    // Phase 1: wait for the barrier to start waiting, then cancel.
    loop {
        match events.recv().await.expect("stream should not close") {
            GraphEvent::BarrierWaiting { .. } => {
                handle.cancel();
                break;
            }
            GraphEvent::GraphError { error, .. } => {
                let cancelled = matches!(
                    error,
                    lellm_graph::GraphError::Terminal(
                        lellm_graph::TerminalError::BarrierCancelled { .. },
                    )
                );
                if cancelled {
                    return;
                }
                panic!("unexpected error: {error:?}");
            }
            _ => {}
        }
    }

    // Phase 2: the run must finish, either cancelled or completed.
    loop {
        match events.recv().await.expect("stream should not close") {
            GraphEvent::GraphError { error, .. } => {
                let cancelled = matches!(
                    error,
                    lellm_graph::GraphError::Terminal(
                        lellm_graph::TerminalError::BarrierCancelled { .. },
                    )
                );
                if !cancelled {
                    panic!("unexpected error: {error:?}");
                }
                return;
            }
            GraphEvent::GraphComplete { .. } => return,
            _ => {}
        }
    }
}
/// One wildcard `Approve` for "review" must let a graph with two barriers run
/// to completion.
///
/// Pipeline: `before -> barrier -> between -> barrier2 -> done`. Both barrier
/// nodes are built with `BarrierNode::new("review")`. The test sends a single
/// `decide_wildcard("review", Approve)` on the first `BarrierWaiting` event and
/// then expects `GraphComplete` with `steps == ["step1", "step2"]`.
// NOTE(review): no decision is sent for the second barrier — presumably the
// wildcard decision also releases it; confirm against the handle's contract.
#[tokio::test]
async fn test_decide_wildcard() {
    /// Reads the `steps` list from state (empty when absent or not
    /// deserializable), appends `label`, and records the updated list.
    fn push_step(ctx: &mut NodeContext<'_>, label: &'static str) {
        let mut steps: Vec<String> = ctx
            .state()
            .get("steps")
            .and_then(|v| serde_json::from_value(v.clone()).ok())
            .unwrap_or_default();
        steps.push(label.into());
        ctx.record(StateMutation::Put(
            "steps".into(),
            serde_json::to_value(steps).unwrap(),
        ));
    }

    let graph = build_graph("wildcard_test", |builder| {
        let _ = builder.start("before");
        let _ = builder.node(
            "before",
            NodeKind::Task(TaskNode::new("before", |ctx: &mut NodeContext<'_>| {
                // Seed state with an empty step list.
                let empty = serde_json::json!(Vec::<String>::new());
                ctx.record(StateMutation::Put("steps".into(), empty));
                Ok(())
            })),
        );
        let _ = builder.node(
            "barrier",
            NodeKind::Barrier(lellm_graph::BarrierNode::new("review")),
        );
        let _ = builder.node(
            "between",
            NodeKind::Task(TaskNode::new("between", |ctx: &mut NodeContext<'_>| {
                push_step(ctx, "step1");
                Ok(())
            })),
        );
        let _ = builder.node(
            "barrier2",
            NodeKind::Barrier(lellm_graph::BarrierNode::new("review")),
        );
        let _ = builder.node(
            "done",
            NodeKind::Task(TaskNode::new("done", |ctx: &mut NodeContext<'_>| {
                push_step(ctx, "step2");
                Ok(())
            })),
        );
        let _ = builder.edge("before", "barrier");
        let _ = builder.edge("barrier", "between");
        let _ = builder.edge("between", "barrier2");
        let _ = builder.edge("barrier2", "done");
        let _ = builder.end("done");
        Ok(())
    })
    .expect("build should succeed");

    let GraphExecution {
        stream: mut events,
        handle,
    } = SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());

    let mut barriers_seen = 0;
    loop {
        match events.recv().await.expect("stream should not close") {
            GraphEvent::BarrierWaiting { node_name, .. } => {
                assert!(node_name == "barrier" || node_name == "barrier2");
                barriers_seen += 1;
                // Only the first waiting barrier triggers the wildcard decision.
                if barriers_seen == 1 {
                    let _ = handle
                        .decide_wildcard("review", BarrierDecision::Approve)
                        .await;
                }
            }
            GraphEvent::GraphComplete { result } => {
                let recorded: Vec<String> = result.state.get_json("steps").unwrap();
                assert_eq!(recorded, vec!["step1", "step2"]);
                break;
            }
            GraphEvent::GraphError { error, .. } => {
                panic!("unexpected error: {error:?}");
            }
            _ => {}
        }
    }
}
/// `append_array` on a key holding a non-array value (here a JSON string)
/// must return an error whose text contains
/// "existing value is not an array".
#[test]
fn test_append_array_non_array_error() {
    use lellm_graph::StateExt;

    let mut state = State::new();
    state.insert("items".into(), serde_json::json!("not_an_array"));

    let outcome = state.append_array("items", serde_json::json!([1, 2]));
    assert!(outcome.is_err());

    let message = outcome.unwrap_err();
    assert!(message.contains("existing value is not an array"));
}
/// A graph that names start/end nodes and edges but registers no nodes must
/// fail to build with at least two errors, all of them
/// `BuildError::MissingNode`.
#[test]
fn test_build_errors_multiple() {
    let outcome = build_graph("multi_error", |builder| {
        // No `node(..)` calls: every referenced name is undefined.
        let _ = builder.start("a");
        let _ = builder.end("b");
        let _ = builder.edge("a", "nonexistent");
        let _ = builder.edge("also_nonexistent", "b");
        Ok(())
    });

    assert!(outcome.is_err());
    if let Err(errors) = outcome {
        let reported = &errors.0;
        assert!(
            reported.len() >= 2,
            "expected multiple errors, got: {:?}",
            reported
        );
        for problem in reported {
            let is_missing_node = matches!(problem, BuildError::MissingNode { .. });
            assert!(is_missing_node, "expected MissingNode, got: {:?}", problem);
        }
    }
}
/// Registering node `a` twice must not make the build fail.
///
/// The test only checks that `build` returns `Ok`; it does not inspect any
/// warning output.
#[test]
fn test_build_duplicate_node_warning() {
    let outcome = build_graph("dup_node", |builder| {
        let _ = builder.start("a");
        // Same name registered two times in a row.
        for _ in 0..2 {
            let _ = builder.node(
                "a",
                NodeKind::Task(TaskNode::new("a", |_: &mut NodeContext<'_>| Ok(()))),
            );
        }
        let _ = builder.end("a");
        Ok(())
    });
    assert!(outcome.is_ok());
}
/// Two conditional edges between the same pair of nodes (`a -> b`, one
/// predicate always true, one always false) must still produce a buildable
/// graph.
#[test]
fn test_build_warning_not_fatal() {
    let outcome = build_graph("warning_test", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |_: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_: &mut NodeContext<'_>| Ok(()))),
        );
        // Parallel conditional edges on the same (from, to) pair.
        let _ = builder.edge_if("a", "b", |_| true);
        let _ = builder.edge_if("a", "b", |_| false);
        let _ = builder.end("b");
        Ok(())
    });
    assert!(outcome.is_ok());
}
/// The `Display` output of a failed build's errors must contain the text
/// "error(s)".
///
/// The failing graph has a single edge `x -> y` and nothing else.
#[test]
fn test_build_errors_display() {
    let outcome = build_graph("display_test", |builder| {
        let _ = builder.edge("x", "y");
        Ok(())
    });

    assert!(outcome.is_err());
    if let Err(errors) = outcome {
        let rendered = errors.to_string();
        assert!(rendered.contains("error(s)"), "should show error count");
    }
}
/// Dropping the event stream partway through a five-node linear run must not
/// leave the test hanging.
///
/// The test reads up to two events (each receive bounded by a 2 s timeout
/// that panics on expiry), drops the stream, sleeps 100 ms and returns. If the
/// stream closes before two events arrive the test returns immediately.
// NOTE(review): nothing is asserted after the drop — presumably the intent is
// that the executor stops once the consumer is gone; confirm and consider
// asserting it.
#[tokio::test]
async fn test_consumer_drop_cancels_execution() {
    let graph = build_graph("consumer_drop", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |ctx: &mut NodeContext<'_>| {
                // Increment `count`, treating a missing/non-integer value as 0.
                let previous = ctx
                    .state()
                    .get("count")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0);
                let bumped = serde_json::json!(previous + 1);
                ctx.record(StateMutation::Put("count".into(), bumped));
                Ok(())
            })),
        );
        let _ = builder.node(
            "b",
            NodeKind::Task(TaskNode::new("b", |_: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "c",
            NodeKind::Task(TaskNode::new("c", |_: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "d",
            NodeKind::Task(TaskNode::new("d", |_: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.node(
            "e",
            NodeKind::Task(TaskNode::new("e", |_: &mut NodeContext<'_>| Ok(()))),
        );
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "c");
        let _ = builder.edge("c", "d");
        let _ = builder.edge("d", "e");
        let _ = builder.end("e");
        Ok(())
    })
    .expect("build should succeed");

    // `_handle` stays bound so it lives until the end of the test.
    let GraphExecution {
        mut stream,
        handle: _handle,
    } = SimpleExecutor::default().execute_stream(Arc::new(graph), State::new());

    // Consume exactly two events before walking away from the stream.
    for _ in 0..2 {
        let next = tokio::time::timeout(std::time::Duration::from_secs(2), stream.recv()).await;
        match next {
            Ok(Some(_event)) => {}
            Ok(None) => return,
            Err(_) => panic!("stream recv timeout — executor may be stuck"),
        }
    }

    drop(stream);
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
/// An end node that also has an outgoing edge (`end -> after_end`) must not
/// prevent the graph from building.
#[test]
fn test_end_node_outgoing_edge_warning() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let outcome = build_graph("end_outgoing", |builder| {
        let _ = builder.start("a");
        for name in ["a", "end", "after_end"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "end");
        // Edge leaving the node that is declared as the end node below.
        let _ = builder.edge("end", "after_end");
        let _ = builder.end("end");
        Ok(())
    });

    assert!(
        outcome.is_ok(),
        "end node outgoing edges should not block build"
    );
}
/// Baseline: a two-node graph `a -> end`, with `end` as the end node and no
/// edges leaving it, builds successfully.
#[test]
fn test_end_node_no_outgoing_edge() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let outcome = build_graph("end_no_outgoing", |builder| {
        let _ = builder.start("a");
        let _ = builder.node("a", noop("a"));
        let _ = builder.node("end", noop("end"));
        let _ = builder.edge("a", "end");
        let _ = builder.end("end");
        Ok(())
    });

    assert!(outcome.is_ok());
}
/// Execution must stop at the end node even when that node has an outgoing
/// edge.
///
/// Graph: `a -> end -> unreachable`, with `end` declared as the end node.
/// Each node records its own `visited_*` flag. After the run the flags for
/// `a` and `end` must be `true`, the flag for `unreachable` must be absent,
/// and the execution log must hold exactly two entries.
#[tokio::test]
async fn test_end_node_stops_execution() {
    /// Records `key = true` into the state.
    fn mark(ctx: &mut NodeContext<'_>, key: &'static str) {
        ctx.record(StateMutation::Put(key.into(), serde_json::json!(true)));
    }

    let graph = build_graph("end_stops", |builder| {
        let _ = builder.start("a");
        let _ = builder.node(
            "a",
            NodeKind::Task(TaskNode::new("a", |ctx: &mut NodeContext<'_>| {
                mark(ctx, "visited_a");
                Ok(())
            })),
        );
        let _ = builder.node(
            "end",
            NodeKind::Task(TaskNode::new("end", |ctx: &mut NodeContext<'_>| {
                mark(ctx, "visited_end");
                Ok(())
            })),
        );
        let _ = builder.node(
            "unreachable",
            NodeKind::Task(TaskNode::new("unreachable", |ctx: &mut NodeContext<'_>| {
                mark(ctx, "visited_unreachable");
                Ok(())
            })),
        );
        let _ = builder.edge("a", "end");
        // This edge leaves the end node and must never be followed.
        let _ = builder.edge("end", "unreachable");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let outcome = SimpleExecutor::default()
        .execute(Arc::new(graph), State::new())
        .await
        .expect("execution should succeed");

    for key in ["visited_a", "visited_end"] {
        assert_eq!(outcome.state.get(key).unwrap(), &serde_json::json!(true));
    }
    assert!(
        outcome.state.get("visited_unreachable").is_none(),
        "unreachable node should not be executed"
    );
    assert_eq!(outcome.execution_log.len(), 2);
}
/// `analyze()` on a plain linear graph `a -> b -> end` must report no
/// warnings.
#[test]
fn test_analyze_dag_clean() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("dag", |builder| {
        let _ = builder.start("a");
        for name in ["a", "b", "end"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "b");
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();
    assert!(
        report.warnings.is_empty(),
        "DAG should have no warnings, got: {:?}",
        report.warnings
    );
}
/// `analyze()` must emit at least one `Cycle`-category warning for a cycle
/// built from plain edges (`a -> b -> a`, plus `b -> end`).
#[test]
fn test_analyze_unprotected_cycle() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("cycle", |builder| {
        let _ = builder.start("a");
        for name in ["a", "b", "end"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "b");
        // Back edge that closes the cycle.
        let _ = builder.edge("b", "a");
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();
    let has_cycle_warning = report
        .warnings
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::Cycle);
    assert!(has_cycle_warning, "Should detect unprotected cycle");
}
/// `analyze()` must emit at least one `Unreachable`-category info when a
/// registered node (`orphan`) has no edges connecting it to the graph.
#[test]
fn test_analyze_unreachable_node() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("unreachable", |builder| {
        let _ = builder.start("a");
        // `orphan` is registered but never wired to anything.
        for name in ["a", "end", "orphan"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();
    let has_unreachable_info = report
        .infos
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::Unreachable);
    assert!(
        has_unreachable_info,
        "Should detect unreachable node 'orphan'"
    );
}
/// `analyze()` must emit at least one `EndNodeOutgoing`-category info when
/// the end node has an outgoing edge (`end -> extra`).
#[test]
fn test_analyze_end_node_outgoing() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("end-outgoing", |builder| {
        let _ = builder.start("a");
        for name in ["a", "end", "extra"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "end");
        // Edge leaving the node that is declared as the end node below.
        let _ = builder.edge("end", "extra");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();
    let has_end_outgoing_info = report
        .infos
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::EndNodeOutgoing);
    assert!(
        has_end_outgoing_info,
        "Should detect end node has outgoing edges"
    );
}
/// When the edge closing a cycle is a fallback edge (`a -> b`, fallback
/// `b -> a`), `analyze()` must emit both a `Cycle`-category warning and a
/// `FallbackInCycle`-category warning.
#[test]
fn test_analyze_fallback_in_cycle() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("fallback-cycle", |builder| {
        let _ = builder.start("a");
        for name in ["a", "b", "end"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "b");
        // The cycle is closed by a fallback edge rather than a normal one.
        let _ = builder.edge_fallback("b", "a");
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();

    let has_cycle_warning = report
        .warnings
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::Cycle);
    assert!(has_cycle_warning, "Should detect cycle");

    let has_fallback_warning = report
        .warnings
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::FallbackInCycle);
    assert!(has_fallback_warning, "Should detect fallback edge in cycle");
}
/// A cycle whose back edge carries `max_visits(5)` must be reported by
/// `analyze()` as a `Cycle`-category info and not as a `Cycle`-category
/// warning.
#[test]
fn test_analyze_protected_cycle() {
    /// Task node that does nothing and succeeds.
    fn noop(name: &'static str) -> NodeKind {
        NodeKind::Task(TaskNode::new(name, |_ctx: &mut NodeContext<'_>| Ok(())))
    }

    let graph = build_graph("protected-cycle", |builder| {
        let _ = builder.start("a");
        for name in ["a", "b", "end"] {
            let _ = builder.node(name, noop(name));
        }
        let _ = builder.edge("a", "b");
        // Back edge with a visit limit of 5.
        let _ = builder.edge("b", "a").max_visits(5);
        let _ = builder.edge("b", "end");
        let _ = builder.end("end");
        Ok(())
    })
    .expect("build should succeed");

    let report = graph.analyze();

    let has_cycle_warning = report
        .warnings
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::Cycle);
    assert!(
        !has_cycle_warning,
        "Protected cycle should not produce warnings"
    );

    let has_cycle_info = report
        .infos
        .iter()
        .any(|d: &Diagnostic| d.category == DiagnosticCategory::Cycle);
    assert!(has_cycle_info, "Protected cycle should produce info");
}