Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10    ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12    TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
/// A single node of a workflow graph.
///
/// Every field is optional on input (`#[serde(default)]`); missing values are
/// filled in by `normalize_workflow_value`. The `raw_*` fields are never
/// (de)serialized and only carry the original VM values alongside the parsed
/// JSON form.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; normalization sets it to the node's key in the graph
    /// when it is absent.
    pub id: Option<String>,
    /// Node kind. Kinds handled in this file include `"stage"` (the
    /// normalization default), `"verify"`, `"condition"`, `"fork"`, `"join"`,
    /// `"map"`, `"reduce"` and `"escalation"`.
    pub kind: String,
    /// Execution mode; `"agent"` routes stage execution through the agent loop.
    pub mode: Option<String>,
    /// Optional prompt text for the node.
    /// NOTE(review): no reader of this field is visible in this part of the
    /// file — confirm where it is consumed.
    pub prompt: Option<String>,
    /// Optional system prompt handed to the LLM call for this stage.
    pub system: Option<String>,
    /// Label placed before the task text when artifact context is rendered
    /// into the prompt; falls back to `"Task"`.
    pub task_label: Option<String>,
    /// Sentinel string for the stage; in the shorthand workflow form it is
    /// inherited from the top-level `done_sentinel` when unset.
    pub done_sentinel: Option<String>,
    /// JSON form of the tool declarations: an array of objects with a `name`,
    /// a single such object, or a `tool_registry` object wrapping `tools`.
    pub tools: serde_json::Value,
    /// Provider / model / sampling settings for the stage.
    pub model_policy: ModelPolicy,
    /// Transcript handling (input/output policy, auto-compaction settings).
    pub transcript_policy: TranscriptPolicy,
    /// Artifact selection and rendering policy for the stage's context.
    pub context_policy: ContextPolicy,
    /// Retry settings; normalization raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    /// Capabilities requested by the node; validated against the ceiling
    /// policy and intersected with the tool-derived policy at execution time.
    pub capability_policy: CapabilityPolicy,
    /// Tool approval policy for the node.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Constraints on the artifacts/transcript the stage accepts.
    pub input_contract: StageContract,
    /// Declares the artifact kinds the stage produces; normalization picks a
    /// default `output_kinds` entry based on `kind`.
    pub output_contract: StageContract,
    /// Branching behaviour of the node.
    pub branch_semantics: BranchSemantics,
    /// Settings for `"map"` nodes.
    pub map_policy: MapPolicy,
    /// Settings for `"join"` nodes; `strategy` defaults to `"all"`.
    pub join_policy: JoinPolicy,
    /// Settings for `"reduce"` nodes; `strategy` defaults to `"concat"`.
    pub reduce_policy: ReducePolicy,
    /// Settings for escalation handling.
    pub escalation_policy: EscalationPolicy,
    /// Verification spec. For `"verify"` nodes an object whose `command` key,
    /// when non-blank, is executed as a shell command; the shorthand form also
    /// collects `assert_text`, `expect_status` and `expect_text` here.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Optional per-stage timeout in milliseconds.  When set, stage execution
    /// is wrapped in `tokio::time::timeout` and will fail with a timeout error
    /// if it exceeds the given duration.
    #[serde(default)]
    pub timeout_ms: Option<u64>,
    /// Free-form metadata attached to the node.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue as supplied by the caller; preferred over the JSON
    /// `tools` field when building LLM options. Skipped by serde.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw transcript_policy VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
