Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10    ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, TranscriptPolicy,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// A single node (stage) of a [`WorkflowGraph`].
///
/// Every field is optional on input thanks to `#[serde(default)]`. The `raw_*` fields are
/// `#[serde(skip)]`: they are never serialized and are therefore ignored by the
/// JSON-based `PartialEq` impl.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node id; `normalize_workflow_value` fills it from the node's map key when unset.
    pub id: Option<String>,
    /// Node kind. Kinds handled in this module include `stage` (the normalization default),
    /// `verify`, `condition`, `fork`, `join`, `map`, `reduce`, and `escalation`.
    pub kind: String,
    /// Execution mode; `Some("agent")` is special-cased by `execute_stage_node`.
    pub mode: Option<String>,
    /// Stage prompt text. NOTE(review): not read in the visible part of `execute_stage_node`
    /// — confirm where it is consumed.
    pub prompt: Option<String>,
    /// System prompt passed to the LLM call for non-`verify` stages.
    pub system: Option<String>,
    /// Label passed to `render_workflow_prompt` together with the task text.
    pub task_label: Option<String>,
    /// Done sentinel for the stage's agent loop (see `exit_when_verified`); legacy
    /// act/verify/repair pipelines inherit the top-level `done_sentinel` when unset.
    pub done_sentinel: Option<String>,
    /// Tool declarations as JSON: `null`, an array of tool objects, a single tool object, or a
    /// `{"_type": "tool_registry", "tools": ...}` object (see `workflow_tool_names`).
    pub tools: serde_json::Value,
    /// Provider / model / tier / temperature / max_tokens used for the LLM call.
    pub model_policy: ModelPolicy,
    /// Transcript handling, including the auto-compaction settings read by `execute_stage_node`.
    pub transcript_policy: TranscriptPolicy,
    /// Artifact selection and rendering policy for the stage's prompt context.
    pub context_policy: ContextPolicy,
    /// Retry policy; `max_attempts` is normalized from `0` to `1`.
    pub retry_policy: RetryPolicy,
    /// Per-node capability policy; intersected with the tool-derived policy at execution time
    /// and checked against the ceiling in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Tool approval policy. NOTE(review): not consumed in this chunk.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Input requirements (input kinds, min/max inputs, transcript requirement) checked before
    /// the stage runs.
    pub input_contract: StageContract,
    /// Output contract; `output_kinds` gets a kind-dependent default during normalization.
    pub output_contract: StageContract,
    /// Branch semantics. NOTE(review): not consumed in this chunk.
    pub branch_semantics: BranchSemantics,
    /// Policy for `map` nodes (`items`, `item_artifact_kind`, `output_kind`).
    pub map_policy: MapPolicy,
    /// Join policy; `strategy` is normalized to `"all"` when empty.
    pub join_policy: JoinPolicy,
    /// Reduce policy; `strategy` is normalized to `"concat"` when empty, and `output_kind`
    /// feeds the default output kind of `reduce` nodes.
    pub reduce_policy: ReducePolicy,
    /// Escalation policy. NOTE(review): not consumed in this chunk.
    pub escalation_policy: EscalationPolicy,
    /// Verification spec for `verify` nodes. A non-empty `command` string is executed through
    /// the platform shell by `execute_stage_node` instead of calling an LLM.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata carried with the node.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue; preferred over `tools` when building LLM options. Not serialized.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw transcript_policy VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_transcript_policy: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
