Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10    ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, TranscriptPolicy,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17#[derive(Clone, Debug, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct WorkflowNode {
20    pub id: Option<String>,
21    pub kind: String,
22    pub mode: Option<String>,
23    pub prompt: Option<String>,
24    pub system: Option<String>,
25    pub task_label: Option<String>,
26    pub done_sentinel: Option<String>,
27    pub tools: serde_json::Value,
28    pub model_policy: ModelPolicy,
29    pub transcript_policy: TranscriptPolicy,
30    pub context_policy: ContextPolicy,
31    pub retry_policy: RetryPolicy,
32    pub capability_policy: CapabilityPolicy,
33    pub approval_policy: super::ToolApprovalPolicy,
34    pub input_contract: StageContract,
35    pub output_contract: StageContract,
36    pub branch_semantics: BranchSemantics,
37    pub map_policy: MapPolicy,
38    pub join_policy: JoinPolicy,
39    pub reduce_policy: ReducePolicy,
40    pub escalation_policy: EscalationPolicy,
41    pub verify: Option<serde_json::Value>,
42    /// When true, the stage's agent loop gates the done sentinel on the most
43    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
44    /// persistent execute stages that fold verification into the loop via a
45    /// shell-exec tool the model invokes explicitly.
46    #[serde(default)]
47    pub exit_when_verified: bool,
48    pub metadata: BTreeMap<String, serde_json::Value>,
49    #[serde(skip)]
50    pub raw_tools: Option<VmValue>,
51    /// Raw transcript_policy VmValue dict — preserved for extracting closure
52    /// fields (compress_callback, mask_callback) that can't go through serde.
53    #[serde(skip)]
54    pub raw_transcript_policy: Option<VmValue>,
55    /// Raw model_policy VmValue dict — preserved for extracting closure
56    /// fields (post_turn_callback) that can't go through serde.
57    #[serde(skip)]
58    pub raw_model_policy: Option<VmValue>,
59}
60
61impl PartialEq for WorkflowNode {
62    fn eq(&self, other: &Self) -> bool {
63        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64    }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68    match value {
69        serde_json::Value::Null => Vec::new(),
70        serde_json::Value::Array(items) => items
71            .iter()
72            .filter_map(|item| match item {
73                serde_json::Value::Object(map) => map
74                    .get("name")
75                    .and_then(|value| value.as_str())
76                    .filter(|name| !name.is_empty())
77                    .map(|name| name.to_string()),
78                _ => None,
79            })
80            .collect(),
81        serde_json::Value::Object(map) => {
82            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83                return map
84                    .get("tools")
85                    .map(workflow_tool_names)
86                    .unwrap_or_default();
87            }
88            map.get("name")
89                .and_then(|value| value.as_str())
90                .filter(|name| !name.is_empty())
91                .map(|name| vec![name.to_string()])
92                .unwrap_or_default()
93        }
94        _ => Vec::new(),
95    }
96}
97
/// Returns the highest-ranked side-effect level in `levels`, or `None` when
/// the iterator is empty.
///
/// Known levels rank `none < read_only < workspace_write < process_exec <
/// network`. Any unrecognized level ranks above all of them. When several
/// levels share the top rank, the last one seen is returned.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDER: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let rank = |level: &str| {
        ORDER
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDER.len())
    };
    levels.fold(None, |best: Option<String>, candidate: String| match best {
        // Keep the incumbent only when it strictly outranks the candidate, so
        // ties resolve to the later element.
        Some(current) if rank(current.as_str()) > rank(candidate.as_str()) => Some(current),
        _ => Some(candidate),
    })
}
111
112fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
113    match value.and_then(|v| v.as_str()).unwrap_or("") {
114        "read" => ToolKind::Read,
115        "edit" => ToolKind::Edit,
116        "delete" => ToolKind::Delete,
117        "move" => ToolKind::Move,
118        "search" => ToolKind::Search,
119        "execute" => ToolKind::Execute,
120        "think" => ToolKind::Think,
121        "fetch" => ToolKind::Fetch,
122        _ => ToolKind::Other,
123    }
124}
125
126fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
127    let policy = map
128        .get("policy")
129        .and_then(|value| value.as_object())
130        .cloned()
131        .unwrap_or_default();
132
133    let capabilities = policy
134        .get("capabilities")
135        .and_then(|value| value.as_object())
136        .map(|caps| {
137            caps.iter()
138                .map(|(capability, ops)| {
139                    let values = ops
140                        .as_array()
141                        .map(|items| {
142                            items
143                                .iter()
144                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
145                                .collect::<Vec<_>>()
146                        })
147                        .unwrap_or_default();
148                    (capability.clone(), values)
149                })
150                .collect::<BTreeMap<_, _>>()
151        })
152        .unwrap_or_default();
153
154    // Accept both the structured `policy.arg_schema` object and the legacy
155    // flat fields on `policy` so pipelines can migrate gradually.
156    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
157        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
158    } else {
159        ToolArgSchema {
160            path_params: policy
161                .get("path_params")
162                .and_then(|value| value.as_array())
163                .map(|items| {
164                    items
165                        .iter()
166                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
167                        .collect::<Vec<_>>()
168                })
169                .unwrap_or_default(),
170            arg_aliases: policy
171                .get("arg_aliases")
172                .and_then(|value| value.as_object())
173                .map(|aliases| {
174                    aliases
175                        .iter()
176                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
177                        .collect::<BTreeMap<_, _>>()
178                })
179                .unwrap_or_default(),
180            required: policy
181                .get("required")
182                .and_then(|value| value.as_array())
183                .map(|items| {
184                    items
185                        .iter()
186                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
187                        .collect::<Vec<_>>()
188                })
189                .unwrap_or_default(),
190        }
191    };
192
193    let kind = parse_tool_kind(policy.get("kind"));
194    let side_effect_level = policy
195        .get("side_effect_level")
196        .and_then(|value| value.as_str())
197        .map(SideEffectLevel::parse)
198        .unwrap_or_default();
199
200    ToolAnnotations {
201        kind,
202        side_effect_level,
203        arg_schema,
204        capabilities,
205    }
206}
207
208pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
209    match value {
210        serde_json::Value::Null => BTreeMap::new(),
211        serde_json::Value::Array(items) => items
212            .iter()
213            .filter_map(|item| match item {
214                serde_json::Value::Object(map) => map
215                    .get("name")
216                    .and_then(|value| value.as_str())
217                    .filter(|name| !name.is_empty())
218                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
219                _ => None,
220            })
221            .collect(),
222        serde_json::Value::Object(map) => {
223            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
224                return map
225                    .get("tools")
226                    .map(workflow_tool_annotations)
227                    .unwrap_or_default();
228            }
229            map.get("name")
230                .and_then(|value| value.as_str())
231                .filter(|name| !name.is_empty())
232                .map(|name| {
233                    let mut annotations = BTreeMap::new();
234                    annotations.insert(name.to_string(), parse_tool_annotations(map));
235                    annotations
236                })
237                .unwrap_or_default()
238        }
239        _ => BTreeMap::new(),
240    }
241}
242
243pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
244    let tools = workflow_tool_names(value);
245    let tool_annotations = workflow_tool_annotations(value);
246    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
247    for annotations in tool_annotations.values() {
248        for (capability, ops) in &annotations.capabilities {
249            let entry = capabilities.entry(capability.clone()).or_default();
250            for op in ops {
251                if !entry.contains(op) {
252                    entry.push(op.clone());
253                }
254            }
255            entry.sort();
256        }
257    }
258    let side_effect_level = max_side_effect_level(
259        tool_annotations
260            .values()
261            .map(|annotations| annotations.side_effect_level.as_str().to_string())
262            .filter(|level| level != "none"),
263    );
264    CapabilityPolicy {
265        tools,
266        capabilities,
267        workspace_roots: Vec::new(),
268        side_effect_level,
269        recursion_limit: None,
270        tool_arg_constraints: Vec::new(),
271        tool_annotations,
272    }
273}
274
275#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
276#[serde(default)]
277pub struct WorkflowEdge {
278    pub from: String,
279    pub to: String,
280    pub branch: Option<String>,
281    pub label: Option<String>,
282}
283
284#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
285#[serde(default)]
286pub struct WorkflowGraph {
287    #[serde(rename = "_type")]
288    pub type_name: String,
289    pub id: String,
290    pub name: Option<String>,
291    pub version: usize,
292    pub entry: String,
293    pub nodes: BTreeMap<String, WorkflowNode>,
294    pub edges: Vec<WorkflowEdge>,
295    pub capability_policy: CapabilityPolicy,
296    pub approval_policy: super::ToolApprovalPolicy,
297    pub metadata: BTreeMap<String, serde_json::Value>,
298    pub audit_log: Vec<WorkflowAuditEntry>,
299}
300
301#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
302#[serde(default)]
303pub struct WorkflowAuditEntry {
304    pub id: String,
305    pub op: String,
306    pub node_id: Option<String>,
307    pub timestamp: String,
308    pub reason: Option<String>,
309    pub metadata: BTreeMap<String, serde_json::Value>,
310}
311
312#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
313#[serde(default)]
314pub struct WorkflowValidationReport {
315    pub valid: bool,
316    pub errors: Vec<String>,
317    pub warnings: Vec<String>,
318    pub reachable_nodes: Vec<String>,
319}
320
321pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
322    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
323    let dict = value.as_dict();
324    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
325    node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
326    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
327    Ok(node)
328}
329
330pub fn parse_workflow_node_json(
331    json: serde_json::Value,
332    label: &str,
333) -> Result<WorkflowNode, VmError> {
334    super::parse_json_payload(json, label)
335}
336
337pub fn parse_workflow_edge_json(
338    json: serde_json::Value,
339    label: &str,
340) -> Result<WorkflowEdge, VmError> {
341    super::parse_json_payload(json, label)
342}
343
344pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
345    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
346    let as_dict = value.as_dict().cloned().unwrap_or_default();
347
348    if graph.nodes.is_empty() {
349        for key in ["act", "verify", "repair"] {
350            if let Some(node_value) = as_dict.get(key) {
351                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
352                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
353                node.id = Some(key.to_string());
354                if node.kind.is_empty() {
355                    node.kind = if key == "verify" {
356                        "verify".to_string()
357                    } else {
358                        "stage".to_string()
359                    };
360                }
361                if node.model_policy.provider.is_none() {
362                    node.model_policy.provider = as_dict
363                        .get("provider")
364                        .map(|value| value.display())
365                        .filter(|value| !value.is_empty());
366                }
367                if node.model_policy.model.is_none() {
368                    node.model_policy.model = as_dict
369                        .get("model")
370                        .map(|value| value.display())
371                        .filter(|value| !value.is_empty());
372                }
373                if node.model_policy.model_tier.is_none() {
374                    node.model_policy.model_tier = as_dict
375                        .get("model_tier")
376                        .or_else(|| as_dict.get("tier"))
377                        .map(|value| value.display())
378                        .filter(|value| !value.is_empty());
379                }
380                if node.model_policy.temperature.is_none() {
381                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
382                        if let VmValue::Float(number) = value {
383                            Some(*number)
384                        } else {
385                            value.as_int().map(|number| number as f64)
386                        }
387                    });
388                }
389                if node.model_policy.max_tokens.is_none() {
390                    node.model_policy.max_tokens =
391                        as_dict.get("max_tokens").and_then(|value| value.as_int());
392                }
393                if node.mode.is_none() {
394                    node.mode = as_dict
395                        .get("mode")
396                        .map(|value| value.display())
397                        .filter(|value| !value.is_empty());
398                }
399                if node.done_sentinel.is_none() {
400                    node.done_sentinel = as_dict
401                        .get("done_sentinel")
402                        .map(|value| value.display())
403                        .filter(|value| !value.is_empty());
404                }
405                if key == "verify"
406                    && node.verify.is_none()
407                    && (raw_node.contains_key("assert_text")
408                        || raw_node.contains_key("command")
409                        || raw_node.contains_key("expect_status")
410                        || raw_node.contains_key("expect_text"))
411                {
412                    node.verify = Some(serde_json::json!({
413                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
414                        "command": raw_node.get("command").map(vm_value_to_json),
415                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
416                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
417                    }));
418                }
419                graph.nodes.insert(key.to_string(), node);
420            }
421        }
422        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
423            graph.entry = "act".to_string();
424        }
425        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
426            if graph.nodes.contains_key("verify") {
427                graph.edges.push(WorkflowEdge {
428                    from: "act".to_string(),
429                    to: "verify".to_string(),
430                    branch: None,
431                    label: None,
432                });
433            }
434            if graph.nodes.contains_key("repair") {
435                graph.edges.push(WorkflowEdge {
436                    from: "verify".to_string(),
437                    to: "repair".to_string(),
438                    branch: Some("failed".to_string()),
439                    label: None,
440                });
441                graph.edges.push(WorkflowEdge {
442                    from: "repair".to_string(),
443                    to: "verify".to_string(),
444                    branch: Some("retry".to_string()),
445                    label: None,
446                });
447            }
448        }
449    }
450
451    if graph.type_name.is_empty() {
452        graph.type_name = "workflow_graph".to_string();
453    }
454    if graph.id.is_empty() {
455        graph.id = new_id("workflow");
456    }
457    if graph.version == 0 {
458        graph.version = 1;
459    }
460    if graph.entry.is_empty() {
461        graph.entry = graph
462            .nodes
463            .keys()
464            .next()
465            .cloned()
466            .unwrap_or_else(|| "act".to_string());
467    }
468    for (node_id, node) in &mut graph.nodes {
469        if node.raw_tools.is_none() {
470            node.raw_tools = as_dict
471                .get("nodes")
472                .and_then(|nodes| nodes.as_dict())
473                .and_then(|nodes| nodes.get(node_id))
474                .and_then(|node_value| node_value.as_dict())
475                .and_then(|raw_node| raw_node.get("tools"))
476                .cloned();
477        }
478        if node.id.is_none() {
479            node.id = Some(node_id.clone());
480        }
481        if node.kind.is_empty() {
482            node.kind = "stage".to_string();
483        }
484        if node.join_policy.strategy.is_empty() {
485            node.join_policy.strategy = "all".to_string();
486        }
487        if node.reduce_policy.strategy.is_empty() {
488            node.reduce_policy.strategy = "concat".to_string();
489        }
490        if node.output_contract.output_kinds.is_empty() {
491            node.output_contract.output_kinds = vec![match node.kind.as_str() {
492                "verify" => "verification_result".to_string(),
493                "reduce" => node
494                    .reduce_policy
495                    .output_kind
496                    .clone()
497                    .unwrap_or_else(|| "summary".to_string()),
498                "map" => node
499                    .map_policy
500                    .output_kind
501                    .clone()
502                    .unwrap_or_else(|| "artifact".to_string()),
503                "escalation" => "plan".to_string(),
504                _ => "artifact".to_string(),
505            }];
506        }
507        if node.retry_policy.max_attempts == 0 {
508            node.retry_policy.max_attempts = 1;
509        }
510    }
511    Ok(graph)
512}
513
514pub fn validate_workflow(
515    graph: &WorkflowGraph,
516    ceiling: Option<&CapabilityPolicy>,
517) -> WorkflowValidationReport {
518    let mut errors = Vec::new();
519    let mut warnings = Vec::new();
520
521    if !graph.nodes.contains_key(&graph.entry) {
522        errors.push(format!("entry node does not exist: {}", graph.entry));
523    }
524
525    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
526    for edge in &graph.edges {
527        if !node_ids.contains(&edge.from) {
528            errors.push(format!("edge.from references unknown node: {}", edge.from));
529        }
530        if !node_ids.contains(&edge.to) {
531            errors.push(format!("edge.to references unknown node: {}", edge.to));
532        }
533    }
534
535    let reachable_nodes = reachable_nodes(graph);
536    for node_id in &node_ids {
537        if !reachable_nodes.contains(node_id) {
538            warnings.push(format!("node is unreachable: {node_id}"));
539        }
540    }
541
542    for (node_id, node) in &graph.nodes {
543        let incoming = graph
544            .edges
545            .iter()
546            .filter(|edge| edge.to == *node_id)
547            .count();
548        let outgoing: Vec<&WorkflowEdge> = graph
549            .edges
550            .iter()
551            .filter(|edge| edge.from == *node_id)
552            .collect();
553        if let Some(min_inputs) = node.input_contract.min_inputs {
554            if let Some(max_inputs) = node.input_contract.max_inputs {
555                if min_inputs > max_inputs {
556                    errors.push(format!(
557                        "node {node_id}: input contract min_inputs exceeds max_inputs"
558                    ));
559                }
560            }
561        }
562        match node.kind.as_str() {
563            "condition" => {
564                let has_true = outgoing
565                    .iter()
566                    .any(|edge| edge.branch.as_deref() == Some("true"));
567                let has_false = outgoing
568                    .iter()
569                    .any(|edge| edge.branch.as_deref() == Some("false"));
570                if !has_true || !has_false {
571                    errors.push(format!(
572                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
573                    ));
574                }
575            }
576            "fork" => {
577                if outgoing.len() < 2 {
578                    errors.push(format!(
579                        "node {node_id}: fork nodes require at least two outgoing edges"
580                    ));
581                }
582            }
583            "join" => {
584                if incoming < 2 {
585                    warnings.push(format!(
586                        "node {node_id}: join node has fewer than two incoming edges"
587                    ));
588                }
589            }
590            "map" => {
591                if node.map_policy.items.is_empty()
592                    && node.map_policy.item_artifact_kind.is_none()
593                    && node.input_contract.input_kinds.is_empty()
594                {
595                    errors.push(format!(
596                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
597                    ));
598                }
599            }
600            "reduce" => {
601                if node.input_contract.input_kinds.is_empty() {
602                    warnings.push(format!(
603                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
604                    ));
605                }
606            }
607            _ => {}
608        }
609    }
610
611    if let Some(ceiling) = ceiling {
612        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
613            errors.push(error);
614        }
615        for (node_id, node) in &graph.nodes {
616            if let Err(error) = ceiling.intersect(&node.capability_policy) {
617                errors.push(format!("node {node_id}: {error}"));
618            }
619        }
620    }
621
622    WorkflowValidationReport {
623        valid: errors.is_empty(),
624        errors,
625        warnings,
626        reachable_nodes: reachable_nodes.into_iter().collect(),
627    }
628}
629
630fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
631    let mut seen = BTreeSet::new();
632    let mut stack = vec![graph.entry.clone()];
633    while let Some(node_id) = stack.pop() {
634        if !seen.insert(node_id.clone()) {
635            continue;
636        }
637        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
638            stack.push(edge.to.clone());
639        }
640    }
641    seen
642}
643
644pub async fn execute_stage_node(
645    node_id: &str,
646    node: &WorkflowNode,
647    task: &str,
648    artifacts: &[ArtifactRecord],
649    transcript: Option<VmValue>,
650) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
651    let mut selection_policy = node.context_policy.clone();
652    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
653        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
654    }
655    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
656    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
657    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
658    if node.input_contract.require_transcript && input_transcript.is_none() {
659        return Err(VmError::Runtime(format!(
660            "workflow stage {node_id} requires transcript input"
661        )));
662    }
663    if let Some(min_inputs) = node.input_contract.min_inputs {
664        if selected.len() < min_inputs {
665            return Err(VmError::Runtime(format!(
666                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
667            )));
668        }
669    }
670    if let Some(max_inputs) = node.input_contract.max_inputs {
671        if selected.len() > max_inputs {
672            return Err(VmError::Runtime(format!(
673                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
674            )));
675        }
676    }
677    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);
678
679    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
680        .ok()
681        .filter(|value| !value.trim().is_empty())
682        .unwrap_or_else(|| {
683            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
684            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
685            crate::llm_config::default_tool_format(&model, &provider)
686        });
687    let mut llm_result = if node.kind == "verify" {
688        if let Some(command) = node
689            .verify
690            .as_ref()
691            .and_then(|verify| verify.as_object())
692            .and_then(|verify| verify.get("command"))
693            .and_then(|value| value.as_str())
694            .map(str::trim)
695            .filter(|value| !value.is_empty())
696        {
697            let mut process = if cfg!(target_os = "windows") {
698                let mut cmd = tokio::process::Command::new("cmd");
699                cmd.arg("/C").arg(command);
700                cmd
701            } else {
702                let mut cmd = tokio::process::Command::new("/bin/sh");
703                cmd.arg("-lc").arg(command);
704                cmd
705            };
706            process.stdin(std::process::Stdio::null());
707            if let Some(context) = crate::stdlib::process::current_execution_context() {
708                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
709                    process.current_dir(cwd);
710                }
711                if !context.env.is_empty() {
712                    process.envs(context.env);
713                }
714            }
715            let output = process
716                .output()
717                .await
718                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
719            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
720            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
721            let combined = if stderr.is_empty() {
722                stdout.clone()
723            } else if stdout.is_empty() {
724                stderr.clone()
725            } else {
726                format!("{stdout}\n{stderr}")
727            };
728            serde_json::json!({
729                "status": "completed",
730                "text": combined,
731                "visible_text": combined,
732                "command": command,
733                "stdout": stdout,
734                "stderr": stderr,
735                "exit_status": output.status.code().unwrap_or(-1),
736                "success": output.status.success(),
737            })
738        } else {
739            serde_json::json!({
740                "status": "completed",
741                "text": "",
742                "visible_text": "",
743            })
744        }
745    } else {
746        let mut options = BTreeMap::new();
747        if let Some(provider) = &node.model_policy.provider {
748            options.insert(
749                "provider".to_string(),
750                VmValue::String(Rc::from(provider.clone())),
751            );
752        }
753        if let Some(model) = &node.model_policy.model {
754            options.insert(
755                "model".to_string(),
756                VmValue::String(Rc::from(model.clone())),
757            );
758        }
759        if let Some(model_tier) = &node.model_policy.model_tier {
760            options.insert(
761                "model_tier".to_string(),
762                VmValue::String(Rc::from(model_tier.clone())),
763            );
764        }
765        if let Some(temperature) = node.model_policy.temperature {
766            options.insert("temperature".to_string(), VmValue::Float(temperature));
767        }
768        if let Some(max_tokens) = node.model_policy.max_tokens {
769            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
770        }
771        let tool_names = workflow_tool_names(&node.tools);
772        let tools_value = node.raw_tools.clone().or_else(|| {
773            if matches!(node.tools, serde_json::Value::Null) {
774                None
775            } else {
776                Some(crate::stdlib::json_to_vm_value(&node.tools))
777            }
778        });
779        if tools_value.is_some() && !tool_names.is_empty() {
780            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
781        }
782        if let Some(transcript) = input_transcript.clone() {
783            options.insert("transcript".to_string(), transcript);
784        }
785
786        let args = vec![
787            VmValue::String(Rc::from(prompt.clone())),
788            node.system
789                .clone()
790                .map(|s| VmValue::String(Rc::from(s)))
791                .unwrap_or(VmValue::Nil),
792            VmValue::Dict(Rc::new(options)),
793        ];
794        let mut opts = extract_llm_options(&args)?;
795
796        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
797            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
798            let effective_policy = tool_policy
799                .intersect(&node.capability_policy)
800                .map_err(VmError::Runtime)?;
801            let auto_compact = if node.transcript_policy.auto_compact {
802                let mut ac = crate::orchestration::AutoCompactConfig::default();
803                if let Some(v) = node.transcript_policy.compact_threshold {
804                    ac.token_threshold = v;
805                }
806                if let Some(v) = node.transcript_policy.tool_output_max_chars {
807                    ac.tool_output_max_chars = v;
808                }
809                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
810                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
811                        ac.compact_strategy = s;
812                    }
813                }
814                if let Some(v) = node.transcript_policy.hard_limit_tokens {
815                    ac.hard_limit_tokens = Some(v);
816                }
817                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
818                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
819                        ac.hard_limit_strategy = s;
820                    }
821                }
822                // Closure fields can't round-trip through serde, so extract them
823                // directly from the raw VmValue dict.
824                if let Some(ref raw_tp) = node.raw_transcript_policy {
825                    if let Some(dict) = raw_tp.as_dict() {
826                        if let Some(cb) = dict.get("compress_callback") {
827                            ac.compress_callback = Some(cb.clone());
828                        }
829                        if let Some(cb) = dict.get("mask_callback") {
830                            ac.mask_callback = Some(cb.clone());
831                        }
832                    }
833                }
834                {
835                    let user_specified_threshold =
836                        node.transcript_policy.compact_threshold.is_some();
837                    let user_specified_hard_limit =
838                        node.transcript_policy.hard_limit_tokens.is_some();
839                    crate::llm::api::adapt_auto_compact_to_provider(
840                        &mut ac,
841                        user_specified_threshold,
842                        user_specified_hard_limit,
843                        &opts.provider,
844                        &opts.model,
845                        &opts.api_key,
846                    )
847                    .await;
848                }
849                Some(ac)
850            } else {
851                None
852            };
853            crate::llm::run_agent_loop_internal(
854                &mut opts,
855                crate::llm::AgentLoopConfig {
856                    persistent: true,
857                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
858                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
859                    nudge: node.model_policy.nudge.clone(),
860                    done_sentinel: node.done_sentinel.clone(),
861                    break_unless_phase: None,
862                    tool_retries: 0,
863                    tool_backoff_ms: 1000,
864                    tool_format: tool_format.clone(),
865                    auto_compact,
866                    policy: Some(effective_policy),
867                    approval_policy: Some(node.approval_policy.clone()),
868                    daemon: false,
869                    daemon_config: Default::default(),
870                    llm_retries: 2,
871                    llm_backoff_ms: 2000,
872                    exit_when_verified: node.exit_when_verified,
873                    loop_detect_warn: 2,
874                    loop_detect_block: 3,
875                    loop_detect_skip: 4,
876                    tool_examples: node.model_policy.tool_examples.clone(),
877                    turn_policy: node.model_policy.turn_policy.clone(),
878                    stop_after_successful_tools: node
879                        .model_policy
880                        .stop_after_successful_tools
881                        .clone(),
882                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
883                    // Honor caller-supplied session_id so pipelines that install
884                    // agent_subscribe handlers keyed on that id actually receive
885                    // events. Falls back to a freshly minted id.
886                    session_id: node
887                        .raw_model_policy
888                        .as_ref()
889                        .and_then(|v| v.as_dict())
890                        .and_then(|d| d.get("session_id"))
891                        .and_then(|v| match v {
892                            crate::value::VmValue::String(s) if !s.trim().is_empty() => {
893                                Some(s.to_string())
894                            }
895                            _ => None,
896                        })
897                        .unwrap_or_else(|| format!("workflow_stage_{}", uuid::Uuid::now_v7())),
898                    event_sink: None,
899                    // Seed from the stage's explicit deliverables/ledger so the
900                    // graph carries a task-wide plan through map branches and
901                    // nested stages. Empty ledger means no gate.
902                    task_ledger: node
903                        .raw_model_policy
904                        .as_ref()
905                        .and_then(|v| v.as_dict())
906                        .and_then(|d| d.get("task_ledger"))
907                        .map(crate::llm::helpers::vm_value_to_json)
908                        .and_then(|json| serde_json::from_value(json).ok())
909                        .unwrap_or_default(),
910                    post_turn_callback: node
911                        .raw_model_policy
912                        .as_ref()
913                        .and_then(|v| v.as_dict())
914                        .and_then(|d| d.get("post_turn_callback"))
915                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
916                        .cloned(),
917                },
918            )
919            .await?
920        } else {
921            let result = vm_call_llm_full(&opts).await?;
922            crate::llm::agent_loop_result_from_llm(&result, opts)
923        }
924    };
925    if let Some(payload) = llm_result.as_object_mut() {
926        payload.insert("prompt".to_string(), serde_json::json!(prompt));
927        payload.insert(
928            "system_prompt".to_string(),
929            serde_json::json!(node.system.clone().unwrap_or_default()),
930        );
931        payload.insert(
932            "rendered_context".to_string(),
933            serde_json::json!(rendered_context),
934        );
935        payload.insert(
936            "selected_artifact_ids".to_string(),
937            serde_json::json!(selected
938                .iter()
939                .map(|artifact| artifact.id.clone())
940                .collect::<Vec<_>>()),
941        );
942        payload.insert(
943            "selected_artifact_titles".to_string(),
944            serde_json::json!(selected
945                .iter()
946                .map(|artifact| artifact.title.clone())
947                .collect::<Vec<_>>()),
948        );
949        payload.insert(
950            "tool_calling_mode".to_string(),
951            serde_json::json!(tool_format.clone()),
952        );
953    }
954
955    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
956    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
957    // a "transcript" field; fall back to the input so cross-stage conversation
958    // state survives transitions.
959    let result_transcript = llm_result
960        .get("transcript")
961        .cloned()
962        .map(|value| crate::stdlib::json_to_vm_value(&value));
963    let transcript = apply_output_transcript_policy(
964        result_transcript.or(input_transcript),
965        &node.transcript_policy,
966    );
967    let output_kind = node
968        .output_contract
969        .output_kinds
970        .first()
971        .cloned()
972        .unwrap_or_else(|| {
973            if node.kind == "verify" {
974                "verification_result".to_string()
975            } else {
976                "artifact".to_string()
977            }
978        });
979    let mut metadata = BTreeMap::new();
980    metadata.insert(
981        "input_artifact_ids".to_string(),
982        serde_json::json!(selected
983            .iter()
984            .map(|artifact| artifact.id.clone())
985            .collect::<Vec<_>>()),
986    );
987    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
988    let artifact = ArtifactRecord {
989        type_name: "artifact".to_string(),
990        id: new_id("artifact"),
991        kind: output_kind,
992        title: Some(format!("stage {node_id} output")),
993        text: Some(visible_text),
994        data: Some(llm_result.clone()),
995        source: Some(node_id.to_string()),
996        created_at: now_rfc3339(),
997        freshness: Some("fresh".to_string()),
998        priority: None,
999        lineage: selected
1000            .iter()
1001            .map(|artifact| artifact.id.clone())
1002            .collect(),
1003        relevance: Some(1.0),
1004        estimated_tokens: None,
1005        stage: Some(node_id.to_string()),
1006        metadata,
1007    }
1008    .normalize();
1009
1010    Ok((llm_result, vec![artifact], transcript))
1011}
1012
1013pub fn next_nodes_for(
1014    graph: &WorkflowGraph,
1015    current: &str,
1016    branch: Option<&str>,
1017) -> Vec<WorkflowEdge> {
1018    let mut matching: Vec<WorkflowEdge> = graph
1019        .edges
1020        .iter()
1021        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1022        .cloned()
1023        .collect();
1024    if matching.is_empty() {
1025        matching = graph
1026            .edges
1027            .iter()
1028            .filter(|edge| edge.from == current && edge.branch.is_none())
1029            .cloned()
1030            .collect();
1031    }
1032    matching
1033}
1034
1035pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1036    next_nodes_for(graph, current, Some(branch))
1037        .into_iter()
1038        .next()
1039        .map(|edge| edge.to)
1040}
1041
1042pub fn append_audit_entry(
1043    graph: &mut WorkflowGraph,
1044    op: &str,
1045    node_id: Option<String>,
1046    reason: Option<String>,
1047    metadata: BTreeMap<String, serde_json::Value>,
1048) {
1049    graph.audit_log.push(WorkflowAuditEntry {
1050        id: new_id("audit"),
1051        op: op.to_string(),
1052        node_id,
1053        timestamp: now_rfc3339(),
1054        reason,
1055        metadata,
1056    });
1057}