65
66impl PartialEq for WorkflowNode {
67    fn eq(&self, other: &Self) -> bool {
68        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
69    }
70}
71
72pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
73    match value {
74        serde_json::Value::Null => Vec::new(),
75        serde_json::Value::Array(items) => items
76            .iter()
77            .filter_map(|item| match item {
78                serde_json::Value::Object(map) => map
79                    .get("name")
80                    .and_then(|value| value.as_str())
81                    .filter(|name| !name.is_empty())
82                    .map(|name| name.to_string()),
83                _ => None,
84            })
85            .collect(),
86        serde_json::Value::Object(map) => {
87            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
88                return map
89                    .get("tools")
90                    .map(workflow_tool_names)
91                    .unwrap_or_default();
92            }
93            map.get("name")
94                .and_then(|value| value.as_str())
95                .filter(|name| !name.is_empty())
96                .map(|name| vec![name.to_string()])
97                .unwrap_or_default()
98        }
99        _ => Vec::new(),
100    }
101}
102
/// Returns the most severe side-effect level yielded by `levels`, or `None`
/// when the iterator is empty.
///
/// Known levels are ordered `none < read_only < workspace_write <
/// process_exec < network`; any unrecognized level ranks above all of them.
/// When several levels share the highest rank, the last one seen wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    // Known levels from least to most severe; the index is the rank.
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity = |level: &str| {
        ORDERED
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDERED.len())
    };
    // `>=` keeps the later candidate on ties.
    levels.reduce(|best, candidate| {
        if severity(&candidate) >= severity(&best) {
            candidate
        } else {
            best
        }
    })
}
116
117fn parse_tool_runtime_policy(
118    map: &serde_json::Map<String, serde_json::Value>,
119) -> ToolRuntimePolicyMetadata {
120    let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
121        return ToolRuntimePolicyMetadata::default();
122    };
123
124    let capabilities = policy
125        .get("capabilities")
126        .and_then(|value| value.as_object())
127        .map(|caps| {
128            caps.iter()
129                .map(|(capability, ops)| {
130                    let values = ops
131                        .as_array()
132                        .map(|items| {
133                            items
134                                .iter()
135                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
136                                .collect::<Vec<_>>()
137                        })
138                        .unwrap_or_default();
139                    (capability.clone(), values)
140                })
141                .collect::<BTreeMap<_, _>>()
142        })
143        .unwrap_or_default();
144
145    let path_params = policy
146        .get("path_params")
147        .and_then(|value| value.as_array())
148        .map(|items| {
149            items
150                .iter()
151                .filter_map(|item| item.as_str().map(|s| s.to_string()))
152                .collect::<Vec<_>>()
153        })
154        .unwrap_or_default();
155
156    ToolRuntimePolicyMetadata {
157        capabilities,
158        side_effect_level: policy
159            .get("side_effect_level")
160            .and_then(|value| value.as_str())
161            .map(|s| s.to_string()),
162        path_params,
163        mutation_classification: policy
164            .get("mutation_classification")
165            .and_then(|value| value.as_str())
166            .map(|s| s.to_string()),
167    }
168}
169
170pub fn workflow_tool_metadata(
171    value: &serde_json::Value,
172) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
173    match value {
174        serde_json::Value::Null => BTreeMap::new(),
175        serde_json::Value::Array(items) => items
176            .iter()
177            .filter_map(|item| match item {
178                serde_json::Value::Object(map) => map
179                    .get("name")
180                    .and_then(|value| value.as_str())
181                    .filter(|name| !name.is_empty())
182                    .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
183                _ => None,
184            })
185            .collect(),
186        serde_json::Value::Object(map) => {
187            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
188                return map
189                    .get("tools")
190                    .map(workflow_tool_metadata)
191                    .unwrap_or_default();
192            }
193            map.get("name")
194                .and_then(|value| value.as_str())
195                .filter(|name| !name.is_empty())
196                .map(|name| {
197                    let mut metadata = BTreeMap::new();
198                    metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
199                    metadata
200                })
201                .unwrap_or_default()
202        }
203        _ => BTreeMap::new(),
204    }
205}
206
207pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
208    let tools = workflow_tool_names(value);
209    let tool_metadata = workflow_tool_metadata(value);
210    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
211    for metadata in tool_metadata.values() {
212        for (capability, ops) in &metadata.capabilities {
213            let entry = capabilities.entry(capability.clone()).or_default();
214            for op in ops {
215                if !entry.contains(op) {
216                    entry.push(op.clone());
217                }
218            }
219            entry.sort();
220        }
221    }
222    let side_effect_level = max_side_effect_level(
223        tool_metadata
224            .values()
225            .filter_map(|metadata| metadata.side_effect_level.clone()),
226    );
227    CapabilityPolicy {
228        tools,
229        capabilities,
230        workspace_roots: Vec::new(),
231        side_effect_level,
232        recursion_limit: None,
233        tool_arg_constraints: Vec::new(),
234        tool_metadata,
235    }
236}
237
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name. Condition nodes are validated to have `"true"`
    /// and `"false"` branches; the shorthand form generates `"failed"` and
    /// `"retry"` branches between `verify` and `repair`.
    pub branch: Option<String>,
    /// Optional label for the edge.
    pub label: Option<String>,
}
246
/// A workflow graph: named nodes, the edges between them, and graph-level
/// policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, (de)serialized as `_type`; normalization sets it to
    /// `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph identifier; normalization generates one when empty.
    pub id: String,
    /// Optional display name.
    pub name: Option<String>,
    /// Graph version; normalization raises 0 to 1.
    pub version: usize,
    /// Id of the entry node. Validation reports an error when it is not a key
    /// of `nodes`.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Directed edges between nodes.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy, checked against the ceiling policy
    /// during validation.
    pub capability_policy: CapabilityPolicy,
    /// Graph-level tool approval policy.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Free-form metadata attached to the graph.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit entries recorded for the graph.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
