harn-stdlib 0.8.11

Embedded Harn standard library source catalog
Documentation
import { workflow_select_stage_artifacts } from "std/workflow/context"

/** Returns `value` unchanged when it is a list; any other input yields `[]`. */
fn __map_list(value) {
  if type_of(value) != "list" {
    return []
  }
  return value
}

/** Returns `value` when it is a string; otherwise returns `fallback` (default ""). */
fn __map_string(value, fallback = "") {
  let resolved = if type_of(value) == "string" {
    value
  } else {
    fallback
  }
  return resolved
}

/** Returns `value` when it is an int; otherwise returns `fallback` (default 0). */
fn __map_int(value, fallback = 0) {
  if type_of(value) != "int" {
    return fallback
  }
  return value
}

/**
 * Normalizes a count-like input to a non-negative int.
 * An int is clamped to at least 0, a list contributes its length, and
 * anything else counts as 0.
 */
fn __map_count(value) {
  let shape = type_of(value)
  if shape == "int" {
    return max(value, 0)
  }
  if shape == "list" {
    return len(value)
  }
  return 0
}

/**
 * Limits `values` to its first `max_items` entries.
 * When `max_items` is not an int, `values` is returned untouched; a negative
 * limit is treated as 0.
 */
fn __map_take(values, max_items) {
  if type_of(max_items) == "int" {
    let limit = max(max_items, 0)
    return values.take(limit)
  }
  return values
}

/**
 * Keeps only the artifacts whose `kind` equals `item_kind`.
 * When `item_kind` is not a string, or is blank after trimming, no filtering
 * is applied and `artifacts` is returned as given.
 */
fn __map_filter_item_kind(artifacts, item_kind) {
  let has_kind = type_of(item_kind) == "string" && trim(item_kind) != ""
  if has_kind {
    var matching = []
    for candidate in artifacts {
      let keep = candidate?.kind == item_kind
      if keep {
        matching = matching.push(candidate)
      }
    }
    return matching
  }
  return artifacts
}

/** Collects, in order, the `id` of every artifact whose `id` is a string. */
fn __map_artifact_ids(artifacts) {
  var collected = []
  for entry in artifacts {
    let id = entry?.id
    if type_of(id) == "string" {
      collected = collected.push(id)
    }
  }
  return collected
}

/**
 * Collects the artifact ids behind a list of work items.
 * Only items with `kind == "artifact"` and a string `artifact.id` contribute;
 * all other items are skipped.
 */
fn __map_item_lineage(items) {
  var lineage = []
  for entry in items {
    if entry?.kind == "artifact" {
      let artifact_id = entry?.artifact?.id
      if type_of(artifact_id) == "string" {
        lineage = lineage.push(artifact_id)
      }
    }
  }
  return lineage
}

/** Returns `item.index` when it is an int; otherwise 0. */
fn __map_item_index(item) {
  let index = item?.index
  if type_of(index) != "int" {
    return 0
  }
  return index
}

/**
 * Computes how many branch results must be observed for `plan`.
 * Returns 0 when the plan has no items. An int `plan.join_target` is clamped
 * into the range 1..item count; without one, every item is required.
 */
fn __map_join_target(plan) {
  let item_count = len(__map_list(plan?.items))
  if item_count == 0 {
    return 0
  }
  let requested = plan?.join_target
  if type_of(requested) != "int" {
    return item_count
  }
  return min(item_count, max(requested, 1))
}

/**
 * Executes one map branch for `item` through the host.
 * First asks the host for the branch artifact (passing the plan's lineage, or
 * `[]` when it is not a list), then hands that artifact to the host branch
 * executor together with `opts` (or `{}` when `opts` is nil).
 * The shape of the returned branch record is defined by the host call.
 */
fn __map_execute_branch(node_id, plan, opts, item) {
  let lineage = __map_list(plan?.lineage)
  let branch_options = opts ?? {}
  let branch_artifact = __host_workflow_map_branch_artifact(node_id, item, lineage)
  return __host_workflow_map_execute_branch(node_id, plan, item, branch_artifact, branch_options)
}

/**
 * Drains branch results until `join_target` of them have been observed and
 * sorts them into completed branches, failures, and produced artifacts.
 *
 * A result counts as completed only when it is `ok`, its branch status is
 * "completed", and the branch carries no error. Every observed result counts
 * toward `join_target`, whether it succeeded or failed. A `join_target` of 0
 * returns empty collections without reading `branches`.
 *
 * Returns `{completed, failures, produced}`.
 */
