harn-stdlib 0.8.2

Embedded Harn standard library source catalog
Documentation
// @harn-entrypoint-category workflow.stdlib
import { agent_loop } from "std/agent/loop"
import { workflow_execute_map_stage } from "std/workflow/map"
import "std/workflow/prompts"
import { workflow_join_readiness, workflow_next_edges } from "std/workflow/schedule"

/**
 * Coerces `value` to a list: lists pass through unchanged, and any other
 * value (including nil) becomes an empty list.
 */
fn __workflow_execute_list(value) {
  if type_of(value) != "list" {
    return []
  }
  return value
}

/**
 * Adds `node_id` to the back of `queue` unless an equal entry is already
 * queued.
 *
 * `queue` is normalised with `__workflow_execute_list` first, so a non-list
 * value (for example nil) is treated as an empty queue for both the duplicate
 * scan and the append. Returns the normalised queue when `node_id` is already
 * present, otherwise the result of pushing `node_id` onto it.
 */
fn __workflow_execute_enqueue_unique(queue, node_id) {
  // Normalise once so the scan and the push operate on the same list value.
  let items = __workflow_execute_list(queue)
  for queued in items {
    if queued == node_id {
      return items
    }
  }
  return items.push(node_id)
}

/**
 * Returns `queue` without its first element. A queue holding zero or one
 * element yields an empty list.
 */
fn __workflow_execute_pop_front(queue) {
  let size = len(queue)
  if size > 1 {
    return queue.slice(1, size)
  }
  return []
}

/**
 * Enqueues the `to` target of every edge in `edges` onto `queue`, skipping
 * targets that are already queued. Edges whose `to` is not a string are
 * ignored, and a non-list `edges` value is treated as having no edges.
 * Returns the resulting queue.
 */
fn __workflow_execute_enqueue_edges(queue, edges) {
  var pending = queue
  for candidate in __workflow_execute_list(edges) {
    let target = candidate?.to
    if type_of(target) == "string" {
      pending = __workflow_execute_enqueue_unique(pending, target)
    }
  }
  return pending
}

/**
 * Runs a workflow graph by repeatedly taking the node at the front of the
 * ready queue and executing it as a stage.
 *
 * The run is set up via `__host_workflow_prepare_run`, with `artifacts`
 * defaulting to `[]` and `options` to `{}`. The loop continues while the step
 * counter is below `state.max_steps` and the ready queue is non-empty. For
 * each node it:
 *   1. defers `join` nodes that `workflow_join_readiness` reports as not
 *      ready by re-queueing them at the back;
 *   2. asks the host for a stage plan and either uses the plan's `result` or
 *      runs a map stage, an agent loop, or a single `llm_call`;
 *   3. hands the outcome to `__host_workflow_stage_complete`, resolves the
 *      outgoing edges with `workflow_next_edges`, and records the
 *      transitions with the host.
 *
 * Returns whatever `__host_workflow_finalize_run` returns for the run's
 * state id and the remaining ready queue.
 */
pub fn workflow_execute(task, graph, artifacts = nil, options = nil) {
  let opts = options ?? {}
  var state = __host_workflow_prepare_run(task, graph, artifacts ?? [], opts)
  // state_id and the graph are captured once; `state` itself is replaced on
  // every iteration below.
  let state_id = state.state_id
  let workflow_graph = state.graph
  var ready = __workflow_execute_list(state?.ready_nodes)
  var steps = 0
  // `state.max_steps` is read from the most recent state on each check.
  while steps < state.max_steps && len(ready) > 0 {
    steps = steps + 1
    // Take the node at the front of the queue.
    let current = ready[0]
    ready = __workflow_execute_pop_front(ready)
    let node = workflow_graph.nodes[current]
    if node?.kind == "join" {
      let readiness = workflow_join_readiness(
        {graph: workflow_graph, node_id: current, node: node, completed_nodes: state.completed_nodes},
      )
      if !readiness.ready {
        // Not ready yet: move the join to the back of the queue and retry
        // later. The step counter has already been incremented for this pass.
        // NOTE(review): if no other node is queued, this appears to retry the
        // same join until max_steps is reached; confirm that is intended.
        ready = __workflow_execute_enqueue_unique(ready, current)
        continue
      }
    }
    let plan = __host_workflow_stage_prepare(state_id, current, ready, opts)
    // A plan that already has a `result` key is used as-is. Otherwise the
    // stage is run here, and any thrown error is converted into a
    // failed-stage record instead of propagating out of the loop.
    let llm_result = if contains(plan.keys(), "result") {
      plan.result
    } else {
      try {
        if plan?.run_map_stage {
          workflow_execute_map_stage(current, plan.node, plan.artifacts, plan.map_options)
        } else {
          if plan.run_agent_loop {
            agent_loop(plan.prompt, plan.system, plan.agent_loop_options)
          } else {
            llm_call(plan.prompt, plan.system, plan.llm_options)
          }
        }
      } catch (e) {
        {__workflow_stage_error: true, status: "failed", text: "", visible_text: "", error: to_string(e)}
      }
    }
    let executed = __host_workflow_stage_complete(state_id, current, llm_result)
    state = executed.state
    // Follow the edges selected by the stage's branch, adding their targets
    // to the ready list carried by the post-stage state.
    let edges = workflow_next_edges({graph: workflow_graph, current: current, branch: executed.branch})
    ready = __workflow_execute_enqueue_edges(__workflow_execute_list(state?.ready_nodes), edges)
    state = __host_workflow_record_transitions(state_id, ready, executed.stage, edges)
    // The state returned by the host supplies the ready queue for the next
    // iteration.
    ready = __workflow_execute_list(state?.ready_nodes)
  }
  return __host_workflow_finalize_run(state_id, ready)
}