263
/// One entry of a workflow graph's audit log.
///
/// NOTE(review): the code that creates these entries is not visible in this
/// part of the file; the field notes below describe the shape only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Identifier of the audit entry.
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Id of the node the operation refers to, if any.
    pub node_id: Option<String>,
    /// Timestamp string — presumably RFC 3339, given the module's
    /// `now_rfc3339` import; confirm against the writer.
    pub timestamp: String,
    /// Optional reason given for the operation.
    pub reason: Option<String>,
    /// Free-form metadata attached to the entry.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
274
/// Result of validating a workflow graph (see `validate_workflow`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// True when `errors` is empty.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings, e.g. unreachable nodes.
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the entry node.
    pub reachable_nodes: Vec<String>,
}
283
284pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
285    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
286    let dict = value.as_dict();
287    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
288    node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
289    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
290    Ok(node)
291}
292
293pub fn parse_workflow_node_json(
294    json: serde_json::Value,
295    label: &str,
296) -> Result<WorkflowNode, VmError> {
297    super::parse_json_payload(json, label)
298}
299
300pub fn parse_workflow_edge_json(
301    json: serde_json::Value,
302    label: &str,
303) -> Result<WorkflowEdge, VmError> {
304    super::parse_json_payload(json, label)
305}
306
307pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
308    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
309    let as_dict = value.as_dict().cloned().unwrap_or_default();
310
311    if graph.nodes.is_empty() {
312        for key in ["act", "verify", "repair"] {
313            if let Some(node_value) = as_dict.get(key) {
314                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
315                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
316                node.id = Some(key.to_string());
317                if node.kind.is_empty() {
318                    node.kind = if key == "verify" {
319                        "verify".to_string()
320                    } else {
321                        "stage".to_string()
322                    };
323                }
324                if node.model_policy.provider.is_none() {
325                    node.model_policy.provider = as_dict
326                        .get("provider")
327                        .map(|value| value.display())
328                        .filter(|value| !value.is_empty());
329                }
330                if node.model_policy.model.is_none() {
331                    node.model_policy.model = as_dict
332                        .get("model")
333                        .map(|value| value.display())
334                        .filter(|value| !value.is_empty());
335                }
336                if node.model_policy.model_tier.is_none() {
337                    node.model_policy.model_tier = as_dict
338                        .get("model_tier")
339                        .or_else(|| as_dict.get("tier"))
340                        .map(|value| value.display())
341                        .filter(|value| !value.is_empty());
342                }
343                if node.model_policy.temperature.is_none() {
344                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
345                        if let VmValue::Float(number) = value {
346                            Some(*number)
347                        } else {
348                            value.as_int().map(|number| number as f64)
349                        }
350                    });
351                }
352                if node.model_policy.max_tokens.is_none() {
353                    node.model_policy.max_tokens =
354                        as_dict.get("max_tokens").and_then(|value| value.as_int());
355                }
356                if node.mode.is_none() {
357                    node.mode = as_dict
358                        .get("mode")
359                        .map(|value| value.display())
360                        .filter(|value| !value.is_empty());
361                }
362                if node.done_sentinel.is_none() {
363                    node.done_sentinel = as_dict
364                        .get("done_sentinel")
365                        .map(|value| value.display())
366                        .filter(|value| !value.is_empty());
367                }
368                if key == "verify"
369                    && node.verify.is_none()
370                    && (raw_node.contains_key("assert_text")
371                        || raw_node.contains_key("command")
372                        || raw_node.contains_key("expect_status")
373                        || raw_node.contains_key("expect_text"))
374                {
375                    node.verify = Some(serde_json::json!({
376                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
377                        "command": raw_node.get("command").map(vm_value_to_json),
378                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
379                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
380                    }));
381                }
382                graph.nodes.insert(key.to_string(), node);
383            }
384        }
385        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
386            graph.entry = "act".to_string();
387        }
388        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
389            if graph.nodes.contains_key("verify") {
390                graph.edges.push(WorkflowEdge {
391                    from: "act".to_string(),
392                    to: "verify".to_string(),
393                    branch: None,
394                    label: None,
395                });
396            }
397            if graph.nodes.contains_key("repair") {
398                graph.edges.push(WorkflowEdge {
399                    from: "verify".to_string(),
400                    to: "repair".to_string(),
401                    branch: Some("failed".to_string()),
402                    label: None,
403                });
404                graph.edges.push(WorkflowEdge {
405                    from: "repair".to_string(),
406                    to: "verify".to_string(),
407                    branch: Some("retry".to_string()),
408                    label: None,
409                });
410            }
411        }
412    }
413
414    if graph.type_name.is_empty() {
415        graph.type_name = "workflow_graph".to_string();
416    }
417    if graph.id.is_empty() {
418        graph.id = new_id("workflow");
419    }
420    if graph.version == 0 {
421        graph.version = 1;
422    }
423    if graph.entry.is_empty() {
424        graph.entry = graph
425            .nodes
426            .keys()
427            .next()
428            .cloned()
429            .unwrap_or_else(|| "act".to_string());
430    }
431    for (node_id, node) in &mut graph.nodes {
432        if node.raw_tools.is_none() {
433            node.raw_tools = as_dict
434                .get("nodes")
435                .and_then(|nodes| nodes.as_dict())
436                .and_then(|nodes| nodes.get(node_id))
437                .and_then(|node_value| node_value.as_dict())
438                .and_then(|raw_node| raw_node.get("tools"))
439                .cloned();
440        }
441        if node.id.is_none() {
442            node.id = Some(node_id.clone());
443        }
444        if node.kind.is_empty() {
445            node.kind = "stage".to_string();
446        }
447        if node.join_policy.strategy.is_empty() {
448            node.join_policy.strategy = "all".to_string();
449        }
450        if node.reduce_policy.strategy.is_empty() {
451            node.reduce_policy.strategy = "concat".to_string();
452        }
453        if node.output_contract.output_kinds.is_empty() {
454            node.output_contract.output_kinds = vec![match node.kind.as_str() {
455                "verify" => "verification_result".to_string(),
456                "reduce" => node
457                    .reduce_policy
458                    .output_kind
459                    .clone()
460                    .unwrap_or_else(|| "summary".to_string()),
461                "map" => node
462                    .map_policy
463                    .output_kind
464                    .clone()
465                    .unwrap_or_else(|| "artifact".to_string()),
466                "escalation" => "plan".to_string(),
467                _ => "artifact".to_string(),
468            }];
469        }
470        if node.retry_policy.max_attempts == 0 {
471            node.retry_policy.max_attempts = 1;
472        }
473    }
474    Ok(graph)
475}
476
477pub fn validate_workflow(
478    graph: &WorkflowGraph,
479    ceiling: Option<&CapabilityPolicy>,
480) -> WorkflowValidationReport {
481    let mut errors = Vec::new();
482    let mut warnings = Vec::new();
483
484    if !graph.nodes.contains_key(&graph.entry) {
485        errors.push(format!("entry node does not exist: {}", graph.entry));
486    }
487
488    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
489    for edge in &graph.edges {
490        if !node_ids.contains(&edge.from) {
491            errors.push(format!("edge.from references unknown node: {}", edge.from));
492        }
493        if !node_ids.contains(&edge.to) {
494            errors.push(format!("edge.to references unknown node: {}", edge.to));
495        }
496    }
497
498    let reachable_nodes = reachable_nodes(graph);
499    for node_id in &node_ids {
500        if !reachable_nodes.contains(node_id) {
501            warnings.push(format!("node is unreachable: {node_id}"));
502        }
503    }
504
505    for (node_id, node) in &graph.nodes {
506        let incoming = graph
507            .edges
508            .iter()
509            .filter(|edge| edge.to == *node_id)
510            .count();
511        let outgoing: Vec<&WorkflowEdge> = graph
512            .edges
513            .iter()
514            .filter(|edge| edge.from == *node_id)
515            .collect();
516        if let Some(min_inputs) = node.input_contract.min_inputs {
517            if let Some(max_inputs) = node.input_contract.max_inputs {
518                if min_inputs > max_inputs {
519                    errors.push(format!(
520                        "node {node_id}: input contract min_inputs exceeds max_inputs"
521                    ));
522                }
523            }
524        }
525        match node.kind.as_str() {
526            "condition" => {
527                let has_true = outgoing
528                    .iter()
529                    .any(|edge| edge.branch.as_deref() == Some("true"));
530                let has_false = outgoing
531                    .iter()
532                    .any(|edge| edge.branch.as_deref() == Some("false"));
533                if !has_true || !has_false {
534                    errors.push(format!(
535                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
536                    ));
537                }
538            }
539            "fork" => {
540                if outgoing.len() < 2 {
541                    errors.push(format!(
542                        "node {node_id}: fork nodes require at least two outgoing edges"
543                    ));
544                }
545            }
546            "join" => {
547                if incoming < 2 {
548                    warnings.push(format!(
549                        "node {node_id}: join node has fewer than two incoming edges"
550                    ));
551                }
552            }
553            "map" => {
554                if node.map_policy.items.is_empty()
555                    && node.map_policy.item_artifact_kind.is_none()
556                    && node.input_contract.input_kinds.is_empty()
557                {
558                    errors.push(format!(
559                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
560                    ));
561                }
562            }
563            "reduce" => {
564                if node.input_contract.input_kinds.is_empty() {
565                    warnings.push(format!(
566                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
567                    ));
568                }
569            }
570            _ => {}
571        }
572    }
573
574    if let Some(ceiling) = ceiling {
575        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
576            errors.push(error);
577        }
578        for (node_id, node) in &graph.nodes {
579            if let Err(error) = ceiling.intersect(&node.capability_policy) {
580                errors.push(format!("node {node_id}: {error}"));
581            }
582        }
583    }
584
585    WorkflowValidationReport {
586        valid: errors.is_empty(),
587        errors,
588        warnings,
589        reachable_nodes: reachable_nodes.into_iter().collect(),
590    }
591}
592
593fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
594    let mut seen = BTreeSet::new();
595    let mut stack = vec![graph.entry.clone()];
596    while let Some(node_id) = stack.pop() {
597        if !seen.insert(node_id.clone()) {
598            continue;
599        }
600        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
601            stack.push(edge.to.clone());
602        }
603    }
604    seen
605}
606
607pub async fn execute_stage_node(
608    node_id: &str,
609    node: &WorkflowNode,
610    task: &str,
611    artifacts: &[ArtifactRecord],
612    transcript: Option<VmValue>,
613) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
614    let mut selection_policy = node.context_policy.clone();
615    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
616        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
617    }
618    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
619    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
620    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
621    if node.input_contract.require_transcript && input_transcript.is_none() {
622        return Err(VmError::Runtime(format!(
623            "workflow stage {node_id} requires transcript input"
624        )));
625    }
626    if let Some(min_inputs) = node.input_contract.min_inputs {
627        if selected.len() < min_inputs {
628            return Err(VmError::Runtime(format!(
629                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
630            )));
631        }
632    }
633    if let Some(max_inputs) = node.input_contract.max_inputs {
634        if selected.len() > max_inputs {
635            return Err(VmError::Runtime(format!(
636                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
637            )));
638        }
639    }
640    let prompt = if rendered_context.is_empty() {
641        task.to_string()
642    } else {
643        format!(
644            "{rendered_context}\n\n{}:\n{task}",
645            node.task_label
646                .clone()
647                .unwrap_or_else(|| "Task".to_string())
648        )
649    };
650
651    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
652        .ok()
653        .filter(|value| !value.trim().is_empty())
654        .unwrap_or_else(|| {
655            // Auto-detect from model/provider alias config.
656            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
657            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
658            crate::llm_config::default_tool_format(&model, &provider)
659        });
660    let mut llm_result = if node.kind == "verify" {
661        if let Some(command) = node
662            .verify
663            .as_ref()
664            .and_then(|verify| verify.as_object())
665            .and_then(|verify| verify.get("command"))
666            .and_then(|value| value.as_str())
667            .map(str::trim)
668            .filter(|value| !value.is_empty())
669        {
670            let mut process = if cfg!(target_os = "windows") {
671                let mut cmd = tokio::process::Command::new("cmd");
672                cmd.arg("/C").arg(command);
673                cmd
674            } else {
675                let mut cmd = tokio::process::Command::new("/bin/sh");
676                cmd.arg("-lc").arg(command);
677                cmd
678            };
679            process.stdin(std::process::Stdio::null());
680            if let Some(context) = crate::stdlib::process::current_execution_context() {
681                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
682                    process.current_dir(cwd);
683                }
684                if !context.env.is_empty() {
685                    process.envs(context.env);
686                }
687            }
688            let output = process
689                .output()
690                .await
691                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
692            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
693            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
694            let combined = if stderr.is_empty() {
695                stdout.clone()
696            } else if stdout.is_empty() {
697                stderr.clone()
698            } else {
699                format!("{stdout}\n{stderr}")
700            };
701            serde_json::json!({
702                "status": "completed",
703                "text": combined,
704                "visible_text": combined,
705                "command": command,
706                "stdout": stdout,
707                "stderr": stderr,
708                "exit_status": output.status.code().unwrap_or(-1),
709                "success": output.status.success(),
710            })
711        } else {
712            serde_json::json!({
713                "status": "completed",
714                "text": "",
715                "visible_text": "",
716            })
717        }
718    } else {
719        let mut options = BTreeMap::new();
720        if let Some(provider) = &node.model_policy.provider {
721            options.insert(
722                "provider".to_string(),
723                VmValue::String(Rc::from(provider.clone())),
724            );
725        }
726        if let Some(model) = &node.model_policy.model {
727            options.insert(
728                "model".to_string(),
729                VmValue::String(Rc::from(model.clone())),
730            );
731        }
732        if let Some(model_tier) = &node.model_policy.model_tier {
733            options.insert(
734                "model_tier".to_string(),
735                VmValue::String(Rc::from(model_tier.clone())),
736            );
737        }
738        if let Some(temperature) = node.model_policy.temperature {
739            options.insert("temperature".to_string(), VmValue::Float(temperature));
740        }
741        if let Some(max_tokens) = node.model_policy.max_tokens {
742            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
743        }
744        let tool_names = workflow_tool_names(&node.tools);
745        let tools_value = node.raw_tools.clone().or_else(|| {
746            if matches!(node.tools, serde_json::Value::Null) {
747                None
748            } else {
749                Some(crate::stdlib::json_to_vm_value(&node.tools))
750            }
751        });
752        if tools_value.is_some() && !tool_names.is_empty() {
753            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
754        }
755        if let Some(transcript) = input_transcript.clone() {
756            options.insert("transcript".to_string(), transcript);
757        }
758
759        let args = vec![
760            VmValue::String(Rc::from(prompt.clone())),
761            node.system
762                .clone()
763                .map(|s| VmValue::String(Rc::from(s)))
764                .unwrap_or(VmValue::Nil),
765            VmValue::Dict(Rc::new(options)),
766        ];
767        let mut opts = extract_llm_options(&args)?;
768
769        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
770            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
771            let effective_policy = tool_policy
772                .intersect(&node.capability_policy)
773                .map_err(VmError::Runtime)?;
774            // Build auto-compact config from transcript_policy fields
775            let auto_compact = if node.transcript_policy.auto_compact {
776                let mut ac = crate::orchestration::AutoCompactConfig::default();
777                if let Some(v) = node.transcript_policy.compact_threshold {
778                    ac.token_threshold = v;
779                }
780                if let Some(v) = node.transcript_policy.tool_output_max_chars {
781                    ac.tool_output_max_chars = v;
782                }
783                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
784                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
785                        ac.compact_strategy = s;
786                    }
787                }
788                if let Some(v) = node.transcript_policy.hard_limit_tokens {
789                    ac.hard_limit_tokens = Some(v);
790                }
791                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
792                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
793                        ac.hard_limit_strategy = s;
794                    }
795                }
796                // Extract closure fields from raw transcript_policy dict
797                if let Some(ref raw_tp) = node.raw_transcript_policy {
798                    if let Some(dict) = raw_tp.as_dict() {
799                        if let Some(cb) = dict.get("compress_callback") {
800                            ac.compress_callback = Some(cb.clone());
801                        }
802                        if let Some(cb) = dict.get("mask_callback") {
803                            ac.mask_callback = Some(cb.clone());
804                        }
805                    }
806                }
807                // Adapt thresholds to provider context window
808                {
809                    let user_specified_threshold =
810                        node.transcript_policy.compact_threshold.is_some();
811                    let user_specified_hard_limit =
812                        node.transcript_policy.hard_limit_tokens.is_some();
813                    crate::llm::api::adapt_auto_compact_to_provider(
814                        &mut ac,
815                        user_specified_threshold,
816                        user_specified_hard_limit,
817                        &opts.provider,
818                        &opts.model,
819                        &opts.api_key,
820                    )
821                    .await;
822                }
823                Some(ac)
824            } else {
825                None
826            };
827            crate::llm::run_agent_loop_internal(
828                &mut opts,
829                crate::llm::AgentLoopConfig {
830                    persistent: true,
831                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
832                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
833                    nudge: node.model_policy.nudge.clone(),
834                    done_sentinel: node.done_sentinel.clone(),
835                    break_unless_phase: None,
836                    tool_retries: 0,
837                    tool_backoff_ms: 1000,
838                    tool_format: tool_format.clone(),
839                    auto_compact,
840                    context_callback: None,
841                    policy: Some(effective_policy),
842                    approval_policy: Some(node.approval_policy.clone()),
843                    daemon: false,
844                    daemon_config: Default::default(),
845                    llm_retries: 2,
846                    llm_backoff_ms: 2000,
847                    exit_when_verified: node.exit_when_verified,
848                    loop_detect_warn: 2,
849                    loop_detect_block: 3,
850                    loop_detect_skip: 4,
851                    tool_examples: node.model_policy.tool_examples.clone(),
852                    // Extract post_turn_callback from raw dict since serde(skip) drops it
853                    post_turn_callback: node
854                        .raw_model_policy
855                        .as_ref()
856                        .and_then(|v| v.as_dict())
857                        .and_then(|d| d.get("post_turn_callback"))
858                        .cloned(),
859                    turn_policy: node.model_policy.turn_policy.clone(),
860                    stop_after_successful_tools: node
861                        .model_policy
862                        .stop_after_successful_tools
863                        .clone(),
864                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
865                    on_tool_call: node
866                        .raw_model_policy
867                        .as_ref()
868                        .and_then(|v| v.as_dict())
869                        .and_then(|d| d.get("on_tool_call"))
870                        .cloned(),
871                    on_tool_result: node
872                        .raw_model_policy
873                        .as_ref()
874                        .and_then(|v| v.as_dict())
875                        .and_then(|d| d.get("on_tool_result"))
876                        .cloned(),
877                },
878            )
879            .await?
880        } else {
881            let result = vm_call_llm_full(&opts).await?;
882            crate::llm::agent_loop_result_from_llm(&result, opts)
883        }
884    };
885    if let Some(payload) = llm_result.as_object_mut() {
886        payload.insert("prompt".to_string(), serde_json::json!(prompt));
887        payload.insert(
888            "system_prompt".to_string(),
889            serde_json::json!(node.system.clone().unwrap_or_default()),
890        );
891        payload.insert(
892            "rendered_context".to_string(),
893            serde_json::json!(rendered_context),
894        );
895        payload.insert(
896            "selected_artifact_ids".to_string(),
897            serde_json::json!(selected
898                .iter()
899                .map(|artifact| artifact.id.clone())
900                .collect::<Vec<_>>()),
901        );
902        payload.insert(
903            "selected_artifact_titles".to_string(),
904            serde_json::json!(selected
905                .iter()
906                .map(|artifact| artifact.title.clone())
907                .collect::<Vec<_>>()),
908        );
909        payload.insert(
910            "tool_calling_mode".to_string(),
911            serde_json::json!(tool_format.clone()),
912        );
913    }
914
915    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
916    // Non-LLM stages (verify command, condition, fork, join, ...) don't
917    // produce a "transcript" field. Fall back to the stage's input transcript
918    // so the running conversation survives cross-stage transitions.
919    let result_transcript = llm_result
920        .get("transcript")
921        .cloned()
922        .map(|value| crate::stdlib::json_to_vm_value(&value));
923    let transcript = apply_output_transcript_policy(
924        result_transcript.or(input_transcript),
925        &node.transcript_policy,
926    );
927    let output_kind = node
928        .output_contract
929        .output_kinds
930        .first()
931        .cloned()
932        .unwrap_or_else(|| {
933            if node.kind == "verify" {
934                "verification_result".to_string()
935            } else {
936                "artifact".to_string()
937            }
938        });
939    let mut metadata = BTreeMap::new();
940    metadata.insert(
941        "input_artifact_ids".to_string(),
942        serde_json::json!(selected
943            .iter()
944            .map(|artifact| artifact.id.clone())
945            .collect::<Vec<_>>()),
946    );
947    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
948    let artifact = ArtifactRecord {
949        type_name: "artifact".to_string(),
950        id: new_id("artifact"),
951        kind: output_kind,
952        title: Some(format!("stage {node_id} output")),
953        text: Some(visible_text),
954        data: Some(llm_result.clone()),
955        source: Some(node_id.to_string()),
956        created_at: now_rfc3339(),
957        freshness: Some("fresh".to_string()),
958        priority: None,
959        lineage: selected
960            .iter()
961            .map(|artifact| artifact.id.clone())
962            .collect(),
963        relevance: Some(1.0),
964        estimated_tokens: None,
965        stage: Some(node_id.to_string()),
966        metadata,
967    }
968    .normalize();
969
970    Ok((llm_result, vec![artifact], transcript))
971}
972
973pub fn next_nodes_for(
974    graph: &WorkflowGraph,
975    current: &str,
976    branch: Option<&str>,
977) -> Vec<WorkflowEdge> {
978    let mut matching: Vec<WorkflowEdge> = graph
979        .edges
980        .iter()
981        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
982        .cloned()
983        .collect();
984    if matching.is_empty() {
985        matching = graph
986            .edges
987            .iter()
988            .filter(|edge| edge.from == current && edge.branch.is_none())
989            .cloned()
990            .collect();
991    }
992    matching
993}
994
995pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
996    next_nodes_for(graph, current, Some(branch))
997        .into_iter()
998        .next()
999        .map(|edge| edge.to)
1000}
1001
1002pub fn append_audit_entry(
1003    graph: &mut WorkflowGraph,
1004    op: &str,
1005    node_id: Option<String>,
1006    reason: Option<String>,
1007    metadata: BTreeMap<String, serde_json::Value>,
1008) {
1009    graph.audit_log.push(WorkflowAuditEntry {
1010        id: new_id("audit"),
1011        op: op.to_string(),
1012        node_id,
1013        timestamp: now_rfc3339(),
1014        reason,
1015        metadata,
1016    });
1017}