fn __map_collect_branch_outcomes(branches, join_target) {
  var completed = []
  var failures = []
  var produced = []
  var observed = 0
  if join_target != 0 {
    for result in branches {
      observed = observed + 1
      if result?.ok {
        let branch = result.branch
        let succeeded = branch?.status == "completed" && branch?.error == nil
        if succeeded {
          let branch_artifacts = __map_list(branch?.artifacts)
          for produced_artifact in branch_artifacts {
            produced = produced.push(produced_artifact)
          }
          let summary = {
            index: __map_item_index(branch),
            status: branch.status,
            result: branch?.result,
            artifact_count: len(branch_artifacts),
          }
          completed = completed.push(summary)
        } else {
          let failure = {index: __map_item_index(branch), status: branch?.status ?? "failed", error: branch?.error}
          failures = failures.push(failure)
        }
      } else {
        // The branch threw before yielding a record, so no item index is known.
        failures = failures.push({status: "failed", error: result?.error})
      }
      // Stop reading once enough results have been seen.
      if observed >= join_target {
        break
      }
    }
  }
  return {completed: completed, failures: failures, produced: produced}
}

/** Wraps each artifact as a `{kind: "artifact", index, artifact}` work item, indexed from 0. */
fn __map_artifact_items(artifacts) {
  var wrapped = []
  for (position, entry) in iter(artifacts).enumerate() {
    let item = {kind: "artifact", index: position, artifact: entry}
    wrapped = wrapped.push(item)
  }
  return wrapped
}

/**
 * Wraps each plain value as a `{kind: "value", index, value, artifact_kind}`
 * work item, indexed from 0. `artifact_kind` is copied onto every item.
 */
fn __map_value_items(values, artifact_kind) {
  var wrapped = []
  for (position, entry) in iter(values).enumerate() {
    let item = {kind: "value", index: position, value: entry, artifact_kind: artifact_kind}
    wrapped = wrapped.push(item)
  }
  return wrapped
}

/**
 * Reports whether `value` names at least one tool.
 * - list: true when any entry has a non-blank string `name`.
 * - dict with `_type == "tool_registry"`: the check is applied to its `tools`.
 * - any other dict: true when it has a non-blank string `name` itself.
 * - anything else: false.
 */
fn __map_has_tool_names(value) {
  let shape = type_of(value)
  if shape == "list" {
    var found = false
    for entry in value {
      if type_of(entry?.name) == "string" && trim(entry.name) != "" {
        found = true
        break
      }
    }
    return found
  }
  if shape != "dict" {
    return false
  }
  if value?._type == "tool_registry" {
    return __map_has_tool_names(value?.tools)
  }
  return type_of(value?.name) == "string" && trim(value.name) != ""
}

/**
 * Reports whether `policy` is a dict that sets at least one of the recognized
 * model-policy keys to a non-nil value. Non-dict input is always false.
 */
fn __map_has_model_policy(policy) {
  if type_of(policy) == "dict" {
    let policy_keys = [
      "provider",
      "model",
      "model_tier",
      "temperature",
      "max_tokens",
      "max_iterations",
      "iteration_budget",
      "max_nudges",
      "nudge",
      "tool_examples",
      "stop_after_successful_tools",
      "require_successful_tools",
      "turn_policy",
      "tool_format",
    ]
    for name in policy_keys {
      if policy[name] != nil {
        return true
      }
    }
  }
  return false
}

/**
 * Picks the artifact kind a map node's branches produce.
 * Order of preference: a non-blank `map_policy.output_kind`, then a non-blank
 * string in the first slot of `output_contract.output_kinds`, then "artifact".
 */
fn __map_output_kind(node) {
  let from_policy = node?.map_policy?.output_kind
  if type_of(from_policy) == "string" && trim(from_policy) != "" {
    return from_policy
  }
  let declared = __map_list(node?.output_contract?.output_kinds)
  if len(declared) == 0 {
    return "artifact"
  }
  let first_kind = declared[0]
  if type_of(first_kind) == "string" && trim(first_kind) != "" {
    return first_kind
  }
  return "artifact"
}

