#![allow(dead_code)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use runkon_flow::cancellation::CancellationToken;
use runkon_flow::dsl::{
ApprovalMode, ForEachNode, GateNode, OnChildFail, OnCycle, OnTimeout, WorkflowDef,
WorkflowNode, WorkflowTrigger,
};
use runkon_flow::engine::{ChildWorkflowInput, ChildWorkflowRunner, ExecutionState, ResumeContext};
use runkon_flow::engine_error::EngineError;
use runkon_flow::persistence_memory::InMemoryWorkflowPersistence;
pub use runkon_flow::traits::action_executor::ActionExecutor;
use runkon_flow::traits::action_executor::{ActionOutput, ActionParams, ActionRegistry, StepInfo};
use runkon_flow::traits::item_provider::{FanOutItem, ItemProvider, ProviderInfo};
use runkon_flow::traits::persistence::{NewRun, WorkflowPersistence};
use runkon_flow::traits::run_context::NoopRunContext;
use runkon_flow::traits::run_context::RunContext;
use runkon_flow::traits::script_env_provider::NoOpScriptEnvProvider;
use runkon_flow::types::{WorkflowExecConfig, WorkflowResult};
use runkon_flow::CancellationReason;
use runkon_flow::ItemProviderRegistry;
pub struct MockExecutor {
pub label: String,
pub markers: Vec<String>,
}
impl MockExecutor {
pub fn new(name: &str) -> Self {
Self {
label: name.to_string(),
markers: vec![],
}
}
pub fn with_markers(name: &str, markers: &[&str]) -> Self {
Self {
label: name.to_string(),
markers: markers.iter().map(|s| s.to_string()).collect(),
}
}
}
impl ActionExecutor for MockExecutor {
fn name(&self) -> &str {
&self.label
}
fn execute(
&self,
_ctx: &dyn RunContext,
_info: &StepInfo,
_params: &ActionParams,
) -> Result<ActionOutput, EngineError> {
Ok(ActionOutput {
markers: self.markers.clone(),
..Default::default()
})
}
}
/// An [`ActionExecutor`] that fails every invocation, for exercising the
/// engine's error-handling paths.
pub struct FailingExecutor;

impl ActionExecutor for FailingExecutor {
    fn name(&self) -> &str {
        "failing"
    }

