import { workflow_select_stage_artifacts } from "std/workflow/context"
fn __map_list(value) {
if type_of(value) == "list" {
return value
}
return []
}
fn __map_string(value, fallback = "") {
if type_of(value) == "string" {
return value
}
return fallback
}
fn __map_int(value, fallback = 0) {
if type_of(value) == "int" {
return value
}
return fallback
}
fn __map_count(value) {
if type_of(value) == "int" {
return max(value, 0)
}
if type_of(value) == "list" {
return len(value)
}
return 0
}
fn __map_take(values, max_items) {
if type_of(max_items) != "int" {
return values
}
return values.take(max(max_items, 0))
}
fn __map_filter_item_kind(artifacts, item_kind) {
if type_of(item_kind) != "string" || trim(item_kind) == "" {
return artifacts
}
var out = []
for artifact in artifacts {
if artifact?.kind == item_kind {
out = out.push(artifact)
}
}
return out
}
fn __map_artifact_ids(artifacts) {
var ids = []
for artifact in artifacts {
if type_of(artifact?.id) == "string" {
ids = ids.push(artifact.id)
}
}
return ids
}
fn __map_item_lineage(items) {
var ids = []
for item in items {
if item?.kind == "artifact" && type_of(item?.artifact?.id) == "string" {
ids = ids.push(item.artifact.id)
}
}
return ids
}
fn __map_item_index(item) {
if type_of(item?.index) == "int" {
return item.index
}
return 0
}
fn __map_join_target(plan) {
let total = len(__map_list(plan?.items))
if total == 0 {
return 0
}
if type_of(plan?.join_target) == "int" {
return min(total, max(plan.join_target, 1))
}
return total
}
fn __map_execute_branch(node_id, plan, opts, item) {
let branch_artifact = __host_workflow_map_branch_artifact(node_id, item, __map_list(plan?.lineage))
return __host_workflow_map_execute_branch(node_id, plan, item, branch_artifact, opts ?? {})
}
fn __map_collect_branch_outcomes(branches, join_target) {
var completed = []
var failures = []
var produced = []
if join_target == 0 {
return {completed: completed, failures: failures, produced: produced}
}
var observed = 0
for result in branches {
observed = observed + 1
if result?.ok {
let branch = result.branch
if branch?.status == "completed" && branch?.error == nil {
let artifacts = __map_list(branch?.artifacts)
for artifact in artifacts {
produced = produced.push(artifact)
}
completed = completed
.push(
{
index: __map_item_index(branch),
status: branch.status,
result: branch?.result,
artifact_count: len(artifacts),
},
)
} else {
failures = failures
.push(
{index: __map_item_index(branch), status: branch?.status ?? "failed", error: branch?.error},
)
}
} else {
failures = failures.push({status: "failed", error: result?.error})
}
if observed >= join_target {
break
}
}
return {completed: completed, failures: failures, produced: produced}
}
fn __map_artifact_items(artifacts) {
var items = []
for (index, artifact) in iter(artifacts).enumerate() {
items = items.push({kind: "artifact", index: index, artifact: artifact})
}
return items
}
fn __map_value_items(values, artifact_kind) {
var items = []
for (index, value) in iter(values).enumerate() {
items = items.push({kind: "value", index: index, value: value, artifact_kind: artifact_kind})
}
return items
}
fn __map_has_tool_names(value) {
if type_of(value) == "list" {
for item in value {
if type_of(item?.name) == "string" && trim(item.name) != "" {
return true
}
}
return false
}
if type_of(value) == "dict" {
if value?._type == "tool_registry" {
return __map_has_tool_names(value?.tools)
}
return type_of(value?.name) == "string" && trim(value.name) != ""
}
return false
}
fn __map_has_model_policy(policy) {
if type_of(policy) != "dict" {
return false
}
for key in [
"provider",
"model",
"model_tier",
"temperature",
"max_tokens",
"max_iterations",
"max_nudges",
"nudge",
"tool_examples",
"stop_after_successful_tools",
"require_successful_tools",
"turn_policy",
"tool_format",
] {
if policy[key] != nil {
return true
}
}
return false
}
fn __map_output_kind(node) {
let explicit = node?.map_policy?.output_kind
if type_of(explicit) == "string" && trim(explicit) != "" {
return explicit
}
let output_kinds = __map_list(node?.output_contract?.output_kinds)
if len(output_kinds) > 0 && type_of(output_kinds[0]) == "string" && trim(output_kinds[0]) != "" {
return output_kinds[0]
}
return "artifact"
}
/** workflow_map_stage_plan. */
pub fn workflow_map_stage_plan(config = nil) {
let node = config?.node ?? {}
let output_kind = __map_output_kind(node)
var runs_stage = false
if node?.mode != nil || node?.prompt != nil || node?.system != nil {
runs_stage = true
}
if __map_has_tool_names(node?.tools) || __map_has_model_policy(node?.model_policy) {
runs_stage = true
}
if !runs_stage {
return {runs_stage: false, output_kind: output_kind, stage_node: nil}
}
let output_contract = node?.output_contract ?? {} + {output_kinds: [output_kind]}
return {
runs_stage: true,
output_kind: output_kind,
stage_node: node + {kind: "stage", map_policy: {}, join_policy: {}, output_contract: output_contract},
}
}
/** workflow_map_work_items. */
pub fn workflow_map_work_items(config = nil) {
let node = config?.node ?? {}
let map_policy = node?.map_policy ?? {}
let selected = workflow_select_stage_artifacts(
{
artifacts: __map_list(config?.artifacts),
context_policy: node?.context_policy ?? {},
input_contract: node?.input_contract ?? {},
},
).artifacts
let item_kind = map_policy?.item_artifact_kind
let max_items = map_policy?.max_items
let inputs = __map_take(__map_filter_item_kind(selected, item_kind), max_items)
let selected_artifact_ids = __map_artifact_ids(selected)
let explicit_items = __map_take(__map_list(map_policy?.items), max_items)
if len(explicit_items) > 0 {
return {
items: __map_value_items(explicit_items, __map_string(item_kind, "artifact")),
selected_artifact_ids: selected_artifact_ids,
}
}
return {items: __map_artifact_items(inputs), selected_artifact_ids: __map_artifact_ids(inputs)}
}
/** workflow_map_execution_plan. */
pub fn workflow_map_execution_plan(config = nil) {
let node = config?.node ?? {}
let stage_plan = workflow_map_stage_plan({node: node})
let work = workflow_map_work_items({node: node, artifacts: config?.artifacts ?? []})
let items = __map_list(work?.items)
let strategy = __map_string(node?.join_policy?.strategy, "all")
return {
items: items,
total_items: len(items),
strategy: if trim(strategy) == "" {
"all"
} else {
strategy
},
join_target: workflow_map_join_target({join_policy: node?.join_policy ?? {}, total: len(items)}),
max_concurrent: node?.map_policy?.max_concurrent,
stage_node: stage_plan.stage_node,
output_kind: stage_plan.output_kind,
lineage: __map_item_lineage(items),
}
}
/** workflow_map_join_target. */
pub fn workflow_map_join_target(config = nil) {
let total = max(__map_int(config?.total, 0), 0)
if total == 0 {
return 0
}
let join_policy = config?.join_policy ?? {}
let strategy = __map_string(join_policy?.strategy, "all")
if strategy == "first" {
return min(total, 1)
}
if strategy == "quorum" {
return min(total, max(__map_int(join_policy?.min_completed, 1), 1))
}
return total
}
/** workflow_map_finalize. */
pub fn workflow_map_finalize(config = nil) {
let strategy = __map_string(config?.strategy, "all")
let total_items = max(__map_int(config?.total_items, 0), 0)
let produced_count = __map_count(config?.produced_count ?? config?.produced)
let completed = __map_list(config?.completed)
let failures = __map_list(config?.failures)
let status = if len(failures) == 0 {
"completed"
} else {
if produced_count == 0 {
"failed"
} else {
"partial"
}
}
let text = if status == "failed" {
"map failed after " + to_string(len(failures)) + " branch failures"
} else {
"mapped " + to_string(produced_count) + " of " + to_string(total_items) + " items"
}
let branch = if status == "failed" {
"failed"
} else {
"mapped"
}
return {
result: {status: status, text: text, join_strategy: strategy, completed: completed, failures: failures},
outcome: "mapped",
branch: branch,
}
}
/** workflow_execute_map_stage. */
pub fn workflow_execute_map_stage(node_id, node, artifacts = nil, opts = nil) {
let plan = __host_workflow_map_plan(node, __map_list(artifacts))
let branches = parallel each plan.items with { max_concurrent: plan.max_concurrent } { item ->
try {
{ok: true, branch: __map_execute_branch(node_id, plan, opts, item)}
} catch (e) {
{ok: false, error: to_string(e)}
}
} as stream
let collected = __map_collect_branch_outcomes(branches, __map_join_target(plan))
let finalized = __host_workflow_map_finalize(
plan.strategy,
plan.total_items,
collected.completed,
collected.failures,
collected.produced,
)
return {
result: finalized.result,
artifacts: collected.produced,
outcome: finalized.outcome,
branch: finalized.branch,
}
}