/**
 * Decides whether a map node runs a stage for each item and, if so, builds the
 * stage node to run.
 *
 * Reads `config.node` (treated as `{}` when absent). A stage is planned when
 * the node sets `mode`, `prompt`, or `system`, names at least one tool, or
 * sets any recognized model-policy key.
 *
 * Returns `{runs_stage, output_kind, stage_node}`. `stage_node` is nil when no
 * stage runs. Otherwise it is the node merged with `kind: "stage"`, empty
 * `map_policy` and `join_policy`, and an output contract whose `output_kinds`
 * is set to `[output_kind]`.
 */
pub fn workflow_map_stage_plan(config = nil) {
  let node = config?.node ?? {}
  let output_kind = __map_output_kind(node)
  let has_stage_fields = node?.mode != nil || node?.prompt != nil || node?.system != nil
  let runs_stage = has_stage_fields || __map_has_tool_names(node?.tools) || __map_has_model_policy(node?.model_policy)
  if !runs_stage {
    return {runs_stage: false, output_kind: output_kind, stage_node: nil}
  }
  // Resolve the fallback first, then merge. Two steps keep the `output_kinds`
  // override applied to an existing contract as well as to the `{}` fallback,
  // independent of how `??` and `+` bind relative to each other.
  // Assumes dict `+` lets right-hand keys win, as the stage_node merge below does.
  let base_contract = node?.output_contract ?? {}
  let output_contract = base_contract + {output_kinds: [output_kind]}
  return {
    runs_stage: true,
    output_kind: output_kind,
    stage_node: node + {kind: "stage", map_policy: {}, join_policy: {}, output_contract: output_contract},
  }
}

/**
 * Builds the list of work items a map node fans out over.
 *
 * Reads `config.node` and `config.artifacts`. Artifacts are first selected via
 * `workflow_select_stage_artifacts` using the node's context policy and input
 * contract. `map_policy.max_items`, when an int, caps both explicit items and
 * artifact inputs.
 *
 * - If `map_policy.items` yields any explicit values, those become "value"
 *   items tagged with `map_policy.item_artifact_kind` (or "artifact"), and
 *   `selected_artifact_ids` lists the ids of all selected artifacts.
 * - Otherwise the selected artifacts, filtered by `item_artifact_kind`, become
 *   "artifact" items and `selected_artifact_ids` lists just those inputs.
 *
 * Returns `{items, selected_artifact_ids}`.
 */
pub fn workflow_map_work_items(config = nil) {
  let node = config?.node ?? {}
  let policy = node?.map_policy ?? {}
  let request = {
    artifacts: __map_list(config?.artifacts),
    context_policy: node?.context_policy ?? {},
    input_contract: node?.input_contract ?? {},
  }
  let selected = workflow_select_stage_artifacts(request).artifacts
  let item_kind = policy?.item_artifact_kind
  let limit = policy?.max_items
  let filtered = __map_filter_item_kind(selected, item_kind)
  let inputs = __map_take(filtered, limit)
  let all_selected_ids = __map_artifact_ids(selected)
  let explicit_items = __map_take(__map_list(policy?.items), limit)
  if len(explicit_items) == 0 {
    return {items: __map_artifact_items(inputs), selected_artifact_ids: __map_artifact_ids(inputs)}
  }
  let value_kind = __map_string(item_kind, "artifact")
  return {items: __map_value_items(explicit_items, value_kind), selected_artifact_ids: all_selected_ids}
}

/**
 * Assembles the full execution plan for a map node.
 *
 * Combines the stage plan and the work items for `config.node` and
 * `config.artifacts` into one dict with: `items`, `total_items`, `strategy`
 * (the join policy's strategy, or "all" when missing or blank), `join_target`,
 * `max_concurrent` (taken as-is from `map_policy`), `stage_node`,
 * `output_kind`, and `lineage` (artifact ids behind the items).
 */
pub fn workflow_map_execution_plan(config = nil) {
  let node = config?.node ?? {}
  let stage_plan = workflow_map_stage_plan({node: node})
  let work = workflow_map_work_items({node: node, artifacts: config?.artifacts ?? []})
  let items = __map_list(work?.items)
  let item_count = len(items)
  let join_policy = node?.join_policy
  let raw_strategy = __map_string(join_policy?.strategy, "all")
  var strategy = raw_strategy
  if trim(raw_strategy) == "" {
    strategy = "all"
  }
  let join_target = workflow_map_join_target({join_policy: join_policy ?? {}, total: item_count})
  return {
    items: items,
    total_items: item_count,
    strategy: strategy,
    join_target: join_target,
    max_concurrent: node?.map_policy?.max_concurrent,
    stage_node: stage_plan.stage_node,
    output_kind: stage_plan.output_kind,
    lineage: __map_item_lineage(items),
  }
}