    /// Always returns a workflow error with a fixed message.
    fn execute(
        &self,
        _ctx: &dyn RunContext,
        _info: &StepInfo,
        _params: &ActionParams,
    ) -> Result<ActionOutput, EngineError> {
        let message = "intentional test failure".to_string();
        Err(EngineError::Workflow(message))
    }
}
#[allow(unused_imports)]
pub use runkon_flow::test_helpers::{call_node, make_def, ForwardSink, VecSink};
/// Builds a fresh in-memory persistence backend wrapped in an [`Arc`].
pub fn make_persistence() -> Arc<InMemoryWorkflowPersistence> {
    let store = InMemoryWorkflowPersistence::new();
    Arc::new(store)
}
/// Builds an [`ExecutionState`] for tests: creates a run row in `persistence`
/// under `wf_name` (trigger `"test"`, no parent, not a dry run) and fills
/// every remaining engine field with a neutral default.
pub fn make_state(
    wf_name: &str,
    persistence: Arc<InMemoryWorkflowPersistence>,
    named_executors: HashMap<String, Box<dyn ActionExecutor>>,
) -> ExecutionState {
    // The state must reference a real run row, so register one up front.
    let created = persistence
        .create_run(NewRun {
            workflow_name: wf_name.to_string(),
            parent_run_id: String::new(),
            dry_run: false,
            trigger: "test".to_string(),
            definition_snapshot: None,
            parent_workflow_run_id: None,
        })
        .expect("create_run failed");

    // Pre-coerce the trait-object handles so the struct literal stays readable.
    let persistence_dyn: Arc<dyn WorkflowPersistence> = Arc::clone(&persistence);
    let run_ctx: Arc<dyn runkon_flow::traits::run_context::RunContext> =
        Arc::new(NoopRunContext::default());

    ExecutionState {
        persistence: persistence_dyn,
        action_registry: Arc::new(ActionRegistry::from_executors(named_executors, None)),
        script_env_provider: Arc::new(NoOpScriptEnvProvider),
        workflow_run_id: created.id,
        workflow_name: wf_name.to_string(),
        run_ctx,
        extra_plugin_dirs: vec![],
        model: None,
        exec_config: WorkflowExecConfig::default(),
        inputs: HashMap::new(),
        parent_run_id: String::new(),
        depth: 0,
        target_label: None,
        // Per-run bookkeeping starts empty / zeroed.
        step_results: HashMap::new(),
        contexts: vec![],
        position: 0,
        all_succeeded: true,
        total_cost: 0.0,
        total_turns: 0,
        total_duration_ms: 0,
        total_input_tokens: 0,
        total_output_tokens: 0,
        total_cache_read_input_tokens: 0,
        total_cache_creation_input_tokens: 0,
        has_llm_metrics: false,
        last_gate_feedback: None,
        block_output: None,
        block_with: vec![],
        resume_ctx: None,
        default_as_identity: None,
        triggered_by_hook: false,
        schema_resolver: None,
        child_runner: None,
        last_heartbeat_at: ExecutionState::new_heartbeat(),
        registry: Arc::new(ItemProviderRegistry::new()),
        event_sinks: Arc::from(vec![]),
        cancellation: CancellationToken::new(),
        current_execution_id: Arc::new(Mutex::new(None)),
        owner_token: None,
        lease_generation: Some(0),
    }
}
/// Same as [`make_state`], but with an empty [`ResumeContext`] attached so
/// resume-path code treats the state as a resumed run.
pub fn make_state_with_resume_ctx(
    wf_name: &str,
    persistence: Arc<InMemoryWorkflowPersistence>,
    named_executors: HashMap<String, Box<dyn ActionExecutor>>,
) -> ExecutionState {
    let resume = ResumeContext {
        step_map: HashMap::new(),
    };
    let mut state = make_state(wf_name, persistence, named_executors);
    state.resume_ctx = Some(resume);
    state
}
/// Builds a manually-triggered [`WorkflowDef`] named `name` with the given
/// `body` nodes and `always` (cleanup) nodes; all other fields are empty.
pub fn make_def_with_always(
    name: &str,
    body: Vec<WorkflowNode>,
    always: Vec<WorkflowNode>,
) -> WorkflowDef {
    WorkflowDef {
        name: name.to_string(),
        title: None,
        description: String::new(),
        trigger: WorkflowTrigger::Manual,
        group: None,
        targets: vec![],
        inputs: vec![],
        body,
        always,
        source_path: "test.wf".to_string(),
    }
}
/// Builds a `human_approval` gate node with the given `name`, a single
/// required approval, no timeout window, and `on_timeout` as the expiry
/// policy. Shared construction for [`gate_node`] and [`timeout_gate`],
/// which previously duplicated this literal field-for-field.
fn human_approval_gate(name: &str, on_timeout: OnTimeout) -> WorkflowNode {
    WorkflowNode::Gate(GateNode {
        name: name.to_string(),
        gate_type: "human_approval".to_string(),
        prompt: None,
        min_approvals: 1,
        approval_mode: ApprovalMode::default(),
        timeout_secs: 0,
        on_timeout,
        as_identity: None,
        quality_gate: None,
        options: None,
    })
}

/// A named human-approval gate that fails on timeout.
pub fn gate_node(name: &str) -> WorkflowNode {
    human_approval_gate(name, OnTimeout::Fail)
}

/// A human-approval gate named `"approval"` with the given timeout policy.
pub fn timeout_gate(on_timeout: OnTimeout) -> WorkflowNode {
    human_approval_gate("approval", on_timeout)
}
/// Keys each executor by its reported [`ActionExecutor::name`], producing the
/// registry map expected by [`make_state`]. On duplicate names, the later
/// executor wins (same as the map-collect it replaces).
pub fn named_executors(
    executors: impl IntoIterator<Item = Box<dyn ActionExecutor>>,
) -> HashMap<String, Box<dyn ActionExecutor>> {
    let mut map = HashMap::new();
    for executor in executors {
        map.insert(executor.name().to_string(), executor);
    }
    map
}
/// Fabricates a [`WorkflowResult`] for a mock child run; the run id embeds
/// `item_id` so tests can correlate results with the items that produced them.
fn mock_workflow_result(item_id: &str, wf_name: &str, succeeded: bool) -> WorkflowResult {
    let workflow_run_id = format!("mock-run-{}", item_id);
    WorkflowResult {
        workflow_run_id,
        workflow_name: wf_name.to_string(),
        all_succeeded: succeeded,
        total_duration_ms: 0,
        extensions: Default::default(),
    }
}
/// A [`ChildWorkflowRunner`] whose per-item outcome is scripted via a map
/// from `item.id` to a success flag; every invocation is recorded in
/// `call_log` for later inspection.
pub struct MockChildRunner {
    // item.id -> whether the child "run" reports success; unknown ids succeed.
    outcomes: HashMap<String, bool>,
    /// Item ids in the order `execute_child` was invoked.
    pub call_log: Mutex<Vec<String>>,
}

