use super::*;
#[tokio::test]
async fn test_e2e_http_try_catch_recovery() {
let api_ok = warp::path("users").and(warp::path("1")).map(|| {
warp::reply::json(&serde_json::json!({
"id": 1,
"name": "Alice"
}))
});
let api_fail = warp::path("users").and(warp::path("999")).map(|| {
warp::reply::with_status(
"Internal Server Error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
});
let routes = api_ok.or(api_fail);
let port = start_mock_server(routes);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: http-try-catch
version: '0.1.0'
do:
- getUser:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/users/1
export:
as: '${ {userName: .name} }'
- tryRiskyCall:
try:
- callFail:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/users/999
catch:
errors:
with:
type: communication
status: 500
do:
- handleError:
set:
errorHandled: true
- setResult:
set:
userName: '${ $context.userName }}'
recovered: '${ if .errorHandled then true else false end }}'
"#
.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["userName"], json!("Alice"));
assert_eq!(output["recovered"], json!(true));
}
#[tokio::test]
async fn test_e2e_http_for_batch_call() {
let get_item = warp::path!("items" / i32).map(|id| {
warp::reply::json(&serde_json::json!({
"id": id,
"name": format!("Item-{}", id),
"price": id * 10
}))
});
let port = start_mock_server(get_item);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: http-for-batch
version: '0.1.0'
do:
- fetchItems:
for:
each: itemId
in: '${ .ids }'
do:
- getItem:
call: http
with:
method: get
endpoint:
uri: '${ "http://localhost:PORT/items/" + ($itemId | tostring) }'
output:
as: '${ {id: .id, name: .name, price: .price} }'
export:
as: '${ {collected: ((if $context == null then [] elif $context.collected == null then [] else $context.collected end) + [{id: .id, name: .name, price: .price}])} }'
"#.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({"ids": [1, 2, 3]}))
.await
.unwrap();
assert_eq!(output["id"], json!(3));
assert_eq!(output["name"], json!("Item-3"));
}
#[tokio::test]
async fn test_e2e_http_export_switch_order() {
let get_order = warp::path!("orders" / i32).map(|id| {
warp::reply::json(&serde_json::json!({
"orderId": id,
"status": if id == 1 { "shipped" } else { "pending" },
"total": id * 100
}))
});
let port = start_mock_server(get_order);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: http-export-switch-order
version: '0.1.0'
do:
- fetchOrder:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/orders/1
export:
as: '${ {orderId: .orderId, orderStatus: .status} }'
- processByStatus:
switch:
- shipped:
when: '${ $context.orderStatus == "shipped" }'
then: handleShipped
- pending:
when: '${ $context.orderStatus == "pending" }'
then: handlePending
- handleShipped:
set:
result: 'shipped'
orderId: '${ $context.orderId }'
then: end
- handlePending:
set:
result: 'pending'
orderId: '${ $context.orderId }'
then: end
"#
.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["result"], json!("shipped"));
assert_eq!(output["orderId"], json!(1));
}
#[tokio::test]
async fn test_e2e_http_export_switch_pending_order() {
let get_order = warp::path!("orders" / i32).map(|id| {
warp::reply::json(&serde_json::json!({
"orderId": id,
"status": "pending",
"total": id * 100
}))
});
let port = start_mock_server(get_order);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: http-export-switch-pending
version: '0.1.0'
do:
- fetchOrder:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/orders/2
export:
as: '${ {orderId: .orderId, orderStatus: .status} }'
- processByStatus:
switch:
- shipped:
when: '${ $context.orderStatus == "shipped" }'
then: handleShipped
- pending:
when: '${ $context.orderStatus == "pending" }'
then: handlePending
- handleShipped:
set:
result: 'shipped'
orderId: '${ $context.orderId }'
then: end
- handlePending:
set:
result: 'pending'
orderId: '${ $context.orderId }'
then: end
"#
.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["result"], json!("pending"));
assert_eq!(output["orderId"], json!(2));
}
#[tokio::test]
async fn test_e2e_etl_pipeline() {
let submit = warp::post()
.and(warp::path("api"))
.and(warp::path("submit"))
.and(warp::body::json())
.map(|body: serde_json::Value| {
warp::reply::json(&serde_json::json!({
"accepted": true,
"count": body["items"].as_array().map(|a| a.len()).unwrap_or(0),
"totalValue": body["total"]
}))
});
let port = start_mock_server(submit);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: etl-pipeline
version: '0.1.0'
do:
# Transform: compute total and filter high-value items
- transform:
set:
total: '${ [.items[] | .price] | add }'
highValueItems: '${ [.items[] | select(.price > 50)] }'
export:
as: '${ {total: .total, highValueItems: .highValueItems} }'
# Load: submit to API with try/catch
- load:
try:
- submitData:
call: http
with:
method: post
endpoint:
uri: http://localhost:PORT/api/submit
body:
items: '${ .highValueItems }'
total: '${ .total }'
catch:
errors:
with:
type: communication
do:
- handleLoadError:
set:
loadError: true
- buildFinalResult:
set:
itemCount: '${ ($context.highValueItems | length) }'
totalAmount: '${ $context.total }'
submitted: '${ .accepted }'
"#
.replace("PORT", &port.to_string());
let workflow: WorkflowDefinition = serde_yaml::from_str(&yaml_str).unwrap();
let runner = WorkflowRunner::new(workflow).unwrap();
let output = runner
.run(json!({
"items": [
{"name": "Widget", "price": 30},
{"name": "Gadget", "price": 80},
{"name": "Doohickey", "price": 120},
{"name": "Thingamajig", "price": 15}
]
}))
.await
.unwrap();
assert_eq!(output["itemCount"], json!(2)); assert_eq!(output["totalAmount"], json!(245)); assert_eq!(output["submitted"], json!(true));
}
#[tokio::test]
async fn test_e2e_http_fork_parallel_calls() {
let users = warp::path("users")
.map(|| warp::reply::json(&serde_json::json!([{"id": 1, "name": "Alice"}])));
let products = warp::path("products")
.map(|| warp::reply::json(&serde_json::json!([{"id": 1, "name": "Widget"}])));
let routes = users.or(products);
let port = start_mock_server(routes);
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: http-fork-parallel
version: '0.1.0'
do:
- parallelFetch:
fork:
compete: false
branches:
- fetchUsers:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/users
- fetchProducts:
call: http
with:
method: get
endpoint:
uri: http://localhost:PORT/products
- mergeResults:
set:
hasUsers: '${ (to_entries | length) > 0 }'
hasProducts: '${ (to_entries | length) > 0 }'
"#
.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert!(output["hasUsers"].is_boolean());
assert!(output["hasProducts"].is_boolean());
}
#[tokio::test]
async fn test_e2e_concatenating_strings() {
let yaml_str = r#"
document:
name: concatenating-strings
dsl: '1.0.0'
namespace: default
version: '1.0.0'
do:
- task1:
set:
firstName: "John"
lastName: ""
- task2:
set:
firstName: "${ .firstName }"
lastName: "Doe"
- task3:
set:
fullName: "${ .firstName + ' ' + .lastName }"
"#;
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["fullName"], json!("John Doe"));
}
#[tokio::test]
async fn test_e2e_conditional_logic() {
let yaml_str = r#"
document:
name: conditional-logic
dsl: '1.0.0'
namespace: default
version: '1.0.0'
do:
- task1:
set:
temperature: 30
- task2:
set:
weather: "${ if .temperature > 25 then 'hot' else 'cold' end }"
"#;
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["weather"], json!("hot"));
}
#[tokio::test]
async fn test_e2e_conditional_logic_input_from() {
let yaml_str = r#"
document:
name: conditional-logic
dsl: '1.0.0'
namespace: default
version: '1.0.0'
input:
from: "${ .localWeather }"
do:
- task2:
set:
weather: "${ if .temperature > 25 then 'hot' else 'cold' end }"
"#;
let output = run_workflow_yaml(&yaml_str, json!({"localWeather": {"temperature": 34}}))
.await
.unwrap();
assert_eq!(output["weather"], json!("hot"));
}
#[tokio::test]
async fn test_e2e_sequential_set_colors() {
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: default
name: do
version: '1.0.0'
do:
- setRed:
set:
colors: "${ .colors + ['red'] }"
- setGreen:
set:
colors: "${ .colors + ['green'] }"
- setBlue:
set:
colors: "${ .colors + ['blue'] }"
output:
as: "${ { resultColors: .colors } }"
"#;
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["resultColors"], json!(["red", "green", "blue"]));
}
#[tokio::test]
async fn test_e2e_sequential_set_colors_workflow_output() {
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: default
name: do
version: '1.0.0'
do:
- setRed:
set:
colors: "${ .colors + ['red'] }"
- setGreen:
set:
colors: "${ .colors + ['green'] }"
- setBlue:
set:
colors: "${ .colors + ['blue'] }"
output:
as: "${ { result: .colors } }"
"#;
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["result"], json!(["red", "green", "blue"]));
}
#[tokio::test]
async fn test_e2e_http_try_catch() {
let ok_endpoint = warp::path("data").map(|| {
warp::reply::json(&serde_json::json!({
"result": "success",
"value": 42
}))
});
let err_endpoint = warp::path("fail").map(|| {
warp::reply::with_status(
"Internal Server Error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
});
let routes = ok_endpoint.or(err_endpoint);
let port = start_mock_server(routes);
let yaml_str = std::fs::read_to_string(testdata("e2e_http_try_catch.yaml")).unwrap();
let yaml_str = yaml_str.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["apiResult"], json!("success"));
assert_eq!(output["errorHandled"], json!(true));
}
#[tokio::test]
async fn test_e2e_http_for_batch() {
let items = warp::path!("items" / i32).map(|id| {
warp::reply::json(&serde_json::json!({
"id": id,
"name": format!("Item {}", id)
}))
});
let port = start_mock_server(items);
let yaml_str = std::fs::read_to_string(testdata("e2e_http_for_batch.yaml")).unwrap();
let yaml_str = yaml_str.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({"itemIds": [1, 2, 3]}))
.await
.unwrap();
assert!(
output.is_object() || output.is_array(),
"for loop should return structured data"
);
}
#[tokio::test]
async fn test_e2e_http_export_switch() {
let order_api = warp::path!("orders" / i32).map(|id| {
warp::reply::json(&serde_json::json!({
"orderId": id,
"status": if id == 1 { "shipped" } else { "pending" },
"total": 99.99
}))
});
let port = start_mock_server(order_api);
let yaml_str = std::fs::read_to_string(testdata("e2e_http_export_switch.yaml")).unwrap();
let yaml_str = yaml_str.replace("PORT", &port.to_string());
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["notified"], json!(true));
assert_eq!(output["message"], json!("Your order has been shipped!"));
}
#[tokio::test]
async fn test_e2e_etl_workflow() {
let yaml_str = r#"
document:
dsl: '1.0.0'
namespace: test
name: e2e-etl
version: '0.1.0'
do:
- extract:
run:
shell:
command: "echo 42"
return: stdout
- clean:
set:
rawData: ${ . | tonumber }
source: shell
- transform:
for:
each: item
in: ${ [.rawData, .rawData + 1, .rawData + 2] }
do:
- doubleIt:
set:
original: ${ $item }
doubled: ${ $item * 2 }
- aggregate:
try:
- combine:
set:
summary: "ETL complete"
itemCount: 3
catch:
do:
- recovery:
set:
summary: "ETL partial"
itemCount: 0
"#;
let output = run_workflow_yaml(&yaml_str, json!({})).await.unwrap();
assert_eq!(output["summary"], json!("ETL complete"));
assert_eq!(output["itemCount"], json!(3));
}