60
61impl PartialEq for WorkflowNode {
62    fn eq(&self, other: &Self) -> bool {
63        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
64    }
65}
66
67pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
68    match value {
69        serde_json::Value::Null => Vec::new(),
70        serde_json::Value::Array(items) => items
71            .iter()
72            .filter_map(|item| match item {
73                serde_json::Value::Object(map) => map
74                    .get("name")
75                    .and_then(|value| value.as_str())
76                    .filter(|name| !name.is_empty())
77                    .map(|name| name.to_string()),
78                _ => None,
79            })
80            .collect(),
81        serde_json::Value::Object(map) => {
82            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
83                return map
84                    .get("tools")
85                    .map(workflow_tool_names)
86                    .unwrap_or_default();
87            }
88            map.get("name")
89                .and_then(|value| value.as_str())
90                .filter(|name| !name.is_empty())
91                .map(|name| vec![name.to_string()])
92                .unwrap_or_default()
93        }
94        _ => Vec::new(),
95    }
96}
97
/// Returns the highest-ranked side-effect level in `levels`, or `None` when it is empty.
///
/// Known levels rank `none` < `read_only` < `workspace_write` < `process_exec` < `network`.
/// Any other string ranks above all known levels. When several levels share the top rank,
/// the last one wins (the `Iterator::max_by_key` tie rule).
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let rank_of = |level: &String| {
        ORDERED
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(ORDERED.len())
    };
    levels.max_by_key(rank_of)
}
111
112fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
113    match value.and_then(|v| v.as_str()).unwrap_or("") {
114        "read" => ToolKind::Read,
115        "edit" => ToolKind::Edit,
116        "delete" => ToolKind::Delete,
117        "move" => ToolKind::Move,
118        "search" => ToolKind::Search,
119        "execute" => ToolKind::Execute,
120        "think" => ToolKind::Think,
121        "fetch" => ToolKind::Fetch,
122        _ => ToolKind::Other,
123    }
124}
125
126fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
127    let policy = map
128        .get("policy")
129        .and_then(|value| value.as_object())
130        .cloned()
131        .unwrap_or_default();
132
133    let capabilities = policy
134        .get("capabilities")
135        .and_then(|value| value.as_object())
136        .map(|caps| {
137            caps.iter()
138                .map(|(capability, ops)| {
139                    let values = ops
140                        .as_array()
141                        .map(|items| {
142                            items
143                                .iter()
144                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
145                                .collect::<Vec<_>>()
146                        })
147                        .unwrap_or_default();
148                    (capability.clone(), values)
149                })
150                .collect::<BTreeMap<_, _>>()
151        })
152        .unwrap_or_default();
153
154    // Pipelines may declare arg_schema under `policy.arg_schema` as a
155    // fully structured object, or as legacy flat fields on `policy`.
156    // Prefer the structured form; fall back to the flat form for
157    // gradual migration.
158    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
159        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
160    } else {
161        ToolArgSchema {
162            path_params: policy
163                .get("path_params")
164                .and_then(|value| value.as_array())
165                .map(|items| {
166                    items
167                        .iter()
168                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
169                        .collect::<Vec<_>>()
170                })
171                .unwrap_or_default(),
172            arg_aliases: policy
173                .get("arg_aliases")
174                .and_then(|value| value.as_object())
175                .map(|aliases| {
176                    aliases
177                        .iter()
178                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
179                        .collect::<BTreeMap<_, _>>()
180                })
181                .unwrap_or_default(),
182            required: policy
183                .get("required")
184                .and_then(|value| value.as_array())
185                .map(|items| {
186                    items
187                        .iter()
188                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
189                        .collect::<Vec<_>>()
190                })
191                .unwrap_or_default(),
192        }
193    };
194
195    let kind = parse_tool_kind(policy.get("kind"));
196    let side_effect_level = policy
197        .get("side_effect_level")
198        .and_then(|value| value.as_str())
199        .map(SideEffectLevel::parse)
200        .unwrap_or_default();
201
202    ToolAnnotations {
203        kind,
204        side_effect_level,
205        arg_schema,
206        capabilities,
207    }
208}
209
210pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
211    match value {
212        serde_json::Value::Null => BTreeMap::new(),
213        serde_json::Value::Array(items) => items
214            .iter()
215            .filter_map(|item| match item {
216                serde_json::Value::Object(map) => map
217                    .get("name")
218                    .and_then(|value| value.as_str())
219                    .filter(|name| !name.is_empty())
220                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
221                _ => None,
222            })
223            .collect(),
224        serde_json::Value::Object(map) => {
225            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
226                return map
227                    .get("tools")
228                    .map(workflow_tool_annotations)
229                    .unwrap_or_default();
230            }
231            map.get("name")
232                .and_then(|value| value.as_str())
233                .filter(|name| !name.is_empty())
234                .map(|name| {
235                    let mut annotations = BTreeMap::new();
236                    annotations.insert(name.to_string(), parse_tool_annotations(map));
237                    annotations
238                })
239                .unwrap_or_default()
240        }
241        _ => BTreeMap::new(),
242    }
243}
244
245pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
246    let tools = workflow_tool_names(value);
247    let tool_annotations = workflow_tool_annotations(value);
248    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
249    for annotations in tool_annotations.values() {
250        for (capability, ops) in &annotations.capabilities {
251            let entry = capabilities.entry(capability.clone()).or_default();
252            for op in ops {
253                if !entry.contains(op) {
254                    entry.push(op.clone());
255                }
256            }
257            entry.sort();
258        }
259    }
260    let side_effect_level = max_side_effect_level(
261        tool_annotations
262            .values()
263            .map(|annotations| annotations.side_effect_level.as_str().to_string())
264            .filter(|level| level != "none"),
265    );
266    CapabilityPolicy {
267        tools,
268        capabilities,
269        workspace_roots: Vec::new(),
270        side_effect_level,
271        recursion_limit: None,
272        tool_arg_constraints: Vec::new(),
273        tool_annotations,
274    }
275}
276
/// A directed edge between two workflow nodes, referenced by node id.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Source node id; `validate_workflow` reports an error if it is not a declared node.
    pub from: String,
    /// Target node id; `validate_workflow` reports an error if it is not a declared node.
    pub to: String,
    /// Optional branch tag. `condition` nodes must have outgoing edges tagged `"true"` and
    /// `"false"`. Legacy pipelines use `"failed"` (verify -> repair) and `"retry"`
    /// (repair -> verify).
    pub branch: Option<String>,
    /// Optional label; not interpreted by normalization or validation.
    pub label: Option<String>,
}
285
/// A workflow: nodes keyed by id, directed edges between them, and graph-wide policies.
///
/// Every field is optional on input thanks to `#[serde(default)]`.
/// `normalize_workflow_value` fills in a missing `_type`, `id`, `version`, and `entry`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; normalized to `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; normalization generates one with `new_id("workflow")` when empty.
    pub id: String,
    /// Optional graph name.
    pub name: Option<String>,
    /// Graph version; normalized from `0` to `1`.
    pub version: usize,
    /// Id of the entry node. Reachability is computed from it, and `validate_workflow`
    /// reports an error if it is not a key of `nodes`.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    /// Directed edges between node ids.
    pub edges: Vec<WorkflowEdge>,
    /// Graph-wide capability policy; checked against the optional ceiling in
    /// `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    /// Graph-wide tool approval policy. NOTE(review): not consumed in this chunk.
    pub approval_policy: super::ToolApprovalPolicy,
    /// Free-form metadata.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Audit trail of [`WorkflowAuditEntry`] records. NOTE(review): presumably appended by graph
    /// editing operations elsewhere — confirm.
    pub audit_log: Vec<WorkflowAuditEntry>,
}
302
/// One record in a [`WorkflowGraph`]'s `audit_log`.
///
/// NOTE(review): the code that creates these entries is outside this chunk. The field notes
/// below are inferred from names and should be confirmed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    /// Entry id (presumably produced by `new_id` — confirm).
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Id of the affected node, when the operation targeted one.
    pub node_id: Option<String>,
    /// Time of the operation; presumably an RFC 3339 string from `now_rfc3339` — confirm.
    pub timestamp: String,
    /// Optional free-text reason for the operation.
    pub reason: Option<String>,
    /// Extra operation-specific data.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