impl MockChildRunner {
    /// Creates a runner with the given scripted outcomes and an empty log.
    pub fn new(outcomes: HashMap<String, bool>) -> Self {
        Self {
            outcomes,
            call_log: Mutex::new(Vec::new()),
        }
    }

    /// Convenience constructor: a runner where every listed item id succeeds.
    pub fn all_succeed(item_ids: &[&str]) -> Self {
        let outcomes = item_ids
            .iter()
            .map(|id| ((*id).to_string(), true))
            .collect();
        Self::new(outcomes)
    }
}

impl ChildWorkflowRunner for MockChildRunner {
    /// Records the `item.id` input and returns a fabricated result whose
    /// success flag comes from the scripted outcomes (default: success).
    fn execute_child(
        &self,
        workflow_name: &str,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
        params: ChildWorkflowInput,
    ) -> runkon_flow::engine_error::Result<WorkflowResult> {
        let item_id = params.inputs.get("item.id").cloned().unwrap_or_default();
        self.call_log.lock().unwrap().push(item_id.clone());
        let succeeded = *self.outcomes.get(&item_id).unwrap_or(&true);
        Ok(mock_workflow_result(&item_id, workflow_name, succeeded))
    }

    /// Resuming children is out of scope for this mock.
    fn resume_child(
        &self,
        _workflow_run_id: &str,
        _model: Option<&str>,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
    ) -> runkon_flow::engine_error::Result<WorkflowResult> {
        unimplemented!("MockChildRunner does not support resume_child")
    }

    /// This mock never reports a resumable child.
    fn find_resumable_child(
        &self,
        _parent_run_id: &str,
        _workflow_name: &str,
    ) -> runkon_flow::engine_error::Result<Option<runkon_flow::types::WorkflowRun>> {
        Ok(None)
    }
}
/// An [`ItemProvider`] that returns a fixed list of items, ignoring context,
/// scope, and filters.
pub struct MockItemProvider {
    name: String,
    // Each entry is (item_type, item_id, item_ref).
    items: Vec<(String, String, String)>,
}

impl MockItemProvider {
    /// Creates a provider named `name` serving the given `(type, id, ref)` triples.
    pub fn new(name: &str, items: Vec<(&str, &str, &str)>) -> Self {
        let items = items
            .into_iter()
            .map(|(item_type, item_id, item_ref)| {
                (
                    item_type.to_string(),
                    item_id.to_string(),
                    item_ref.to_string(),
                )
            })
            .collect();
        Self {
            name: name.to_string(),
            items,
        }
    }
}

