use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use either::Either;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::clients::{
Client, ClientError, ClientFactory, ClientOptions, ClientOutput, ClientResponse, Message,
Provider, ToolCall,
};
use crate::commons::{Agent, AgentConfig};
use crate::context::{Context, FlowConf};
use crate::flows::flows::{Flow, FlowError, FlowGraph, FlowRuntime, RunOut};
use crate::tools::{Tool, ToolBox, ToolError};
struct MockClientHandle {
responses: Arc<Mutex<VecDeque<ClientResponse>>>,
}
#[async_trait]
impl Client for MockClientHandle {
async fn execute(&self, _messages: &[Message]) -> Result<ClientResponse, ClientError> {
self.responses
.lock()
.unwrap()
.pop_front()
.ok_or_else(|| ClientError::Llm("mock: response queue exhausted".into()))
}
}
struct MockFactory {
responses: Arc<Mutex<VecDeque<ClientResponse>>>,
}
impl MockFactory {
fn new(responses: Vec<ClientResponse>) -> Self {
Self {
responses: Arc::new(Mutex::new(responses.into())),
}
}
}
impl ClientFactory for MockFactory {
fn create(&self, _url: &str, _opts: ClientOptions) -> Result<Box<dyn Client>, ClientError> {
Ok(Box::new(MockClientHandle {
responses: Arc::clone(&self.responses),
}))
}
}
fn structured(val: serde_json::Value) -> ClientResponse {
ClientResponse::new(Provider::OpenAi, ClientOutput::Output(val))
}
fn tool_calls(calls: Vec<ToolCall>) -> ClientResponse {
ClientResponse::new(
Provider::OpenAi,
ClientOutput::ToolCalls {
thought: None,
calls,
},
)
}
fn call(name: &str, args: serde_json::Value) -> ToolCall {
ToolCall {
id: format!("id-{name}"),
name: name.to_string(),
args,
thought_signatures: None,
}
}
fn ctx() -> Context {
Context::new(FlowConf {
working_dir: Some(std::env::temp_dir()),
..Default::default()
})
}
macro_rules! run_to_done {
($runtime:expr) => {{
let c = ctx();
loop {
match $runtime.next(c.clone()).await.expect("next() failed") {
RunOut::Continue => {}
RunOut::Done(v) => break v,
RunOut::Suspend { .. } => panic!("unexpected suspension"),
}
}
}};
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct WkA {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct WkB {
val: i32,
}
impl Flow for WkA {
type Output = WkB;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<WkA, WkB, _, _>(|a, _| async move { Ok(WkB { val: a.val * 2 }) })
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkChainIn {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct WkChainMid {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct WkChainOut {
val: i32,
}
impl Flow for WkChainIn {
type Output = WkChainOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<WkChainIn, WkChainMid, _, _>(|a, _| async move {
Ok(WkChainMid { val: a.val + 1 })
})
.work::<WkChainMid, WkChainOut, _, _>(|b, _| async move {
Ok(WkChainOut { val: b.val * 3 })
})
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkErrIn {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkErrOut {
val: i32,
}
impl Flow for WkErrIn {
type Output = WkErrOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<WkErrIn, WkErrOut, _, _>(|_, _| async move {
Err(FlowError::AgentError("deliberate error".into()))
})
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkSame {
val: i32,
}
impl Flow for WkSame {
type Output = WkSame;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<WkSame, WkSame, _, _>(|a, _| async move { Ok(WkSame { val: a.val }) })
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkDupIn {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkDupOut {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct WkDupOut2 {
val: i32,
}
impl Flow for WkDupIn {
type Output = WkDupOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<WkDupIn, WkDupOut, _, _>(|a, _| async move { Ok(WkDupOut { val: a.val }) })
.work::<WkDupIn, WkDupOut2, _, _>(|a, _| async move { Ok(WkDupOut2 { val: a.val }) })
.build()
}
}
#[tokio::test]
async fn work_basic() {
let mut rt = FlowRuntime::new(WkA { val: 3 }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert_eq!(run_to_done!(rt), WkB { val: 6 });
}
#[tokio::test]
async fn work_chain() {
let mut rt = FlowRuntime::new(WkChainIn { val: 4 }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
let out = run_to_done!(rt);
assert_eq!(out, WkChainOut { val: 15 });
}
#[tokio::test]
async fn work_error_propagates() {
let mut rt = FlowRuntime::new(WkErrIn { val: 0 }).unwrap();
let err = rt.next(ctx()).await.unwrap_err();
assert!(matches!(err, FlowError::AgentError(ref s) if s.contains("deliberate error")));
}
#[tokio::test]
async fn work_same_type_rejected_at_build() {
let err = FlowRuntime::new(WkSame { val: 0 }).unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("exit_name equals input name")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[tokio::test]
async fn work_duplicate_node_rejected() {
let err = FlowRuntime::new(WkDupIn { val: 0 }).unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("duplicate node key")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentSimpleIn {
goal: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct AgentSimpleOut {
result: String,
}
impl Agent for AgentSimpleIn {
type Output = AgentSimpleOut;
fn build() -> AgentConfig {
AgentConfig::new("test agent", "openai://test-model")
}
}
impl Flow for AgentSimpleIn {
type Output = AgentSimpleOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.agent::<AgentSimpleIn>()
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentToolIn {
goal: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct AgentToolOut {
answer: String,
}
#[derive(Debug, Deserialize, JsonSchema)]
struct EchoTool {
text: String,
}
impl Tool for EchoTool {
type Output = String;
fn name() -> &'static str {
"echo"
}
fn description() -> &'static str {
"Echo text back"
}
async fn call(self, _ctx: Context) -> Result<Self::Output, ToolError> {
Ok(self.text)
}
}
impl Agent for AgentToolIn {
type Output = AgentToolOut;
fn build() -> AgentConfig {
AgentConfig::new("tool agent", "openai://test-model")
.with_tools(ToolBox::builder().tool::<EchoTool>().build())
}
}
impl Flow for AgentToolIn {
type Output = AgentToolOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.agent::<AgentToolIn>()
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentWorkIn {
goal: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentWorkMid {
text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct AgentWorkFinal {
upper: String,
}
impl Agent for AgentWorkIn {
type Output = AgentWorkMid;
fn build() -> AgentConfig {
AgentConfig::new("test", "openai://test-model")
}
}
impl Flow for AgentWorkIn {
type Output = AgentWorkFinal;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.agent::<AgentWorkIn>()
.work::<AgentWorkMid, AgentWorkFinal, _, _>(|m, _| async move {
Ok(AgentWorkFinal {
upper: m.text.to_uppercase(),
})
})
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentEmptyModel {
goal: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct AgentEmptyModelOut {
result: String,
}
impl Agent for AgentEmptyModel {
type Output = AgentEmptyModelOut;
fn build() -> AgentConfig {
AgentConfig::new("test", "") }
}
impl Flow for AgentEmptyModel {
type Output = AgentEmptyModelOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.agent::<AgentEmptyModel>()
.build()
}
}
#[tokio::test]
async fn agent_structured_output() {
let factory = MockFactory::new(vec![structured(json!({"result": "done"}))]);
let mut rt = FlowRuntime::new(AgentSimpleIn {
goal: "test".into(),
})
.unwrap()
.with_factory(factory);
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out.result, "done"),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn agent_tool_then_exit_via_submit() {
let submit_args = json!({"answer": "42"});
let factory = MockFactory::new(vec![
tool_calls(vec![call("echo", json!({"text": "hello"}))]),
tool_calls(vec![call("submit", submit_args)]),
]);
let mut rt = FlowRuntime::new(AgentToolIn {
goal: "find answer".into(),
})
.unwrap()
.with_factory(factory);
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out.answer, "42"),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn agent_unknown_tool_errors() {
let factory = MockFactory::new(vec![tool_calls(vec![call(
"nonexistent_tool",
json!({}),
)])]);
let mut rt = FlowRuntime::new(AgentToolIn {
goal: "test".into(),
})
.unwrap()
.with_factory(factory);
let err = rt.next(ctx()).await.unwrap_err();
assert!(matches!(err, FlowError::AgentError(ref s) if s.contains("nonexistent_tool")));
}
#[tokio::test]
async fn agent_followed_by_work() {
let factory =
MockFactory::new(vec![structured(json!({"text": "hello from agent"}))]);
let mut rt = FlowRuntime::new(AgentWorkIn {
goal: "greet".into(),
})
.unwrap()
.with_factory(factory);
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out.upper, "HELLO FROM AGENT"),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn agent_empty_model_url_rejected() {
let err = FlowRuntime::new(AgentEmptyModel {
goal: "test".into(),
})
.unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("model is empty")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct EitherIn {
route_left: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct EitherLeft {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct EitherRight {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct EitherFinal {
val: i32,
from_left: bool,
}
impl Flow for EitherIn {
type Output = EitherFinal;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.either::<EitherIn, EitherLeft, EitherRight, _>(|inp, _| {
if inp.route_left {
Ok(Either::Left(EitherLeft { val: 1 }))
} else {
Ok(Either::Right(EitherRight { val: 2 }))
}
})
.work::<EitherLeft, EitherFinal, _, _>(|l, _| async move {
Ok(EitherFinal {
val: l.val,
from_left: true,
})
})
.work::<EitherRight, EitherFinal, _, _>(|r, _| async move {
Ok(EitherFinal {
val: r.val,
from_left: false,
})
})
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct EitherSameBranchIn {
x: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct EitherSameBranchOut {
x: i32,
}
impl Flow for EitherSameBranchIn {
type Output = EitherSameBranchOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.either::<EitherSameBranchIn, EitherSameBranchOut, EitherSameBranchOut, _>(
|_, _| Ok(Either::Left(EitherSameBranchOut { x: 0 })),
)
.build()
}
}
#[tokio::test]
async fn either_takes_left_branch() {
let mut rt = FlowRuntime::new(EitherIn { route_left: true }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => {
assert_eq!(out.val, 1);
assert!(out.from_left);
}
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn either_takes_right_branch() {
let mut rt = FlowRuntime::new(EitherIn { route_left: false }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => {
assert_eq!(out.val, 2);
assert!(!out.from_left);
}
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn either_same_branches_rejected() {
let err = FlowRuntime::new(EitherSameBranchIn { x: 0 }).unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("both branches resolve to the same schema name")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkIn {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkBranchA {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkBranchB {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct ForkOut {
sum: i32,
}
impl Flow for ForkIn {
type Output = ForkOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.fork::<ForkIn, ForkBranchA, ForkBranchB, _>(|inp, _| {
Ok((ForkBranchA { val: inp.val }, ForkBranchB { val: inp.val * 2 }))
})
.join::<ForkBranchA, ForkBranchB, ForkOut, _>(|a, b, _| {
Ok(ForkOut { sum: a.val + b.val })
})
.build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkWorkIn {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkWorkBranchA {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkWorkBranchB {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ForkWorkBranchBProcessed {
val: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct ForkWorkOut {
product: i32,
}
impl Flow for ForkWorkIn {
type Output = ForkWorkOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.fork::<ForkWorkIn, ForkWorkBranchA, ForkWorkBranchB, _>(|inp, _| {
Ok((ForkWorkBranchA { val: inp.val }, ForkWorkBranchB { val: inp.val }))
})
.work::<ForkWorkBranchB, ForkWorkBranchBProcessed, _, _>(|b, _| async move {
Ok(ForkWorkBranchBProcessed { val: b.val * 3 })
})
.join::<ForkWorkBranchA, ForkWorkBranchBProcessed, ForkWorkOut, _>(|a, b, _| {
Ok(ForkWorkOut { product: a.val * b.val })
})
.build()
}
}
#[tokio::test]
async fn fork_and_join_basic() {
let mut rt = FlowRuntime::new(ForkIn { val: 3 }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out, ForkOut { sum: 9 }),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn fork_with_work_on_branch_before_join() {
let mut rt = FlowRuntime::new(ForkWorkIn { val: 4 }).unwrap();
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out, ForkWorkOut { product: 48 }),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn join_does_not_fire_until_both_branches_ready() {
let mut rt = FlowRuntime::new(ForkWorkIn { val: 2 }).unwrap();
rt.next(ctx()).await.unwrap(); let step2 = rt.next(ctx()).await.unwrap();
assert!(matches!(step2, RunOut::Continue), "expected Continue while join not ready");
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out.product, 12),
other => panic!("{other:?}"),
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct SuspendIn {
task: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct SuspendOut {
approved_by: String,
}
#[derive(Debug, Deserialize, JsonSchema)]
struct ApprovalTool {
reason: String,
}
impl Tool for ApprovalTool {
type Output = serde_json::Value;
fn name() -> &'static str {
"request_approval"
}
fn description() -> &'static str {
"Request external approval before continuing"
}
async fn call(self, _ctx: Context) -> Result<Self::Output, ToolError> {
Err(ToolError::suspend(json!({"reason": self.reason})))
}
}
impl Agent for SuspendIn {
type Output = SuspendOut;
fn build() -> AgentConfig {
AgentConfig::new("approval agent", "openai://test-model")
.with_tools(ToolBox::builder().tool::<ApprovalTool>().build())
}
}
impl Flow for SuspendIn {
type Output = SuspendOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.agent::<SuspendIn>()
.build()
}
}
#[tokio::test]
async fn suspend_and_resume_completes() {
let factory = MockFactory::new(vec![
tool_calls(vec![call("request_approval", json!({"reason": "needs sign-off"}))]),
tool_calls(vec![call("submit", json!({"approved_by": "alice"}))]),
]);
let mut rt = FlowRuntime::new(SuspendIn {
task: "deploy".into(),
})
.unwrap()
.with_factory(factory);
let suspend_out = rt.next(ctx()).await.unwrap();
let tool_id = match suspend_out {
RunOut::Suspend { tool_id, .. } => tool_id,
other => panic!("expected Suspend, got {other:?}"),
};
assert!(tool_id.contains("request_approval"));
let resume_out = rt.resume(ctx(), (tool_id, json!({"approved": true}))).await.unwrap();
assert!(matches!(resume_out, RunOut::Continue));
assert!(matches!(rt.next(ctx()).await.unwrap(), RunOut::Continue));
match rt.next(ctx()).await.unwrap() {
RunOut::Done(out) => assert_eq!(out.approved_by, "alice"),
other => panic!("expected Done, got {other:?}"),
}
}
#[tokio::test]
async fn resume_with_wrong_tool_id_errors() {
let factory = MockFactory::new(vec![tool_calls(vec![call(
"request_approval",
json!({"reason": "test"}),
)])]);
let mut rt = FlowRuntime::new(SuspendIn {
task: "test".into(),
})
.unwrap()
.with_factory(factory);
rt.next(ctx()).await.unwrap();
let err = rt
.resume(ctx(), ("wrong::tool_id".into(), json!({})))
.await
.unwrap_err();
assert!(matches!(err, FlowError::ResumeMismatchError(_)));
}
#[tokio::test]
async fn next_when_suspended_errors() {
let factory = MockFactory::new(vec![tool_calls(vec![call(
"request_approval",
json!({"reason": "gate"}),
)])]);
let mut rt = FlowRuntime::new(SuspendIn {
task: "test".into(),
})
.unwrap()
.with_factory(factory);
rt.next(ctx()).await.unwrap();
let err = rt.next(ctx()).await.unwrap_err();
assert!(matches!(err, FlowError::ResumeRequired(_)));
}
#[tokio::test]
async fn resume_when_not_suspended_errors() {
let factory = MockFactory::new(vec![structured(json!({"approved_by": "bot"}))]);
let mut rt = FlowRuntime::new(SuspendIn {
task: "test".into(),
})
.unwrap()
.with_factory(factory);
rt.next(ctx()).await.unwrap();
let err = rt
.resume(ctx(), ("SuspendIn::request_approval".into(), json!({})))
.await
.unwrap_err();
assert!(matches!(err, FlowError::UnexpectedResumption(_)));
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValBadEntry {
x: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValBadEntryOut {
x: i32,
}
impl Flow for ValBadEntry {
type Output = ValBadEntryOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder().build()
}
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValReachIn {
x: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValReachOut {
x: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValOrphanIn {
x: i32,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
struct ValOrphanOut {
x: i32,
}
impl Flow for ValReachIn {
type Output = ValReachOut;
fn build() -> Result<FlowGraph, FlowError> {
FlowGraph::builder()
.work::<ValReachIn, ValReachOut, _, _>(|a, _| async move {
Ok(ValReachOut { x: a.x })
})
.work::<ValOrphanIn, ValOrphanOut, _, _>(|a, _| async move {
Ok(ValOrphanOut { x: a.x })
})
.build()
}
}
#[tokio::test]
async fn validation_entry_not_registered() {
let err = FlowRuntime::new(ValBadEntry { x: 0 }).unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("not a registered node")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[tokio::test]
async fn validation_unreachable_node() {
let err = FlowRuntime::new(ValReachIn { x: 0 }).unwrap_err();
match err {
FlowError::Invalid(problems) => {
assert!(problems.iter().any(|p| p.contains("unreachable from entry")));
}
other => panic!("expected Invalid, got {other:?}"),
}
}
#[tokio::test]
async fn flow_error_display_is_meaningful() {
let e = FlowError::NotFound("foo".into());
assert!(e.to_string().contains("foo"));
let e = FlowError::Invalid(vec!["problem one".into(), "problem two".into()]);
let s = e.to_string();
assert!(s.contains("problem one"));
assert!(s.contains("problem two"));
let e = FlowError::ResumeRequired("tool::x".into());
assert!(e.to_string().contains("tool::x"));
}
#[tokio::test]
async fn run_out_continue_and_done_are_distinct() {
let c: RunOut<i32> = RunOut::Continue;
assert!(matches!(c, RunOut::Continue));
let d: RunOut<i32> = RunOut::Done(42);
assert!(matches!(d, RunOut::Done(42)));
let s: RunOut<i32> = RunOut::Suspend {
value: json!(null),
tool_id: "t".into(),
};
assert!(matches!(s, RunOut::Suspend { .. }));
}