/**
 * Computes how many branch results a join must observe.
 *
 * Reads `config.total` (non-int or negative counts as 0) and
 * `config.join_policy`. Returns 0 when there are no items. Otherwise:
 * - strategy "first": 1
 * - strategy "quorum": `min_completed` (at least 1), capped at `total`
 * - any other strategy, including the default "all": `total`
 */
pub fn workflow_map_join_target(config = nil) {
  let total = max(__map_int(config?.total, 0), 0)
  if total == 0 {
    return 0
  }
  let join_policy = config?.join_policy ?? {}
  let strategy = __map_string(join_policy?.strategy, "all")
  if strategy == "quorum" {
    let required = max(__map_int(join_policy?.min_completed, 1), 1)
    return min(total, required)
  }
  if strategy == "first" {
    return min(total, 1)
  }
  return total
}

/**
 * Summarizes a finished map run.
 *
 * Reads `strategy`, `total_items`, `completed`, `failures`, and either
 * `produced_count` or `produced` (an int or a list) from `config`.
 *
 * Status is "completed" with no failures, "failed" when there are failures
 * and nothing was produced, and "partial" otherwise.
 *
 * Returns `{result, outcome, branch}` where `result` carries the status, a
 * human-readable `text`, the join strategy, and the completed/failure lists.
 * `outcome` is always "mapped"; `branch` is "failed" only for a failed status.
 */
pub fn workflow_map_finalize(config = nil) {
  let strategy = __map_string(config?.strategy, "all")
  let total_items = max(__map_int(config?.total_items, 0), 0)
  let produced_count = __map_count(config?.produced_count ?? config?.produced)
  let completed = __map_list(config?.completed)
  let failures = __map_list(config?.failures)
  var status = "completed"
  if len(failures) > 0 {
    if produced_count == 0 {
      status = "failed"
    } else {
      status = "partial"
    }
  }
  var text = "mapped " + to_string(produced_count) + " of " + to_string(total_items) + " items"
  var branch = "mapped"
  if status == "failed" {
    text = "map failed after " + to_string(len(failures)) + " branch failures"
    branch = "failed"
  }
  let result = {status: status, text: text, join_strategy: strategy, completed: completed, failures: failures}
  return {result: result, outcome: "mapped", branch: branch}
}

/**
 * Runs a map node end to end: plan, fan out over the plan's items, collect
 * branch outcomes up to the join target, and finalize.
 *
 * Parameters:
 * - `node_id`: forwarded to each branch execution.
 * - `node`: the map node handed to the host planner.
 * - `artifacts`: input artifacts; anything that is not a list is treated as [].
 * - `opts`: forwarded to each branch execution (nil becomes `{}` there).
 *
 * Returns `{result, artifacts, outcome, branch}`. `artifacts` holds every
 * artifact produced by the completed branches that were observed; the other
 * fields come from the host finalize call.
 *
 * NOTE(review): the plan and finalize shapes come from `__host_*` calls that
 * are not defined in this file; they presumably mirror
 * `workflow_map_execution_plan` and `workflow_map_finalize` — confirm.
 */
pub fn workflow_execute_map_stage(node_id, node, artifacts = nil, opts = nil) {
  let plan = __host_workflow_map_plan(node, __map_list(artifacts))
  // Each branch body is wrapped in try/catch, so a thrown error is reported as
  // an `{ok: false, error}` record and a normal return as `{ok: true, branch}`.
  let branches = parallel each plan.items with { max_concurrent: plan.max_concurrent } { item ->
    try {
      {ok: true, branch: __map_execute_branch(node_id, plan, opts, item)}
    } catch (e) {
      {ok: false, error: to_string(e)}
    }
  } as stream
  // The collector stops reading `branches` once the join target is reached.
  let collected = __map_collect_branch_outcomes(branches, __map_join_target(plan))
  let finalized = __host_workflow_map_finalize(
    plan.strategy,
    plan.total_items,
    collected.completed,
    collected.failures,
    collected.produced,
  )
  return {
    result: finalized.result,
    artifacts: collected.produced,
    outcome: finalized.outcome,
    branch: finalized.branch,
  }
}