impl ItemProvider for MockItemProvider {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns every configured item; scope and filter are ignored.
    fn items(
        &self,
        _ctx: &dyn RunContext,
        _info: &ProviderInfo,
        _scope: Option<&dyn std::any::Any>,
        _filter: &HashMap<String, String>,
    ) -> Result<Vec<FanOutItem>, EngineError> {
        let items = self
            .items
            .iter()
            .map(|(item_type, item_id, item_ref)| FanOutItem {
                item_type: item_type.clone(),
                item_id: item_id.clone(),
                item_ref: item_ref.clone(),
                context: HashMap::new(),
            })
            .collect();
        Ok(items)
    }
}
/// Builds an unordered [`ForEachNode`] that fans `workflow` out over the
/// items from `provider`, with the given parallelism cap and child-failure
/// policy. Scope, filter, and inputs are left empty; cycles fail the run.
pub fn foreach_node(
    name: &str,
    provider: &str,
    workflow: &str,
    max_parallel: u32,
    on_child_fail: OnChildFail,
) -> ForEachNode {
    ForEachNode {
        name: name.to_string(),
        over: provider.to_string(),
        workflow: workflow.to_string(),
        scope: None,
        filter: HashMap::new(),
        inputs: HashMap::new(),
        ordered: false,
        on_cycle: OnCycle::Fail,
        max_parallel,
        on_child_fail,
    }
}
/// Like [`foreach_node`], but with dependency-ordered execution enabled.
pub fn ordered_foreach_node(
    name: &str,
    provider: &str,
    workflow: &str,
    max_parallel: u32,
    on_child_fail: OnChildFail,
) -> ForEachNode {
    let mut node = foreach_node(name, provider, workflow, max_parallel, on_child_fail);
    node.ordered = true;
    node
}
/// Shared construction for fan-out test states: wires in a child runner and a
/// single item provider, and disables fail-fast so every child is attempted.
fn make_foreach_state_inner<R, P>(
    wf_name: &str,
    persistence: Arc<InMemoryWorkflowPersistence>,
    child_runner: R,
    provider: P,
) -> ExecutionState
where
    R: ChildWorkflowRunner + 'static,
    P: ItemProvider + 'static,
{
    let mut providers = ItemProviderRegistry::new();
    providers.register(provider);

    let mut state = make_state(wf_name, Arc::clone(&persistence), HashMap::new());
    state.child_runner = Some(Arc::new(child_runner));
    // Keep going after a child fails so all outcomes can be asserted in full.
    state.exec_config.fail_fast = false;
    state.registry = Arc::new(providers);
    state
}
/// Builds a fan-out `ExecutionState` driven by a scripted `MockChildRunner`
/// and the given item provider; fail-fast is disabled by
/// `make_foreach_state_inner`.
pub fn make_foreach_state<P: ItemProvider + 'static>(
wf_name: &str,
persistence: Arc<InMemoryWorkflowPersistence>,
child_runner: MockChildRunner,
provider: P,
) -> ExecutionState {
make_foreach_state_inner(wf_name, persistence, child_runner, provider)
}
pub fn make_foreach_state_cancellable(
wf_name: &str,
persistence: Arc<InMemoryWorkflowPersistence>,
child_runner: CancellingMockRunner,
provider: MockItemProvider,
cancellation: CancellationToken,
) -> ExecutionState {
let mut state = make_foreach_state_inner(wf_name, persistence, child_runner, provider);
state.cancellation = cancellation;
state
}
/// A fixed-item provider that also serves a static dependency list, for
/// exercising ordered (dependency-aware) fan-out.
pub struct MockOrderedItemProvider {
    name: String,
    // (item_type, item_id, item_ref) triples returned by `items`.
    items: Vec<(String, String, String)>,
    // Dependency edges returned verbatim by `dependencies`.
    deps: Vec<(String, String)>,
}

impl MockOrderedItemProvider {
    /// Creates a provider with the given items and dependency edges.
    pub fn new(name: &str, items: Vec<(&str, &str, &str)>, deps: Vec<(&str, &str)>) -> Self {
        let items = items
            .into_iter()
            .map(|(item_type, item_id, item_ref)| {
                (
                    item_type.to_string(),
                    item_id.to_string(),
                    item_ref.to_string(),
                )
            })
            .collect();
        let deps = deps
            .into_iter()
            .map(|(from, to)| (from.to_string(), to.to_string()))
            .collect();
        Self {
            name: name.to_string(),
            items,
            deps,
        }
    }
}

impl ItemProvider for MockOrderedItemProvider {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns every configured item; context, scope, and filter are ignored.
    fn items(
        &self,
        _ctx: &dyn RunContext,
        _info: &ProviderInfo,
        _scope: Option<&dyn std::any::Any>,
        _filter: &HashMap<String, String>,
    ) -> Result<Vec<FanOutItem>, EngineError> {
        let items = self
            .items
            .iter()
            .map(|(item_type, item_id, item_ref)| FanOutItem {
                item_type: item_type.clone(),
                item_id: item_id.clone(),
                item_ref: item_ref.clone(),
                context: HashMap::new(),
            })
            .collect();
        Ok(items)
    }

