mod common;
use std::collections::HashMap;
use std::sync::Arc;
use runkon_flow::dsl::OnChildFail;
use runkon_flow::executors::foreach::execute_foreach;
use runkon_flow::traits::persistence::WorkflowPersistence;
use common::{
foreach_node, make_foreach_state, make_foreach_state_cancellable, make_persistence,
ordered_foreach_node, CancellingMockRunner, FailingOrderedItemProvider, MockChildRunner,
MockItemProvider, MockOrderedItemProvider,
};
/// Returns the fan-out item rows stored for one step of a workflow run.
///
/// Loads the steps of `workflow_run_id` via `get_steps`, selects the first one
/// whose `step_name` contains `step_name_prefix` (a substring match, despite
/// the parameter name), and returns the result of `get_fan_out_items` for
/// that step's id.
///
/// # Panics
///
/// Panics if either persistence call returns an error, or if no step name
/// contains `step_name_prefix`.
fn fan_out_items(
    persistence: &runkon_flow::persistence_memory::InMemoryWorkflowPersistence,
    workflow_run_id: &str,
    step_name_prefix: &str,
) -> Vec<runkon_flow::types::FanOutItemRow> {
    let recorded_steps = persistence.get_steps(workflow_run_id).unwrap();
    let matching_step = match recorded_steps
        .iter()
        .find(|candidate| candidate.step_name.contains(step_name_prefix))
    {
        Some(found) => found,
        None => panic!("no step matching '{step_name_prefix}'"),
    };
    persistence
        .get_fan_out_items(&matching_step.id, None)
        .unwrap()
}
/// Counts how many rows in `items` have a `status` equal to `status`.
fn count_status(items: &[runkon_flow::types::FanOutItemRow], status: &str) -> usize {
    let mut matching = 0;
    for row in items {
        if row.status == status {
            matching += 1;
        }
    }
    matching
}
/// Three tickets, every child reports success: the foreach must return `Ok`,
/// set `all_succeeded`, and leave all three fan-out items `completed`.
#[test]
fn test_foreach_sequential_all_succeed() {
    let tickets = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
    ];
    // Map every item id to a successful child outcome.
    let mut outcomes: HashMap<String, bool> = HashMap::new();
    for (_, id, _) in &tickets {
        outcomes.insert(id.to_string(), true);
    }

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockItemProvider::new("tickets", tickets);
    let mut state = make_foreach_state(
        "sequential-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    // NOTE(review): the `1` is presumably the concurrency limit — confirm in `common`.
    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(result.is_ok(), "expected Ok, got: {result:?}");
    assert!(state.all_succeeded, "all_succeeded should be true");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3, "should have 3 fan-out items");
    assert_eq!(tally("completed"), 3, "all 3 should be completed");
    assert_eq!(tally("failed"), 0);
    assert_eq!(tally("pending"), 0);
}
/// Five tickets, every child reports success, node built with `3` and
/// `OnChildFail::Continue`: all five fan-out items must end up `completed`.
#[test]
fn test_foreach_parallel_fan_out_all_succeed() {
    let tickets: Vec<(&str, &str, &str)> = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
        ("ticket", "t4", "T-4"),
        ("ticket", "t5", "T-5"),
    ];
    // Map every item id to a successful child outcome.
    let mut outcomes: HashMap<String, bool> = HashMap::new();
    for (_, id, _) in &tickets {
        outcomes.insert(id.to_string(), true);
    }

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockItemProvider::new("tickets", tickets);
    let mut state = make_foreach_state(
        "parallel-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    // NOTE(review): the `3` is presumably the concurrency limit — confirm in `common`.
    let node = foreach_node("fan-out", "tickets", "child-wf", 3, OnChildFail::Continue);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(result.is_ok(), "expected Ok, got: {result:?}");
    assert!(state.all_succeeded, "all_succeeded should be true");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 5, "should have 5 fan-out items");
    assert_eq!(tally("completed"), 5, "all 5 should be completed");
    assert_eq!(tally("failed"), 0);
    assert_eq!(tally("pending"), 0);
}
/// With `OnChildFail::Halt`, a failure on the first item must stop dispatch:
/// one item `failed`, the remaining two still `pending`, none `completed`.
#[test]
fn test_foreach_on_child_fail_halt() {
    // The failing item comes first so the items after it are never dispatched.
    let tickets = vec![
        ("ticket", "t_fail", "T-fail"),
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
    ];
    let outcomes = HashMap::from([
        ("t_fail".to_string(), false),
        ("t1".to_string(), true),
        ("t2".to_string(), true),
    ]);

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockItemProvider::new("tickets", tickets);
    let mut state = make_foreach_state("halt-test", Arc::clone(&persistence), runner, provider);

    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(
        result.is_ok(),
        "expected Ok (fail_fast=false), got: {result:?}"
    );
    assert!(!state.all_succeeded, "all_succeeded should be false");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3, "should have 3 fan-out items");
    assert_eq!(tally("completed"), 0, "no items should complete");
    assert_eq!(tally("failed"), 1, "t_fail should fail");
    assert_eq!(
        tally("pending"),
        2,
        "t1 and t2 should remain pending (not dispatched)"
    );
}
/// With `OnChildFail::Continue`, a failure in the middle item must not stop
/// the others: two items `completed`, one `failed`, none left `pending`.
#[test]
fn test_foreach_on_child_fail_continue() {
    let tickets = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
    ];
    // Only the middle item fails.
    let outcomes = HashMap::from([
        ("t1".to_string(), true),
        ("t2".to_string(), false),
        ("t3".to_string(), true),
    ]);

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockItemProvider::new("tickets", tickets);
    let mut state = make_foreach_state(
        "continue-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Continue);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(
        result.is_ok(),
        "expected Ok (fail_fast=false), got: {result:?}"
    );
    assert!(
        !state.all_succeeded,
        "all_succeeded should be false (one item failed)"
    );

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3);
    assert_eq!(tally("completed"), 2, "t1 and t3 should complete");
    assert_eq!(tally("failed"), 1, "t2 should fail");
    assert_eq!(tally("pending"), 0, "no items should remain pending");
}
/// With `OnChildFail::SkipDependents` and a `("t1", "t2")` ordering edge, a
/// failing `t1` must yield one `failed`, one `skipped` and one `completed`
/// item, with nothing left `pending`.
#[test]
fn test_foreach_on_child_fail_skip_dependents() {
    let tickets = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
    ];
    let outcomes = HashMap::from([
        ("t1".to_string(), false),
        ("t2".to_string(), true),
        ("t3".to_string(), true),
    ]);
    // NOTE(review): the edge presumably means "t2 depends on t1" — confirm in `common`.
    let edges = vec![("t1", "t2")];

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockOrderedItemProvider::new("tickets", tickets, edges);
    let mut state = make_foreach_state(
        "skip-deps-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    let node = ordered_foreach_node(
        "fan-out",
        "tickets",
        "child-wf",
        1,
        OnChildFail::SkipDependents,
    );
    let result = execute_foreach(&mut state, &node, 0);

    assert!(
        result.is_ok(),
        "expected Ok (fail_fast=false), got: {result:?}"
    );
    assert!(!state.all_succeeded, "all_succeeded should be false");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3);
    assert_eq!(tally("failed"), 1, "t1 should fail");
    assert_eq!(tally("skipped"), 1, "t2 should be skipped");
    assert_eq!(tally("completed"), 1, "t3 should complete");
    assert_eq!(tally("pending"), 0, "no items should remain pending");
}
/// An ordered foreach with the edges `("t1", "t2")` and `("t2", "t3")` and
/// only successful children must complete all three items.
#[test]
fn test_foreach_ordered_with_dependencies() {
    let tickets = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
    ];
    // Map every item id to a successful child outcome.
    let mut outcomes: HashMap<String, bool> = HashMap::new();
    for (_, id, _) in &tickets {
        outcomes.insert(id.to_string(), true);
    }
    let edges = vec![("t1", "t2"), ("t2", "t3")];

    let persistence = make_persistence();
    let runner = MockChildRunner::new(outcomes);
    let provider = MockOrderedItemProvider::new("tickets", tickets, edges);
    let mut state = make_foreach_state(
        "ordered-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    let node = ordered_foreach_node("fan-out", "tickets", "child-wf", 3, OnChildFail::Continue);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(result.is_ok(), "expected Ok, got: {result:?}");
    assert!(state.all_succeeded, "all items should succeed");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3);
    assert_eq!(tally("completed"), 3, "all 3 should complete");
    assert_eq!(tally("pending"), 0);
    assert_eq!(tally("failed"), 0);
}
/// When the runner cancels the shared token part-way through, the foreach must
/// still return `Ok`, with one item `completed` and the other two `pending`.
#[test]
fn test_foreach_cancellation() {
    let tickets = vec![
        ("ticket", "t1", "T-1"),
        ("ticket", "t2", "T-2"),
        ("ticket", "t3", "T-3"),
    ];
    // Map every item id to a successful child outcome.
    let mut outcomes: HashMap<String, bool> = HashMap::new();
    for (_, id, _) in &tickets {
        outcomes.insert(id.to_string(), true);
    }

    // The runner and the state share one token, so the runner can cancel the run.
    let cancellation = runkon_flow::CancellationToken::new();
    let persistence = make_persistence();
    // NOTE(review): the `1` is presumably "cancel after one child run" — confirm in `common`.
    let runner = CancellingMockRunner::new(outcomes, 1, cancellation.clone());
    let provider = MockItemProvider::new("tickets", tickets);
    let mut state = make_foreach_state_cancellable(
        "cancel-test",
        Arc::clone(&persistence),
        runner,
        provider,
        cancellation,
    );

    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Continue);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(result.is_ok(), "expected Ok, got: {result:?}");

    let items = fan_out_items(&persistence, &state.workflow_run_id, "foreach:fan-out");
    let tally = |status: &str| count_status(&items, status);
    assert_eq!(items.len(), 3, "all 3 fan-out items should be created");
    assert_eq!(tally("completed"), 1, "only t1 should complete");
    assert_eq!(
        tally("pending"),
        2,
        "t2 and t3 should remain pending after cancellation"
    );
}
/// A foreach whose provider yields no items must succeed, record its step as
/// `Completed`, and create no fan-out items.
#[test]
fn test_foreach_empty_items() {
    let persistence = make_persistence();
    let mut state = make_foreach_state(
        "empty-test",
        Arc::clone(&persistence),
        MockChildRunner::all_succeed(&[]),
        MockItemProvider::new("tickets", vec![]),
    );
    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);
    assert!(result.is_ok(), "expected Ok, got: {result:?}");
    assert!(state.all_succeeded, "empty foreach should succeed");

    // Look the step up directly (exact name match) because its status is
    // asserted as well as its fan-out items.
    let steps = persistence.get_steps(&state.workflow_run_id).unwrap();
    let step = steps
        .iter()
        .find(|s| s.step_name == "foreach:fan-out")
        // A missing step is a test failure in its own right; say so instead of
        // panicking with an anonymous `unwrap` on `None`.
        .expect("no step named 'foreach:fan-out' was recorded for the empty foreach");
    assert_eq!(
        step.status,
        runkon_flow::status::WorkflowStepStatus::Completed,
        "step should be Completed"
    );
    let items = persistence.get_fan_out_items(&step.id, None).unwrap();
    assert_eq!(items.len(), 0, "no fan-out items for empty provider");
}
/// When the persistence layer is told to fail `get_fan_out_items`, the foreach
/// must return `EngineError::Persistence` rather than swallow the failure.
#[test]
fn test_foreach_persistence_error_propagates() {
    use runkon_flow::engine_error::EngineError;

    let persistence = make_persistence();
    let runner = MockChildRunner::all_succeed(&["t1"]);
    let provider = MockItemProvider::new("tickets", vec![("ticket", "t1", "T-1")]);
    let mut state = make_foreach_state(
        "persistence-fail-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );
    // Arm the failure only after the state has been built.
    persistence.set_fail_get_fan_out_items(true);

    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(result.is_err(), "expected Err from persistence failure");
    let error = result.unwrap_err();
    assert!(
        matches!(error, EngineError::Persistence(_)),
        "error should be EngineError::Persistence"
    );
}
/// Test item provider whose items carry a per-item context map.
struct ContextItemProvider {
    /// Name reported by `ItemProvider::name`.
    name: String,
    /// One `(item_type, item_id, item_ref, context)` tuple per item to emit.
    items: Vec<(String, String, String, HashMap<String, String>)>,
}

impl runkon_flow::traits::item_provider::ItemProvider for ContextItemProvider {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Emits one `FanOutItem` per stored tuple, in stored order. The run
    /// context, provider info, scope and filter arguments are all ignored.
    fn items(
        &self,
        _ctx: &dyn runkon_flow::traits::run_context::RunContext,
        _info: &runkon_flow::traits::item_provider::ProviderInfo,
        _scope: Option<&dyn std::any::Any>,
        _filter: &HashMap<String, String>,
    ) -> Result<
        Vec<runkon_flow::traits::item_provider::FanOutItem>,
        runkon_flow::engine_error::EngineError,
    > {
        use runkon_flow::traits::item_provider::FanOutItem;

        let mut emitted = Vec::with_capacity(self.items.len());
        for (item_type, item_id, item_ref, context) in &self.items {
            emitted.push(FanOutItem {
                item_type: item_type.clone(),
                item_id: item_id.clone(),
                item_ref: item_ref.clone(),
                context: context.clone(),
            });
        }
        Ok(emitted)
    }
}
/// Test double that records the input maps pushed into `captured`.
struct InputCapturingRunner {
    /// Recorded input maps, in push order. Guarded by a mutex so they can be
    /// appended through `&self`.
    captured: std::sync::Mutex<Vec<HashMap<String, String>>>,
}

impl InputCapturingRunner {
    /// Creates a runner with nothing captured yet.
    fn new() -> Self {
        let captured = std::sync::Mutex::new(Vec::new());
        Self { captured }
    }

    /// Returns a copy of everything captured so far.
    ///
    /// Panics if the mutex is poisoned.
    fn captured_inputs(&self) -> Vec<HashMap<String, String>> {
        let guard = self.captured.lock().unwrap();
        guard.clone()
    }
}
impl runkon_flow::engine::ChildWorkflowRunner for InputCapturingRunner {
    /// Records a copy of `params.inputs` and reports a successful child run
    /// whose run id is `mock-run-<item.id>` (empty id if `item.id` is absent).
    fn execute_child(
        &self,
        workflow_name: &str,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
        params: runkon_flow::engine::ChildWorkflowInput,
    ) -> runkon_flow::engine_error::Result<runkon_flow::types::WorkflowResult> {
        {
            let mut recorded = self.captured.lock().unwrap();
            recorded.push(params.inputs.clone());
        }
        let item_id = match params.inputs.get("item.id") {
            Some(id) => id.clone(),
            None => String::new(),
        };
        let outcome = runkon_flow::types::WorkflowResult {
            workflow_run_id: format!("mock-run-{item_id}"),
            workflow_name: workflow_name.to_string(),
            all_succeeded: true,
            total_duration_ms: 0,
            extensions: Default::default(),
        };
        Ok(outcome)
    }

    /// Not implemented in this test double; calling it panics.
    fn resume_child(
        &self,
        _workflow_run_id: &str,
        _model: Option<&str>,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
    ) -> runkon_flow::engine_error::Result<runkon_flow::types::WorkflowResult> {
        unimplemented!()
    }

    /// Always reports that there is no resumable child run.
    fn find_resumable_child(
        &self,
        _parent_run_id: &str,
        _workflow_name: &str,
    ) -> runkon_flow::engine_error::Result<Option<runkon_flow::types::WorkflowRun>> {
        Ok(None)
    }
}
/// An item's context entries (`title`, `state`) must reach the child run's
/// inputs as `item.title` / `item.state`, alongside `item.id` and `item.ref`.
#[test]
fn test_foreach_item_context_injected_into_child_inputs() {
    use runkon_flow::ItemProviderRegistry;

    let item_context = HashMap::from([
        ("title".to_string(), "Fix the bug".to_string()),
        ("state".to_string(), "open".to_string()),
    ]);
    let provider = ContextItemProvider {
        name: "tickets".to_string(),
        items: vec![(
            "ticket".to_string(),
            "t1".to_string(),
            "T-1".to_string(),
            item_context,
        )],
    };

    // Keep a typed handle on the runner so the captured inputs can be read
    // back after the foreach has run.
    let runner = Arc::new(InputCapturingRunner::new());
    let persistence = make_persistence();
    let mut state = common::make_state("ctx-inject-test", Arc::clone(&persistence), HashMap::new());
    let dyn_runner = Arc::clone(&runner) as Arc<dyn runkon_flow::engine::ChildWorkflowRunner>;
    state.child_runner = Some(dyn_runner);
    state.exec_config.fail_fast = false;

    let mut registry = ItemProviderRegistry::new();
    registry.register(provider);
    state.registry = Arc::new(registry);

    let node = foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);
    assert!(result.is_ok(), "expected Ok, got: {result:?}");

    let captured = runner.captured_inputs();
    assert_eq!(captured.len(), 1, "one child run expected");
    let inputs = &captured[0];
    assert_eq!(inputs.get("item.id").map(|v| v.as_str()), Some("t1"));
    assert_eq!(inputs.get("item.ref").map(|v| v.as_str()), Some("T-1"));
    assert_eq!(
        inputs.get("item.title").map(|v| v.as_str()),
        Some("Fix the bug"),
        "item.title should be injected from context"
    );
    assert_eq!(
        inputs.get("item.state").map(|v| v.as_str()),
        Some("open"),
        "item.state should be injected from context"
    );
}
/// An ordered foreach backed by `FailingOrderedItemProvider` must surface the
/// failure as `EngineError::Workflow`.
#[test]
fn test_foreach_ordered_dependencies_error_propagates() {
    use runkon_flow::engine_error::EngineError;

    let persistence = make_persistence();
    let runner = MockChildRunner::all_succeed(&["t1", "t2"]);
    let provider = FailingOrderedItemProvider::new(
        "tickets",
        vec![("ticket", "t1", "T-1"), ("ticket", "t2", "T-2")],
    );
    let mut state = make_foreach_state(
        "deps-fail-test",
        Arc::clone(&persistence),
        runner,
        provider,
    );

    let node = ordered_foreach_node("fan-out", "tickets", "child-wf", 1, OnChildFail::Halt);
    let result = execute_foreach(&mut state, &node, 0);

    assert!(
        result.is_err(),
        "expected Err from dependency fetch failure"
    );
    let error = result.unwrap_err();
    assert!(
        matches!(error, EngineError::Workflow(_)),
        "error should be EngineError::Workflow"
    );
}