313
/// Result of [`validate_workflow`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// Set by `validate_workflow` to `errors.is_empty()`.
    pub valid: bool,
    /// Problems that make the workflow invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings such as unreachable nodes.
    pub warnings: Vec<String>,
    /// Node ids reachable from the entry node, in sorted order.
    pub reachable_nodes: Vec<String>,
}
322
323pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
324    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
325    let dict = value.as_dict();
326    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
327    node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
328    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
329    Ok(node)
330}
331
332pub fn parse_workflow_node_json(
333    json: serde_json::Value,
334    label: &str,
335) -> Result<WorkflowNode, VmError> {
336    super::parse_json_payload(json, label)
337}
338
339pub fn parse_workflow_edge_json(
340    json: serde_json::Value,
341    label: &str,
342) -> Result<WorkflowEdge, VmError> {
343    super::parse_json_payload(json, label)
344}
345
346pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
347    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
348    let as_dict = value.as_dict().cloned().unwrap_or_default();
349
350    if graph.nodes.is_empty() {
351        for key in ["act", "verify", "repair"] {
352            if let Some(node_value) = as_dict.get(key) {
353                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
354                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
355                node.id = Some(key.to_string());
356                if node.kind.is_empty() {
357                    node.kind = if key == "verify" {
358                        "verify".to_string()
359                    } else {
360                        "stage".to_string()
361                    };
362                }
363                if node.model_policy.provider.is_none() {
364                    node.model_policy.provider = as_dict
365                        .get("provider")
366                        .map(|value| value.display())
367                        .filter(|value| !value.is_empty());
368                }
369                if node.model_policy.model.is_none() {
370                    node.model_policy.model = as_dict
371                        .get("model")
372                        .map(|value| value.display())
373                        .filter(|value| !value.is_empty());
374                }
375                if node.model_policy.model_tier.is_none() {
376                    node.model_policy.model_tier = as_dict
377                        .get("model_tier")
378                        .or_else(|| as_dict.get("tier"))
379                        .map(|value| value.display())
380                        .filter(|value| !value.is_empty());
381                }
382                if node.model_policy.temperature.is_none() {
383                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
384                        if let VmValue::Float(number) = value {
385                            Some(*number)
386                        } else {
387                            value.as_int().map(|number| number as f64)
388                        }
389                    });
390                }
391                if node.model_policy.max_tokens.is_none() {
392                    node.model_policy.max_tokens =
393                        as_dict.get("max_tokens").and_then(|value| value.as_int());
394                }
395                if node.mode.is_none() {
396                    node.mode = as_dict
397                        .get("mode")
398                        .map(|value| value.display())
399                        .filter(|value| !value.is_empty());
400                }
401                if node.done_sentinel.is_none() {
402                    node.done_sentinel = as_dict
403                        .get("done_sentinel")
404                        .map(|value| value.display())
405                        .filter(|value| !value.is_empty());
406                }
407                if key == "verify"
408                    && node.verify.is_none()
409                    && (raw_node.contains_key("assert_text")
410                        || raw_node.contains_key("command")
411                        || raw_node.contains_key("expect_status")
412                        || raw_node.contains_key("expect_text"))
413                {
414                    node.verify = Some(serde_json::json!({
415                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
416                        "command": raw_node.get("command").map(vm_value_to_json),
417                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
418                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
419                    }));
420                }
421                graph.nodes.insert(key.to_string(), node);
422            }
423        }
424        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
425            graph.entry = "act".to_string();
426        }
427        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
428            if graph.nodes.contains_key("verify") {
429                graph.edges.push(WorkflowEdge {
430                    from: "act".to_string(),
431                    to: "verify".to_string(),
432                    branch: None,
433                    label: None,
434                });
435            }
436            if graph.nodes.contains_key("repair") {
437                graph.edges.push(WorkflowEdge {
438                    from: "verify".to_string(),
439                    to: "repair".to_string(),
440                    branch: Some("failed".to_string()),
441                    label: None,
442                });
443                graph.edges.push(WorkflowEdge {
444                    from: "repair".to_string(),
445                    to: "verify".to_string(),
446                    branch: Some("retry".to_string()),
447                    label: None,
448                });
449            }
450        }
451    }
452
453    if graph.type_name.is_empty() {
454        graph.type_name = "workflow_graph".to_string();
455    }
456    if graph.id.is_empty() {
457        graph.id = new_id("workflow");
458    }
459    if graph.version == 0 {
460        graph.version = 1;
461    }
462    if graph.entry.is_empty() {
463        graph.entry = graph
464            .nodes
465            .keys()
466            .next()
467            .cloned()
468            .unwrap_or_else(|| "act".to_string());
469    }
470    for (node_id, node) in &mut graph.nodes {
471        if node.raw_tools.is_none() {
472            node.raw_tools = as_dict
473                .get("nodes")
474                .and_then(|nodes| nodes.as_dict())
475                .and_then(|nodes| nodes.get(node_id))
476                .and_then(|node_value| node_value.as_dict())
477                .and_then(|raw_node| raw_node.get("tools"))
478                .cloned();
479        }
480        if node.id.is_none() {
481            node.id = Some(node_id.clone());
482        }
483        if node.kind.is_empty() {
484            node.kind = "stage".to_string();
485        }
486        if node.join_policy.strategy.is_empty() {
487            node.join_policy.strategy = "all".to_string();
488        }
489        if node.reduce_policy.strategy.is_empty() {
490            node.reduce_policy.strategy = "concat".to_string();
491        }
492        if node.output_contract.output_kinds.is_empty() {
493            node.output_contract.output_kinds = vec![match node.kind.as_str() {
494                "verify" => "verification_result".to_string(),
495                "reduce" => node
496                    .reduce_policy
497                    .output_kind
498                    .clone()
499                    .unwrap_or_else(|| "summary".to_string()),
500                "map" => node
501                    .map_policy
502                    .output_kind
503                    .clone()
504                    .unwrap_or_else(|| "artifact".to_string()),
505                "escalation" => "plan".to_string(),
506                _ => "artifact".to_string(),
507            }];
508        }
509        if node.retry_policy.max_attempts == 0 {
510            node.retry_policy.max_attempts = 1;
511        }
512    }
513    Ok(graph)
514}
515
516pub fn validate_workflow(
517    graph: &WorkflowGraph,
518    ceiling: Option<&CapabilityPolicy>,
519) -> WorkflowValidationReport {
520    let mut errors = Vec::new();
521    let mut warnings = Vec::new();
522
523    if !graph.nodes.contains_key(&graph.entry) {
524        errors.push(format!("entry node does not exist: {}", graph.entry));
525    }
526
527    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
528    for edge in &graph.edges {
529        if !node_ids.contains(&edge.from) {
530            errors.push(format!("edge.from references unknown node: {}", edge.from));
531        }
532        if !node_ids.contains(&edge.to) {
533            errors.push(format!("edge.to references unknown node: {}", edge.to));
534        }
535    }
536
537    let reachable_nodes = reachable_nodes(graph);
538    for node_id in &node_ids {
539        if !reachable_nodes.contains(node_id) {
540            warnings.push(format!("node is unreachable: {node_id}"));
541        }
542    }
543
544    for (node_id, node) in &graph.nodes {
545        let incoming = graph
546            .edges
547            .iter()
548            .filter(|edge| edge.to == *node_id)
549            .count();
550        let outgoing: Vec<&WorkflowEdge> = graph
551            .edges
552            .iter()
553            .filter(|edge| edge.from == *node_id)
554            .collect();
555        if let Some(min_inputs) = node.input_contract.min_inputs {
556            if let Some(max_inputs) = node.input_contract.max_inputs {
557                if min_inputs > max_inputs {
558                    errors.push(format!(
559                        "node {node_id}: input contract min_inputs exceeds max_inputs"
560                    ));
561                }
562            }
563        }
564        match node.kind.as_str() {
565            "condition" => {
566                let has_true = outgoing
567                    .iter()
568                    .any(|edge| edge.branch.as_deref() == Some("true"));
569                let has_false = outgoing
570                    .iter()
571                    .any(|edge| edge.branch.as_deref() == Some("false"));
572                if !has_true || !has_false {
573                    errors.push(format!(
574                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
575                    ));
576                }
577            }
578            "fork" => {
579                if outgoing.len() < 2 {
580                    errors.push(format!(
581                        "node {node_id}: fork nodes require at least two outgoing edges"
582                    ));
583                }
584            }
585            "join" => {
586                if incoming < 2 {
587                    warnings.push(format!(
588                        "node {node_id}: join node has fewer than two incoming edges"
589                    ));
590                }
591            }
592            "map" => {
593                if node.map_policy.items.is_empty()
594                    && node.map_policy.item_artifact_kind.is_none()
595                    && node.input_contract.input_kinds.is_empty()
596                {
597                    errors.push(format!(
598                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
599                    ));
600                }
601            }
602            "reduce" => {
603                if node.input_contract.input_kinds.is_empty() {
604                    warnings.push(format!(
605                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
606                    ));
607                }
608            }
609            _ => {}
610        }
611    }
612
613    if let Some(ceiling) = ceiling {
614        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
615            errors.push(error);
616        }
617        for (node_id, node) in &graph.nodes {
618            if let Err(error) = ceiling.intersect(&node.capability_policy) {
619                errors.push(format!("node {node_id}: {error}"));
620            }
621        }
622    }
623
624    WorkflowValidationReport {
625        valid: errors.is_empty(),
626        errors,
627        warnings,
628        reachable_nodes: reachable_nodes.into_iter().collect(),
629    }
630}
631
632fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
633    let mut seen = BTreeSet::new();
634    let mut stack = vec![graph.entry.clone()];
635    while let Some(node_id) = stack.pop() {
636        if !seen.insert(node_id.clone()) {
637            continue;
638        }
639        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
640            stack.push(edge.to.clone());
641        }
642    }
643    seen
644}
645
646pub async fn execute_stage_node(
647    node_id: &str,
648    node: &WorkflowNode,
649    task: &str,
650    artifacts: &[ArtifactRecord],
651    transcript: Option<VmValue>,
652) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
653    let mut selection_policy = node.context_policy.clone();
654    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
655        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
656    }
657    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
658    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
659    let input_transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
660    if node.input_contract.require_transcript && input_transcript.is_none() {
661        return Err(VmError::Runtime(format!(
662            "workflow stage {node_id} requires transcript input"
663        )));
664    }
665    if let Some(min_inputs) = node.input_contract.min_inputs {
666        if selected.len() < min_inputs {
667            return Err(VmError::Runtime(format!(
668                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
669            )));
670        }
671    }
672    if let Some(max_inputs) = node.input_contract.max_inputs {
673        if selected.len() > max_inputs {
674            return Err(VmError::Runtime(format!(
675                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
676            )));
677        }
678    }
679    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);
680
681    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
682        .ok()
683        .filter(|value| !value.trim().is_empty())
684        .unwrap_or_else(|| {
685            // Auto-detect from model/provider alias config.
686            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
687            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
688            crate::llm_config::default_tool_format(&model, &provider)
689        });
690    let mut llm_result = if node.kind == "verify" {
691        if let Some(command) = node
692            .verify
693            .as_ref()
694            .and_then(|verify| verify.as_object())
695            .and_then(|verify| verify.get("command"))
696            .and_then(|value| value.as_str())
697            .map(str::trim)
698            .filter(|value| !value.is_empty())
699        {
700            let mut process = if cfg!(target_os = "windows") {
701                let mut cmd = tokio::process::Command::new("cmd");
702                cmd.arg("/C").arg(command);
703                cmd
704            } else {
705                let mut cmd = tokio::process::Command::new("/bin/sh");
706                cmd.arg("-lc").arg(command);
707                cmd
708            };
709            process.stdin(std::process::Stdio::null());
710            if let Some(context) = crate::stdlib::process::current_execution_context() {
711                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
712                    process.current_dir(cwd);
713                }
714                if !context.env.is_empty() {
715                    process.envs(context.env);
716                }
717            }
718            let output = process
719                .output()
720                .await
721                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
722            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
723            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
724            let combined = if stderr.is_empty() {
725                stdout.clone()
726            } else if stdout.is_empty() {
727                stderr.clone()
728            } else {
729                format!("{stdout}\n{stderr}")
730            };
731            serde_json::json!({
732                "status": "completed",
733                "text": combined,
734                "visible_text": combined,
735                "command": command,
736                "stdout": stdout,
737                "stderr": stderr,
738                "exit_status": output.status.code().unwrap_or(-1),
739                "success": output.status.success(),
740            })
741        } else {
742            serde_json::json!({
743                "status": "completed",
744                "text": "",
745                "visible_text": "",
746            })
747        }
748    } else {
749        let mut options = BTreeMap::new();
750        if let Some(provider) = &node.model_policy.provider {
751            options.insert(
752                "provider".to_string(),
753                VmValue::String(Rc::from(provider.clone())),
754            );
755        }
756        if let Some(model) = &node.model_policy.model {
757            options.insert(
758                "model".to_string(),
759                VmValue::String(Rc::from(model.clone())),
760            );
761        }
762        if let Some(model_tier) = &node.model_policy.model_tier {
763            options.insert(
764                "model_tier".to_string(),
765                VmValue::String(Rc::from(model_tier.clone())),
766            );
767        }
768        if let Some(temperature) = node.model_policy.temperature {
769            options.insert("temperature".to_string(), VmValue::Float(temperature));
770        }
771        if let Some(max_tokens) = node.model_policy.max_tokens {
772            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
773        }
774        let tool_names = workflow_tool_names(&node.tools);
775        let tools_value = node.raw_tools.clone().or_else(|| {
776            if matches!(node.tools, serde_json::Value::Null) {
777                None
778            } else {
779                Some(crate::stdlib::json_to_vm_value(&node.tools))
780            }
781        });
782        if tools_value.is_some() && !tool_names.is_empty() {
783            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
784        }
785        if let Some(transcript) = input_transcript.clone() {
786            options.insert("transcript".to_string(), transcript);
787        }
788
789        let args = vec![
790            VmValue::String(Rc::from(prompt.clone())),
791            node.system
792                .clone()
793                .map(|s| VmValue::String(Rc::from(s)))
794                .unwrap_or(VmValue::Nil),
795            VmValue::Dict(Rc::new(options)),
796        ];
797        let mut opts = extract_llm_options(&args)?;
798
799        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
800            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
801            let effective_policy = tool_policy
802                .intersect(&node.capability_policy)
803                .map_err(VmError::Runtime)?;
804            // Build auto-compact config from transcript_policy fields
805            let auto_compact = if node.transcript_policy.auto_compact {
806                let mut ac = crate::orchestration::AutoCompactConfig::default();
807                if let Some(v) = node.transcript_policy.compact_threshold {
808                    ac.token_threshold = v;
809                }
810                if let Some(v) = node.transcript_policy.tool_output_max_chars {
811                    ac.tool_output_max_chars = v;
812                }
813                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
814                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
815                        ac.compact_strategy = s;
816                    }
817                }
818                if let Some(v) = node.transcript_policy.hard_limit_tokens {
819                    ac.hard_limit_tokens = Some(v);
820                }
821                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
822                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
823                        ac.hard_limit_strategy = s;
824                    }
825                }
826                // Extract closure fields from raw transcript_policy dict
827                if let Some(ref raw_tp) = node.raw_transcript_policy {
828                    if let Some(dict) = raw_tp.as_dict() {
829                        if let Some(cb) = dict.get("compress_callback") {
830                            ac.compress_callback = Some(cb.clone());
831                        }
832                        if let Some(cb) = dict.get("mask_callback") {
833                            ac.mask_callback = Some(cb.clone());
834                        }
835                    }
836                }
837                // Adapt thresholds to provider context window
838                {
839                    let user_specified_threshold =
840                        node.transcript_policy.compact_threshold.is_some();
841                    let user_specified_hard_limit =
842                        node.transcript_policy.hard_limit_tokens.is_some();
843                    crate::llm::api::adapt_auto_compact_to_provider(
844                        &mut ac,
845                        user_specified_threshold,
846                        user_specified_hard_limit,
847                        &opts.provider,
848                        &opts.model,
849                        &opts.api_key,
850                    )
851                    .await;
852                }
853                Some(ac)
854            } else {
855                None
856            };
857            crate::llm::run_agent_loop_internal(
858                &mut opts,
859                crate::llm::AgentLoopConfig {
860                    persistent: true,
861                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
862                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
863                    nudge: node.model_policy.nudge.clone(),
864                    done_sentinel: node.done_sentinel.clone(),
865                    break_unless_phase: None,
866                    tool_retries: 0,
867                    tool_backoff_ms: 1000,
868                    tool_format: tool_format.clone(),
869                    auto_compact,
870                    policy: Some(effective_policy),
871                    approval_policy: Some(node.approval_policy.clone()),
872                    daemon: false,
873                    daemon_config: Default::default(),
874                    llm_retries: 2,
875                    llm_backoff_ms: 2000,
876                    exit_when_verified: node.exit_when_verified,
877                    loop_detect_warn: 2,
878                    loop_detect_block: 3,
879                    loop_detect_skip: 4,
880                    tool_examples: node.model_policy.tool_examples.clone(),
881                    turn_policy: node.model_policy.turn_policy.clone(),
882                    stop_after_successful_tools: node
883                        .model_policy
884                        .stop_after_successful_tools
885                        .clone(),
886                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
887                    // Honor any caller-supplied session_id on the
888                    // model_policy so pipelines that install
889                    // agent_subscribe handlers keyed on that id actually
890                    // receive events. Falls back to a freshly minted id
891                    // when the caller hasn't specified one.
892                    session_id: node
893                        .raw_model_policy
894                        .as_ref()
895                        .and_then(|v| v.as_dict())
896                        .and_then(|d| d.get("session_id"))
897                        .and_then(|v| match v {
898                            crate::value::VmValue::String(s) if !s.trim().is_empty() => {
899                                Some(s.to_string())
900                            }
901                            _ => None,
902                        })
903                        .unwrap_or_else(|| format!("workflow_stage_{}", uuid::Uuid::now_v7())),
904                    event_sink: None,
905                    // Seed the ledger from the workflow stage's explicit
906                    // deliverables/ledger fields so the graph can carry a
907                    // task-wide plan down through map branches and nested
908                    // stages. Falls back to an empty ledger (no gate).
909                    task_ledger: node
910                        .raw_model_policy
911                        .as_ref()
912                        .and_then(|v| v.as_dict())
913                        .and_then(|d| d.get("task_ledger"))
914                        .map(crate::llm::helpers::vm_value_to_json)
915                        .and_then(|json| serde_json::from_value(json).ok())
916                        .unwrap_or_default(),
917                    post_turn_callback: node
918                        .raw_model_policy
919                        .as_ref()
920                        .and_then(|v| v.as_dict())
921                        .and_then(|d| d.get("post_turn_callback"))
922                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
923                        .cloned(),
924                },
925            )
926            .await?
927        } else {
928            let result = vm_call_llm_full(&opts).await?;
929            crate::llm::agent_loop_result_from_llm(&result, opts)
930        }
931    };
932    if let Some(payload) = llm_result.as_object_mut() {
933        payload.insert("prompt".to_string(), serde_json::json!(prompt));
934        payload.insert(
935            "system_prompt".to_string(),
936            serde_json::json!(node.system.clone().unwrap_or_default()),
937        );
938        payload.insert(
939            "rendered_context".to_string(),
940            serde_json::json!(rendered_context),
941        );
942        payload.insert(
943            "selected_artifact_ids".to_string(),
944            serde_json::json!(selected
945                .iter()
946                .map(|artifact| artifact.id.clone())
947                .collect::<Vec<_>>()),
948        );
949        payload.insert(
950            "selected_artifact_titles".to_string(),
951            serde_json::json!(selected
952                .iter()
953                .map(|artifact| artifact.title.clone())
954                .collect::<Vec<_>>()),
955        );
956        payload.insert(
957            "tool_calling_mode".to_string(),
958            serde_json::json!(tool_format.clone()),
959        );
960    }
961
962    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
963    // Non-LLM stages (verify command, condition, fork, join, ...) don't
964    // produce a "transcript" field. Fall back to the stage's input transcript
965    // so the running conversation survives cross-stage transitions.
966    let result_transcript = llm_result
967        .get("transcript")
968        .cloned()
969        .map(|value| crate::stdlib::json_to_vm_value(&value));
970    let transcript = apply_output_transcript_policy(
971        result_transcript.or(input_transcript),
972        &node.transcript_policy,
973    );
974    let output_kind = node
975        .output_contract
976        .output_kinds
977        .first()
978        .cloned()
979        .unwrap_or_else(|| {
980            if node.kind == "verify" {
981                "verification_result".to_string()
982            } else {
983                "artifact".to_string()
984            }
985        });
986    let mut metadata = BTreeMap::new();
987    metadata.insert(
988        "input_artifact_ids".to_string(),
989        serde_json::json!(selected
990            .iter()
991            .map(|artifact| artifact.id.clone())
992            .collect::<Vec<_>>()),
993    );
994    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
995    let artifact = ArtifactRecord {
996        type_name: "artifact".to_string(),
997        id: new_id("artifact"),
998        kind: output_kind,
999        title: Some(format!("stage {node_id} output")),
1000        text: Some(visible_text),
1001        data: Some(llm_result.clone()),
1002        source: Some(node_id.to_string()),
1003        created_at: now_rfc3339(),
1004        freshness: Some("fresh".to_string()),
1005        priority: None,
1006        lineage: selected
1007            .iter()
1008            .map(|artifact| artifact.id.clone())
1009            .collect(),
1010        relevance: Some(1.0),
1011        estimated_tokens: None,
1012        stage: Some(node_id.to_string()),
1013        metadata,
1014    }
1015    .normalize();
1016
1017    Ok((llm_result, vec![artifact], transcript))
1018}
1019
1020pub fn next_nodes_for(
1021    graph: &WorkflowGraph,
1022    current: &str,
1023    branch: Option<&str>,
1024) -> Vec<WorkflowEdge> {
1025    let mut matching: Vec<WorkflowEdge> = graph
1026        .edges
1027        .iter()
1028        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1029        .cloned()
1030        .collect();
1031    if matching.is_empty() {
1032        matching = graph
1033            .edges
1034            .iter()
1035            .filter(|edge| edge.from == current && edge.branch.is_none())
1036            .cloned()
1037            .collect();
1038    }
1039    matching
1040}
1041
1042pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1043    next_nodes_for(graph, current, Some(branch))
1044        .into_iter()
1045        .next()
1046        .map(|edge| edge.to)
1047}
1048
1049pub fn append_audit_entry(
1050    graph: &mut WorkflowGraph,
1051    op: &str,
1052    node_id: Option<String>,
1053    reason: Option<String>,
1054    metadata: BTreeMap<String, serde_json::Value>,
1055) {
1056    graph.audit_log.push(WorkflowAuditEntry {
1057        id: new_id("audit"),
1058        op: op.to_string(),
1059        node_id,
1060        timestamp: now_rfc3339(),
1061        reason,
1062        metadata,
1063    });
1064}