    /// Returns the same static edge list regardless of `step_id`.
    fn dependencies(&self, _step_id: &str) -> Result<Vec<(String, String)>, EngineError> {
        Ok(self.deps.clone())
    }

    /// Advertises ordered support so the engine takes the ordered path.
    fn supports_ordered(&self) -> bool {
        true
    }
}
/// Wraps a [`MockOrderedItemProvider`] but fails every dependency fetch, for
/// testing error handling in the ordered fan-out path.
pub struct FailingOrderedItemProvider {
    inner: MockOrderedItemProvider,
}

impl FailingOrderedItemProvider {
    /// Creates a provider serving `items` with no dependency edges; any call
    /// to `dependencies` will still fail.
    pub fn new(name: &str, items: Vec<(&str, &str, &str)>) -> Self {
        let inner = MockOrderedItemProvider::new(name, items, vec![]);
        Self { inner }
    }
}

impl ItemProvider for FailingOrderedItemProvider {
    fn name(&self) -> &str {
        self.inner.name()
    }

    /// Delegates item listing to the wrapped provider.
    fn items(
        &self,
        ctx: &dyn RunContext,
        info: &ProviderInfo,
        scope: Option<&dyn std::any::Any>,
        filter: &HashMap<String, String>,
    ) -> Result<Vec<FanOutItem>, EngineError> {
        self.inner.items(ctx, info, scope, filter)
    }

    /// Always fails, simulating a broken dependency source.
    fn dependencies(&self, _step_id: &str) -> Result<Vec<(String, String)>, EngineError> {
        let message = "injected dependency fetch failure".to_string();
        Err(EngineError::Workflow(message))
    }

    fn supports_ordered(&self) -> bool {
        self.inner.supports_ordered()
    }
}
/// A [`ChildWorkflowRunner`] that cancels the shared token once it has been
/// invoked `cancel_after` times, for testing mid-fan-out cancellation.
pub struct CancellingMockRunner {
    // item.id -> scripted success flag; unknown ids succeed.
    outcomes: HashMap<String, bool>,
    // Cancel the token on the call whose 1-based index reaches this value.
    cancel_after: usize,
    // Number of `execute_child` invocations so far.
    call_count: Mutex<usize>,
    token: CancellationToken,
}

impl CancellingMockRunner {
    /// Creates a runner that cancels `token` on the `cancel_after`-th call.
    pub fn new(
        outcomes: HashMap<String, bool>,
        cancel_after: usize,
        token: CancellationToken,
    ) -> Self {
        Self {
            outcomes,
            cancel_after,
            call_count: Mutex::new(0),
            token,
        }
    }
}

impl ChildWorkflowRunner for CancellingMockRunner {
    /// Bumps the call counter, cancels the token once the threshold is hit,
    /// and returns a fabricated result per the scripted outcomes.
    fn execute_child(
        &self,
        workflow_name: &str,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
        params: ChildWorkflowInput,
    ) -> runkon_flow::engine_error::Result<WorkflowResult> {
        let item_id = params.inputs.get("item.id").cloned().unwrap_or_default();
        // Fix: hold the counter lock only for the increment. The original
        // kept the guard alive across `token.cancel(...)` and the outcome
        // lookup, needlessly serializing concurrent callers on work that
        // does not touch the counter.
        let calls_so_far = {
            let mut count = self.call_count.lock().unwrap();
            *count += 1;
            *count
        };
        if calls_so_far >= self.cancel_after {
            self.token.cancel(CancellationReason::UserRequested(None));
        }
        let succeeded = self.outcomes.get(&item_id).copied().unwrap_or(true);
        Ok(mock_workflow_result(&item_id, workflow_name, succeeded))
    }

    /// Resuming children is out of scope for this mock.
    fn resume_child(
        &self,
        _workflow_run_id: &str,
        _model: Option<&str>,
        _parent_ctx: &runkon_flow::engine::ChildWorkflowContext,
    ) -> runkon_flow::engine_error::Result<WorkflowResult> {
        unimplemented!("CancellingMockRunner does not support resume_child")
    }

    /// This mock never reports a resumable child.
    fn find_resumable_child(
        &self,
        _parent_run_id: &str,
        _workflow_name: &str,
    ) -> runkon_flow::engine_error::Result<Option<runkon_flow::types::WorkflowRun>> {
        Ok(None)
    }
}