// harn_vm/orchestration/workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10    ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12    TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
17#[derive(Clone, Debug, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct WorkflowNode {
20    pub id: Option<String>,
21    pub kind: String,
22    pub mode: Option<String>,
23    pub prompt: Option<String>,
24    pub system: Option<String>,
25    pub task_label: Option<String>,
26    pub done_sentinel: Option<String>,
27    pub tools: serde_json::Value,
28    pub model_policy: ModelPolicy,
29    pub transcript_policy: TranscriptPolicy,
30    pub context_policy: ContextPolicy,
31    pub retry_policy: RetryPolicy,
32    pub capability_policy: CapabilityPolicy,
33    pub approval_policy: super::ToolApprovalPolicy,
34    pub input_contract: StageContract,
35    pub output_contract: StageContract,
36    pub branch_semantics: BranchSemantics,
37    pub map_policy: MapPolicy,
38    pub join_policy: JoinPolicy,
39    pub reduce_policy: ReducePolicy,
40    pub escalation_policy: EscalationPolicy,
41    pub verify: Option<serde_json::Value>,
42    /// When true, the stage's agent loop gates the done sentinel on the most
43    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
44    /// persistent execute stages that fold verification into the loop via a
45    /// shell-exec tool the model invokes explicitly.
46    #[serde(default)]
47    pub exit_when_verified: bool,
48    pub metadata: BTreeMap<String, serde_json::Value>,
49    #[serde(skip)]
50    pub raw_tools: Option<VmValue>,
51    /// Raw transcript_policy VmValue dict — preserved for extracting closure
52    /// fields (compress_callback, mask_callback) that can't go through serde.
53    #[serde(skip)]
54    pub raw_transcript_policy: Option<VmValue>,
55    /// Raw model_policy VmValue dict — preserved for extracting closure
56    /// fields (post_turn_callback) that can't go through serde.
57    #[serde(skip)]
58    pub raw_model_policy: Option<VmValue>,
59}
60
61impl PartialEq for WorkflowNode {
62    fn eq(&self, other: &Self) -> bool {
63        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64    }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68    match value {
69        serde_json::Value::Null => Vec::new(),
70        serde_json::Value::Array(items) => items
71            .iter()
72            .filter_map(|item| match item {
73                serde_json::Value::Object(map) => map
74                    .get("name")
75                    .and_then(|value| value.as_str())
76                    .filter(|name| !name.is_empty())
77                    .map(|name| name.to_string()),
78                _ => None,
79            })
80            .collect(),
81        serde_json::Value::Object(map) => {
82            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83                return map
84                    .get("tools")
85                    .map(workflow_tool_names)
86                    .unwrap_or_default();
87            }
88            map.get("name")
89                .and_then(|value| value.as_str())
90                .filter(|name| !name.is_empty())
91                .map(|name| vec![name.to_string()])
92                .unwrap_or_default()
93        }
94        _ => Vec::new(),
95    }
96}
97
/// Returns the most invasive side-effect level in `levels`, or `None` when the
/// iterator is empty.
///
/// Known levels, from least to most invasive: `none`, `read_only`,
/// `workspace_write`, `process_exec`, `network`. Any unrecognized string ranks
/// above all of them, so unknown levels are treated conservatively. When
/// several levels share the top rank, the last one encountered wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity = |level: &str| {
        ORDERED
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDERED.len())
    };

    levels
        .map(|level| (severity(level.as_str()), level))
        .fold(None, |best: Option<(usize, String)>, candidate| match best {
            // Strictly greater keeps the incumbent; ties go to the newcomer.
            Some(current) if current.0 > candidate.0 => Some(current),
            _ => Some(candidate),
        })
        .map(|(_, level)| level)
}
111
112fn parse_tool_runtime_policy(
113    map: &serde_json::Map<String, serde_json::Value>,
114) -> ToolRuntimePolicyMetadata {
115    let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
116        return ToolRuntimePolicyMetadata::default();
117    };
118
119    let capabilities = policy
120        .get("capabilities")
121        .and_then(|value| value.as_object())
122        .map(|caps| {
123            caps.iter()
124                .map(|(capability, ops)| {
125                    let values = ops
126                        .as_array()
127                        .map(|items| {
128                            items
129                                .iter()
130                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
131                                .collect::<Vec<_>>()
132                        })
133                        .unwrap_or_default();
134                    (capability.clone(), values)
135                })
136                .collect::<BTreeMap<_, _>>()
137        })
138        .unwrap_or_default();
139
140    let path_params = policy
141        .get("path_params")
142        .and_then(|value| value.as_array())
143        .map(|items| {
144            items
145                .iter()
146                .filter_map(|item| item.as_str().map(|s| s.to_string()))
147                .collect::<Vec<_>>()
148        })
149        .unwrap_or_default();
150
151    ToolRuntimePolicyMetadata {
152        capabilities,
153        side_effect_level: policy
154            .get("side_effect_level")
155            .and_then(|value| value.as_str())
156            .map(|s| s.to_string()),
157        path_params,
158        mutation_classification: policy
159            .get("mutation_classification")
160            .and_then(|value| value.as_str())
161            .map(|s| s.to_string()),
162    }
163}
164
165pub fn workflow_tool_metadata(
166    value: &serde_json::Value,
167) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
168    match value {
169        serde_json::Value::Null => BTreeMap::new(),
170        serde_json::Value::Array(items) => items
171            .iter()
172            .filter_map(|item| match item {
173                serde_json::Value::Object(map) => map
174                    .get("name")
175                    .and_then(|value| value.as_str())
176                    .filter(|name| !name.is_empty())
177                    .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
178                _ => None,
179            })
180            .collect(),
181        serde_json::Value::Object(map) => {
182            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
183                return map
184                    .get("tools")
185                    .map(workflow_tool_metadata)
186                    .unwrap_or_default();
187            }
188            map.get("name")
189                .and_then(|value| value.as_str())
190                .filter(|name| !name.is_empty())
191                .map(|name| {
192                    let mut metadata = BTreeMap::new();
193                    metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
194                    metadata
195                })
196                .unwrap_or_default()
197        }
198        _ => BTreeMap::new(),
199    }
200}
201
202pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
203    let tools = workflow_tool_names(value);
204    let tool_metadata = workflow_tool_metadata(value);
205    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
206    for metadata in tool_metadata.values() {
207        for (capability, ops) in &metadata.capabilities {
208            let entry = capabilities.entry(capability.clone()).or_default();
209            for op in ops {
210                if !entry.contains(op) {
211                    entry.push(op.clone());
212                }
213            }
214            entry.sort();
215        }
216    }
217    let side_effect_level = max_side_effect_level(
218        tool_metadata
219            .values()
220            .filter_map(|metadata| metadata.side_effect_level.clone()),
221    );
222    CapabilityPolicy {
223        tools,
224        capabilities,
225        workspace_roots: Vec::new(),
226        side_effect_level,
227        recursion_limit: None,
228        tool_arg_constraints: Vec::new(),
229        tool_metadata,
230    }
231}
232
233#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
234#[serde(default)]
235pub struct WorkflowEdge {
236    pub from: String,
237    pub to: String,
238    pub branch: Option<String>,
239    pub label: Option<String>,
240}
241
242#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
243#[serde(default)]
244pub struct WorkflowGraph {
245    #[serde(rename = "_type")]
246    pub type_name: String,
247    pub id: String,
248    pub name: Option<String>,
249    pub version: usize,
250    pub entry: String,
251    pub nodes: BTreeMap<String, WorkflowNode>,
252    pub edges: Vec<WorkflowEdge>,
253    pub capability_policy: CapabilityPolicy,
254    pub approval_policy: super::ToolApprovalPolicy,
255    pub metadata: BTreeMap<String, serde_json::Value>,
256    pub audit_log: Vec<WorkflowAuditEntry>,
257}
258
259#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
260#[serde(default)]
261pub struct WorkflowAuditEntry {
262    pub id: String,
263    pub op: String,
264    pub node_id: Option<String>,
265    pub timestamp: String,
266    pub reason: Option<String>,
267    pub metadata: BTreeMap<String, serde_json::Value>,
268}
269
270#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
271#[serde(default)]
272pub struct WorkflowValidationReport {
273    pub valid: bool,
274    pub errors: Vec<String>,
275    pub warnings: Vec<String>,
276    pub reachable_nodes: Vec<String>,
277}
278
279pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
280    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
281    let dict = value.as_dict();
282    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
283    node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
284    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
285    Ok(node)
286}
287
288pub fn parse_workflow_node_json(
289    json: serde_json::Value,
290    label: &str,
291) -> Result<WorkflowNode, VmError> {
292    super::parse_json_payload(json, label)
293}
294
295pub fn parse_workflow_edge_json(
296    json: serde_json::Value,
297    label: &str,
298) -> Result<WorkflowEdge, VmError> {
299    super::parse_json_payload(json, label)
300}
301
302pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
303    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
304    let as_dict = value.as_dict().cloned().unwrap_or_default();
305
306    if graph.nodes.is_empty() {
307        for key in ["act", "verify", "repair"] {
308            if let Some(node_value) = as_dict.get(key) {
309                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
310                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
311                node.id = Some(key.to_string());
312                if node.kind.is_empty() {
313                    node.kind = if key == "verify" {
314                        "verify".to_string()
315                    } else {
316                        "stage".to_string()
317                    };
318                }
319                if node.model_policy.provider.is_none() {
320                    node.model_policy.provider = as_dict
321                        .get("provider")
322                        .map(|value| value.display())
323                        .filter(|value| !value.is_empty());
324                }
325                if node.model_policy.model.is_none() {
326                    node.model_policy.model = as_dict
327                        .get("model")
328                        .map(|value| value.display())
329                        .filter(|value| !value.is_empty());
330                }
331                if node.model_policy.model_tier.is_none() {
332                    node.model_policy.model_tier = as_dict
333                        .get("model_tier")
334                        .or_else(|| as_dict.get("tier"))
335                        .map(|value| value.display())
336                        .filter(|value| !value.is_empty());
337                }
338                if node.model_policy.temperature.is_none() {
339                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
340                        if let VmValue::Float(number) = value {
341                            Some(*number)
342                        } else {
343                            value.as_int().map(|number| number as f64)
344                        }
345                    });
346                }
347                if node.model_policy.max_tokens.is_none() {
348                    node.model_policy.max_tokens =
349                        as_dict.get("max_tokens").and_then(|value| value.as_int());
350                }
351                if node.mode.is_none() {
352                    node.mode = as_dict
353                        .get("mode")
354                        .map(|value| value.display())
355                        .filter(|value| !value.is_empty());
356                }
357                if node.done_sentinel.is_none() {
358                    node.done_sentinel = as_dict
359                        .get("done_sentinel")
360                        .map(|value| value.display())
361                        .filter(|value| !value.is_empty());
362                }
363                if key == "verify"
364                    && node.verify.is_none()
365                    && (raw_node.contains_key("assert_text")
366                        || raw_node.contains_key("command")
367                        || raw_node.contains_key("expect_status")
368                        || raw_node.contains_key("expect_text"))
369                {
370                    node.verify = Some(serde_json::json!({
371                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
372                        "command": raw_node.get("command").map(vm_value_to_json),
373                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
374                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
375                    }));
376                }
377                graph.nodes.insert(key.to_string(), node);
378            }
379        }
380        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
381            graph.entry = "act".to_string();
382        }
383        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
384            if graph.nodes.contains_key("verify") {
385                graph.edges.push(WorkflowEdge {
386                    from: "act".to_string(),
387                    to: "verify".to_string(),
388                    branch: None,
389                    label: None,
390                });
391            }
392            if graph.nodes.contains_key("repair") {
393                graph.edges.push(WorkflowEdge {
394                    from: "verify".to_string(),
395                    to: "repair".to_string(),
396                    branch: Some("failed".to_string()),
397                    label: None,
398                });
399                graph.edges.push(WorkflowEdge {
400                    from: "repair".to_string(),
401                    to: "verify".to_string(),
402                    branch: Some("retry".to_string()),
403                    label: None,
404                });
405            }
406        }
407    }
408
409    if graph.type_name.is_empty() {
410        graph.type_name = "workflow_graph".to_string();
411    }
412    if graph.id.is_empty() {
413        graph.id = new_id("workflow");
414    }
415    if graph.version == 0 {
416        graph.version = 1;
417    }
418    if graph.entry.is_empty() {
419        graph.entry = graph
420            .nodes
421            .keys()
422            .next()
423            .cloned()
424            .unwrap_or_else(|| "act".to_string());
425    }
426    for (node_id, node) in &mut graph.nodes {
427        if node.raw_tools.is_none() {
428            node.raw_tools = as_dict
429                .get("nodes")
430                .and_then(|nodes| nodes.as_dict())
431                .and_then(|nodes| nodes.get(node_id))
432                .and_then(|node_value| node_value.as_dict())
433                .and_then(|raw_node| raw_node.get("tools"))
434                .cloned();
435        }
436        if node.id.is_none() {
437            node.id = Some(node_id.clone());
438        }
439        if node.kind.is_empty() {
440            node.kind = "stage".to_string();
441        }
442        if node.join_policy.strategy.is_empty() {
443            node.join_policy.strategy = "all".to_string();
444        }
445        if node.reduce_policy.strategy.is_empty() {
446            node.reduce_policy.strategy = "concat".to_string();
447        }
448        if node.output_contract.output_kinds.is_empty() {
449            node.output_contract.output_kinds = vec![match node.kind.as_str() {
450                "verify" => "verification_result".to_string(),
451                "reduce" => node
452                    .reduce_policy
453                    .output_kind
454                    .clone()
455                    .unwrap_or_else(|| "summary".to_string()),
456                "map" => node
457                    .map_policy
458                    .output_kind
459                    .clone()
460                    .unwrap_or_else(|| "artifact".to_string()),
461                "escalation" => "plan".to_string(),
462                _ => "artifact".to_string(),
463            }];
464        }
465        if node.retry_policy.max_attempts == 0 {
466            node.retry_policy.max_attempts = 1;
467        }
468    }
469    Ok(graph)
470}
471
472pub fn validate_workflow(
473    graph: &WorkflowGraph,
474    ceiling: Option<&CapabilityPolicy>,
475) -> WorkflowValidationReport {
476    let mut errors = Vec::new();
477    let mut warnings = Vec::new();
478
479    if !graph.nodes.contains_key(&graph.entry) {
480        errors.push(format!("entry node does not exist: {}", graph.entry));
481    }
482
483    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
484    for edge in &graph.edges {
485        if !node_ids.contains(&edge.from) {
486            errors.push(format!("edge.from references unknown node: {}", edge.from));
487        }
488        if !node_ids.contains(&edge.to) {
489            errors.push(format!("edge.to references unknown node: {}", edge.to));
490        }
491    }
492
493    let reachable_nodes = reachable_nodes(graph);
494    for node_id in &node_ids {
495        if !reachable_nodes.contains(node_id) {
496            warnings.push(format!("node is unreachable: {node_id}"));
497        }
498    }
499
500    for (node_id, node) in &graph.nodes {
501        let incoming = graph
502            .edges
503            .iter()
504            .filter(|edge| edge.to == *node_id)
505            .count();
506        let outgoing: Vec<&WorkflowEdge> = graph
507            .edges
508            .iter()
509            .filter(|edge| edge.from == *node_id)
510            .collect();
511        if let Some(min_inputs) = node.input_contract.min_inputs {
512            if let Some(max_inputs) = node.input_contract.max_inputs {
513                if min_inputs > max_inputs {
514                    errors.push(format!(
515                        "node {node_id}: input contract min_inputs exceeds max_inputs"
516                    ));
517                }
518            }
519        }
520        match node.kind.as_str() {
521            "condition" => {
522                let has_true = outgoing
523                    .iter()
524                    .any(|edge| edge.branch.as_deref() == Some("true"));
525                let has_false = outgoing
526                    .iter()
527                    .any(|edge| edge.branch.as_deref() == Some("false"));
528                if !has_true || !has_false {
529                    errors.push(format!(
530                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
531                    ));
532                }
533            }
534            "fork" => {
535                if outgoing.len() < 2 {
536                    errors.push(format!(
537                        "node {node_id}: fork nodes require at least two outgoing edges"
538                    ));
539                }
540            }
541            "join" => {
542                if incoming < 2 {
543                    warnings.push(format!(
544                        "node {node_id}: join node has fewer than two incoming edges"
545                    ));
546                }
547            }
548            "map" => {
549                if node.map_policy.items.is_empty()
550                    && node.map_policy.item_artifact_kind.is_none()
551                    && node.input_contract.input_kinds.is_empty()
552                {
553                    errors.push(format!(
554                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
555                    ));
556                }
557            }
558            "reduce" => {
559                if node.input_contract.input_kinds.is_empty() {
560                    warnings.push(format!(
561                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
562                    ));
563                }
564            }
565            _ => {}
566        }
567    }
568
569    if let Some(ceiling) = ceiling {
570        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
571            errors.push(error);
572        }
573        for (node_id, node) in &graph.nodes {
574            if let Err(error) = ceiling.intersect(&node.capability_policy) {
575                errors.push(format!("node {node_id}: {error}"));
576            }
577        }
578    }
579
580    WorkflowValidationReport {
581        valid: errors.is_empty(),
582        errors,
583        warnings,
584        reachable_nodes: reachable_nodes.into_iter().collect(),
585    }
586}
587
588fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
589    let mut seen = BTreeSet::new();
590    let mut stack = vec![graph.entry.clone()];
591    while let Some(node_id) = stack.pop() {
592        if !seen.insert(node_id.clone()) {
593            continue;
594        }
595        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
596            stack.push(edge.to.clone());
597        }
598    }
599    seen
600}
601
602pub async fn execute_stage_node(
603    node_id: &str,
604    node: &WorkflowNode,
605    task: &str,
606    artifacts: &[ArtifactRecord],
607    transcript: Option<VmValue>,
608) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
609    let mut selection_policy = node.context_policy.clone();
610    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
611        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
612    }
613    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
614    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
615    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
616    if node.input_contract.require_transcript && input_transcript.is_none() {
617        return Err(VmError::Runtime(format!(
618            "workflow stage {node_id} requires transcript input"
619        )));
620    }
621    if let Some(min_inputs) = node.input_contract.min_inputs {
622        if selected.len() < min_inputs {
623            return Err(VmError::Runtime(format!(
624                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
625            )));
626        }
627    }
628    if let Some(max_inputs) = node.input_contract.max_inputs {
629        if selected.len() > max_inputs {
630            return Err(VmError::Runtime(format!(
631                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
632            )));
633        }
634    }
635    let prompt = if rendered_context.is_empty() {
636        task.to_string()
637    } else {
638        format!(
639            "{rendered_context}\n\n{}:\n{task}",
640            node.task_label
641                .clone()
642                .unwrap_or_else(|| "Task".to_string())
643        )
644    };
645
646    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
647        .ok()
648        .filter(|value| !value.trim().is_empty())
649        .unwrap_or_else(|| {
650            // Auto-detect from model/provider alias config.
651            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
652            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
653            crate::llm_config::default_tool_format(&model, &provider)
654        });
655    let mut llm_result = if node.kind == "verify" {
656        if let Some(command) = node
657            .verify
658            .as_ref()
659            .and_then(|verify| verify.as_object())
660            .and_then(|verify| verify.get("command"))
661            .and_then(|value| value.as_str())
662            .map(str::trim)
663            .filter(|value| !value.is_empty())
664        {
665            let mut process = if cfg!(target_os = "windows") {
666                let mut cmd = tokio::process::Command::new("cmd");
667                cmd.arg("/C").arg(command);
668                cmd
669            } else {
670                let mut cmd = tokio::process::Command::new("/bin/sh");
671                cmd.arg("-lc").arg(command);
672                cmd
673            };
674            process.stdin(std::process::Stdio::null());
675            if let Some(context) = crate::stdlib::process::current_execution_context() {
676                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
677                    process.current_dir(cwd);
678                }
679                if !context.env.is_empty() {
680                    process.envs(context.env);
681                }
682            }
683            let output = process
684                .output()
685                .await
686                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
687            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
688            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
689            let combined = if stderr.is_empty() {
690                stdout.clone()
691            } else if stdout.is_empty() {
692                stderr.clone()
693            } else {
694                format!("{stdout}\n{stderr}")
695            };
696            serde_json::json!({
697                "status": "completed",
698                "text": combined,
699                "visible_text": combined,
700                "command": command,
701                "stdout": stdout,
702                "stderr": stderr,
703                "exit_status": output.status.code().unwrap_or(-1),
704                "success": output.status.success(),
705            })
706        } else {
707            serde_json::json!({
708                "status": "completed",
709                "text": "",
710                "visible_text": "",
711            })
712        }
713    } else {
714        let mut options = BTreeMap::new();
715        if let Some(provider) = &node.model_policy.provider {
716            options.insert(
717                "provider".to_string(),
718                VmValue::String(Rc::from(provider.clone())),
719            );
720        }
721        if let Some(model) = &node.model_policy.model {
722            options.insert(
723                "model".to_string(),
724                VmValue::String(Rc::from(model.clone())),
725            );
726        }
727        if let Some(model_tier) = &node.model_policy.model_tier {
728            options.insert(
729                "model_tier".to_string(),
730                VmValue::String(Rc::from(model_tier.clone())),
731            );
732        }
733        if let Some(temperature) = node.model_policy.temperature {
734            options.insert("temperature".to_string(), VmValue::Float(temperature));
735        }
736        if let Some(max_tokens) = node.model_policy.max_tokens {
737            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
738        }
739        let tool_names = workflow_tool_names(&node.tools);
740        let tools_value = node.raw_tools.clone().or_else(|| {
741            if matches!(node.tools, serde_json::Value::Null) {
742                None
743            } else {
744                Some(crate::stdlib::json_to_vm_value(&node.tools))
745            }
746        });
747        if tools_value.is_some() && !tool_names.is_empty() {
748            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
749        }
750        if let Some(transcript) = input_transcript.clone() {
751            options.insert("transcript".to_string(), transcript);
752        }
753
754        let args = vec![
755            VmValue::String(Rc::from(prompt.clone())),
756            node.system
757                .clone()
758                .map(|s| VmValue::String(Rc::from(s)))
759                .unwrap_or(VmValue::Nil),
760            VmValue::Dict(Rc::new(options)),
761        ];
762        let mut opts = extract_llm_options(&args)?;
763
764        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
765            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
766            let effective_policy = tool_policy
767                .intersect(&node.capability_policy)
768                .map_err(VmError::Runtime)?;
769            // Build auto-compact config from transcript_policy fields
770            let auto_compact = if node.transcript_policy.auto_compact {
771                let mut ac = crate::orchestration::AutoCompactConfig::default();
772                if let Some(v) = node.transcript_policy.compact_threshold {
773                    ac.token_threshold = v;
774                }
775                if let Some(v) = node.transcript_policy.tool_output_max_chars {
776                    ac.tool_output_max_chars = v;
777                }
778                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
779                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
780                        ac.compact_strategy = s;
781                    }
782                }
783                if let Some(v) = node.transcript_policy.hard_limit_tokens {
784                    ac.hard_limit_tokens = Some(v);
785                }
786                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
787                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
788                        ac.hard_limit_strategy = s;
789                    }
790                }
791                // Extract closure fields from raw transcript_policy dict
792                if let Some(ref raw_tp) = node.raw_transcript_policy {
793                    if let Some(dict) = raw_tp.as_dict() {
794                        if let Some(cb) = dict.get("compress_callback") {
795                            ac.compress_callback = Some(cb.clone());
796                        }
797                        if let Some(cb) = dict.get("mask_callback") {
798                            ac.mask_callback = Some(cb.clone());
799                        }
800                    }
801                }
802                // Adapt thresholds to provider context window
803                {
804                    let user_specified_threshold =
805                        node.transcript_policy.compact_threshold.is_some();
806                    let user_specified_hard_limit =
807                        node.transcript_policy.hard_limit_tokens.is_some();
808                    crate::llm::api::adapt_auto_compact_to_provider(
809                        &mut ac,
810                        user_specified_threshold,
811                        user_specified_hard_limit,
812                        &opts.provider,
813                        &opts.model,
814                        &opts.api_key,
815                    )
816                    .await;
817                }
818                Some(ac)
819            } else {
820                None
821            };
822            crate::llm::run_agent_loop_internal(
823                &mut opts,
824                crate::llm::AgentLoopConfig {
825                    persistent: true,
826                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
827                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
828                    nudge: node.model_policy.nudge.clone(),
829                    done_sentinel: node.done_sentinel.clone(),
830                    break_unless_phase: None,
831                    tool_retries: 0,
832                    tool_backoff_ms: 1000,
833                    tool_format: tool_format.clone(),
834                    auto_compact,
835                    context_callback: None,
836                    policy: Some(effective_policy),
837                    approval_policy: Some(node.approval_policy.clone()),
838                    daemon: false,
839                    daemon_config: Default::default(),
840                    llm_retries: 2,
841                    llm_backoff_ms: 2000,
842                    exit_when_verified: node.exit_when_verified,
843                    loop_detect_warn: 2,
844                    loop_detect_block: 3,
845                    loop_detect_skip: 4,
846                    tool_examples: node.model_policy.tool_examples.clone(),
847                    // Extract post_turn_callback from raw dict since serde(skip) drops it
848                    post_turn_callback: node
849                        .raw_model_policy
850                        .as_ref()
851                        .and_then(|v| v.as_dict())
852                        .and_then(|d| d.get("post_turn_callback"))
853                        .cloned(),
854                    turn_policy: node.model_policy.turn_policy.clone(),
855                    stop_after_successful_tools: node
856                        .model_policy
857                        .stop_after_successful_tools
858                        .clone(),
859                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
860                    on_tool_call: node
861                        .raw_model_policy
862                        .as_ref()
863                        .and_then(|v| v.as_dict())
864                        .and_then(|d| d.get("on_tool_call"))
865                        .cloned(),
866                    on_tool_result: node
867                        .raw_model_policy
868                        .as_ref()
869                        .and_then(|v| v.as_dict())
870                        .and_then(|d| d.get("on_tool_result"))
871                        .cloned(),
872                },
873            )
874            .await?
875        } else {
876            let result = vm_call_llm_full(&opts).await?;
877            crate::llm::agent_loop_result_from_llm(&result, opts)
878        }
879    };
880    if let Some(payload) = llm_result.as_object_mut() {
881        payload.insert("prompt".to_string(), serde_json::json!(prompt));
882        payload.insert(
883            "system_prompt".to_string(),
884            serde_json::json!(node.system.clone().unwrap_or_default()),
885        );
886        payload.insert(
887            "rendered_context".to_string(),
888            serde_json::json!(rendered_context),
889        );
890        payload.insert(
891            "selected_artifact_ids".to_string(),
892            serde_json::json!(selected
893                .iter()
894                .map(|artifact| artifact.id.clone())
895                .collect::<Vec<_>>()),
896        );
897        payload.insert(
898            "selected_artifact_titles".to_string(),
899            serde_json::json!(selected
900                .iter()
901                .map(|artifact| artifact.title.clone())
902                .collect::<Vec<_>>()),
903        );
904        payload.insert(
905            "tool_calling_mode".to_string(),
906            serde_json::json!(tool_format.clone()),
907        );
908    }
909
910    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
911    // Non-LLM stages (verify command, condition, fork, join, ...) don't
912    // produce a "transcript" field. Fall back to the stage's input transcript
913    // so the running conversation survives cross-stage transitions.
914    let result_transcript = llm_result
915        .get("transcript")
916        .cloned()
917        .map(|value| crate::stdlib::json_to_vm_value(&value));
918    let transcript = apply_output_transcript_policy(
919        result_transcript.or(input_transcript),
920        &node.transcript_policy,
921    );
922    let output_kind = node
923        .output_contract
924        .output_kinds
925        .first()
926        .cloned()
927        .unwrap_or_else(|| {
928            if node.kind == "verify" {
929                "verification_result".to_string()
930            } else {
931                "artifact".to_string()
932            }
933        });
934    let mut metadata = BTreeMap::new();
935    metadata.insert(
936        "input_artifact_ids".to_string(),
937        serde_json::json!(selected
938            .iter()
939            .map(|artifact| artifact.id.clone())
940            .collect::<Vec<_>>()),
941    );
942    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
943    let artifact = ArtifactRecord {
944        type_name: "artifact".to_string(),
945        id: new_id("artifact"),
946        kind: output_kind,
947        title: Some(format!("stage {node_id} output")),
948        text: Some(visible_text),
949        data: Some(llm_result.clone()),
950        source: Some(node_id.to_string()),
951        created_at: now_rfc3339(),
952        freshness: Some("fresh".to_string()),
953        priority: None,
954        lineage: selected
955            .iter()
956            .map(|artifact| artifact.id.clone())
957            .collect(),
958        relevance: Some(1.0),
959        estimated_tokens: None,
960        stage: Some(node_id.to_string()),
961        metadata,
962    }
963    .normalize();
964
965    Ok((llm_result, vec![artifact], transcript))
966}
967
968pub fn next_nodes_for(
969    graph: &WorkflowGraph,
970    current: &str,
971    branch: Option<&str>,
972) -> Vec<WorkflowEdge> {
973    let mut matching: Vec<WorkflowEdge> = graph
974        .edges
975        .iter()
976        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
977        .cloned()
978        .collect();
979    if matching.is_empty() {
980        matching = graph
981            .edges
982            .iter()
983            .filter(|edge| edge.from == current && edge.branch.is_none())
984            .cloned()
985            .collect();
986    }
987    matching
988}
989
990pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
991    next_nodes_for(graph, current, Some(branch))
992        .into_iter()
993        .next()
994        .map(|edge| edge.to)
995}
996
997pub fn append_audit_entry(
998    graph: &mut WorkflowGraph,
999    op: &str,
1000    node_id: Option<String>,
1001    reason: Option<String>,
1002    metadata: BTreeMap<String, serde_json::Value>,
1003) {
1004    graph.audit_log.push(WorkflowAuditEntry {
1005        id: new_id("audit"),
1006        op: op.to_string(),
1007        node_id,
1008        timestamp: now_rfc3339(),
1009        reason,
1010        metadata,
1011    });
1012}