// @harn-entrypoint-category workflow.stdlib
import { agent_loop } from "std/agent/loop"
import { workflow_execute_map_stage } from "std/workflow/map"
import "std/workflow/prompts"
import { workflow_join_readiness, workflow_next_edges } from "std/workflow/schedule"
/**
 * __workflow_execute_list. Coerces `value` to a list: a list is returned
 * unchanged, anything else yields an empty list.
 */
fn __workflow_execute_list(value) {
  if type_of(value) != "list" {
    return []
  }
  return value
}
/**
 * __workflow_execute_enqueue_unique. Returns the queue with `node_id` added
 * via push, unless an equal entry is already queued, in which case the queue
 * is returned as-is.
 *
 * A non-list `queue` (for example nil) is treated as an empty queue, so the
 * result is always a list.
 */
fn __workflow_execute_enqueue_unique(queue, node_id) {
  // Normalize once so the membership scan and the push operate on the same
  // list; previously the scan was guarded but the push used the raw value.
  let items = __workflow_execute_list(queue)
  for queued in items {
    if queued == node_id {
      return items
    }
  }
  return items.push(node_id)
}
/**
 * __workflow_execute_pop_front. Returns the queue without its first entry.
 * Queues of length zero or one yield an empty list.
 */
fn __workflow_execute_pop_front(queue) {
  let size = len(queue)
  if size > 1 {
    return queue.slice(1, size)
  }
  return []
}
/**
 * __workflow_execute_enqueue_edges. Enqueues the `to` target of every edge
 * onto `queue`, skipping targets that are already queued and edges whose
 * `to` is not a string. A non-list `edges` value is treated as no edges.
 */
fn __workflow_execute_enqueue_edges(queue, edges) {
  var pending = queue
  for candidate in __workflow_execute_list(edges) {
    let target = candidate?.to
    // Only string targets are valid node ids; anything else is ignored.
    if type_of(target) == "string" {
      pending = __workflow_execute_enqueue_unique(pending, target)
    }
  }
  return pending
}
/**
 * workflow_execute. Drives a workflow run: prepares run state through the
 * host, then repeatedly takes the first node off the ready queue, executes
 * its stage, and records the resulting transitions. The loop ends when the
 * ready queue is empty or `state.max_steps` iterations have run; the run is
 * then finalized through the host.
 *
 * @param task      forwarded to __host_workflow_prepare_run.
 * @param graph     forwarded to __host_workflow_prepare_run; scheduling uses
 *                  the graph found on the prepared state, not this argument.
 * @param artifacts optional; nil is replaced with [] before the host call.
 * @param options   optional; nil is replaced with {}. Forwarded to the host
 *                  prepare-run and stage-prepare calls.
 * @return the value returned by __host_workflow_finalize_run for this run's
 *         state id and the remaining ready queue.
 */
pub fn workflow_execute(task, graph, artifacts = nil, options = nil) {
let opts = options ?? {}
var state = __host_workflow_prepare_run(task, graph, artifacts ?? [], opts)
let state_id = state.state_id
let workflow_graph = state.graph
var ready = __workflow_execute_list(state?.ready_nodes)
var steps = 0
// Every iteration counts as one step, including iterations that only
// re-queue a join node that is not ready yet.
while steps < state.max_steps && len(ready) > 0 {
steps = steps + 1
let current = ready[0]
ready = __workflow_execute_pop_front(ready)
let node = workflow_graph.nodes[current]
// Join nodes are gated on workflow_join_readiness; a join that is not ready
// is put back on the queue and this iteration is skipped.
// NOTE(review): if a non-ready join is the only queued node, this loop
// re-queues it on every iteration until max_steps is reached; confirm that
// is the intended behavior.
if node?.kind == "join" {
let readiness = workflow_join_readiness(
{graph: workflow_graph, node_id: current, node: node, completed_nodes: state.completed_nodes},
)
if !readiness.ready {
ready = __workflow_execute_enqueue_unique(ready, current)
continue
}
}
let plan = __host_workflow_stage_prepare(state_id, current, ready, opts)
// If the host plan already carries a "result" key, that value is used and
// nothing is executed here. Otherwise the plan flags select a map stage, an
// agent loop, or a plain llm_call. Any error thrown while running is caught
// and converted into a failed-stage record, so it is passed to the host
// instead of propagating out of this function.
let llm_result = if contains(plan.keys(), "result") {
plan.result
} else {
try {
if plan?.run_map_stage {
workflow_execute_map_stage(current, plan.node, plan.artifacts, plan.map_options)
} else if plan.run_agent_loop {
agent_loop(plan.prompt, plan.system, plan.agent_loop_options)
} else {
llm_call(plan.prompt, plan.system, plan.llm_options)
}
} catch (e) {
{__workflow_stage_error: true, status: "failed", text: "", visible_text: "", error: to_string(e)}
}
}
let executed = __host_workflow_stage_complete(state_id, current, llm_result)
state = executed.state
let edges = workflow_next_edges({graph: workflow_graph, current: current, branch: executed.branch})
// Start from the ready nodes on the post-stage state and add the `to`
// targets of the edges returned by workflow_next_edges.
ready = __workflow_execute_enqueue_edges(__workflow_execute_list(state?.ready_nodes), edges)
// Record the transitions with the host, then re-read the ready queue from
// the state it returns.
state = __host_workflow_record_transitions(state_id, ready, executed.stage, edges)
ready = __workflow_execute_list(state?.ready_nodes)
}
return __host_workflow_finalize_run(state_id, ready)
}