Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// A single node of a [`WorkflowGraph`].
///
/// Every field is optional on input (`#[serde(default)]`);
/// `normalize_workflow_value` fills in `id`, `kind`, the join/reduce
/// strategies, the default output kind and `retry_policy.max_attempts`
/// when they are left empty.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. Normalization sets it to the node's key in
    /// `WorkflowGraph::nodes` when absent.
    pub id: Option<String>,
    /// Node kind. Values handled in this file: `stage`, `verify`,
    /// `condition`, `fork`, `join`, `map`, `reduce`, `escalation`.
    /// Normalization defaults an empty kind to `stage`.
    pub kind: String,
    /// Execution mode. `execute_stage_node` checks for the value `"agent"`.
    pub mode: Option<String>,
    pub prompt: Option<String>,
    /// System prompt handed to the LLM call as its second argument.
    pub system: Option<String>,
    /// Label forwarded to `render_workflow_prompt` together with the task.
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// Tool declarations in JSON form: an array of tool objects, a single
    /// tool object, or a `tool_registry` object wrapping a `tools` value
    /// (see `workflow_tool_names`).
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    /// Artifact selection/rendering policy for the stage's context. When
    /// its `include_kinds` is empty, `execute_stage_node` falls back to
    /// `input_contract.input_kinds`.
    pub context_policy: ContextPolicy,
    /// Retry settings; normalization raises `max_attempts` from 0 to 1.
    pub retry_policy: RetryPolicy,
    /// Capability limits for this node; checked against the ceiling in
    /// `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    /// Constraints on the stage's inputs (`min_inputs`, `max_inputs`,
    /// `input_kinds`, `require_transcript`).
    pub input_contract: StageContract,
    /// Declares the stage's outputs; normalization fills `output_kinds`
    /// based on `kind` when it is empty.
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    /// Normalization defaults an empty `strategy` to `"all"`.
    pub join_policy: JoinPolicy,
    /// Normalization defaults an empty `strategy` to `"concat"`.
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings as a JSON object. `execute_stage_node` reads
    /// its `command` key for `verify` nodes; the legacy workflow shape also
    /// populates `assert_text`, `expect_status` and `expect_text`.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue as supplied by the script. Skipped by serde and
    /// preferred over `tools` when building the LLM call options.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
71
72impl PartialEq for WorkflowNode {
73    fn eq(&self, other: &Self) -> bool {
74        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
75    }
76}
77
78pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
79    match value {
80        serde_json::Value::Null => Vec::new(),
81        serde_json::Value::Array(items) => items
82            .iter()
83            .filter_map(|item| match item {
84                serde_json::Value::Object(map) => map
85                    .get("name")
86                    .and_then(|value| value.as_str())
87                    .filter(|name| !name.is_empty())
88                    .map(|name| name.to_string()),
89                _ => None,
90            })
91            .collect(),
92        serde_json::Value::Object(map) => {
93            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
94                return map
95                    .get("tools")
96                    .map(workflow_tool_names)
97                    .unwrap_or_default();
98            }
99            map.get("name")
100                .and_then(|value| value.as_str())
101                .filter(|name| !name.is_empty())
102                .map(|name| vec![name.to_string()])
103                .unwrap_or_default()
104        }
105        _ => Vec::new(),
106    }
107}
108
/// Returns the most severe side-effect level yielded by `levels`, or `None`
/// when the iterator is empty.
///
/// Known levels are ordered `none < read_only < workspace_write <
/// process_exec < network`; any unrecognized string ranks above all of
/// them. When several levels share the top rank, the last one wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ASCENDING: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity = |level: &str| {
        ASCENDING
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ASCENDING.len())
    };

    levels.fold(None, |strongest: Option<String>, candidate| match strongest {
        // Only a strictly higher incumbent survives, so ties go to the
        // later element.
        Some(current) if severity(&current) > severity(&candidate) => Some(current),
        _ => Some(candidate),
    })
}
122
123fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
124    match value.and_then(|v| v.as_str()).unwrap_or("") {
125        "read" => ToolKind::Read,
126        "edit" => ToolKind::Edit,
127        "delete" => ToolKind::Delete,
128        "move" => ToolKind::Move,
129        "search" => ToolKind::Search,
130        "execute" => ToolKind::Execute,
131        "think" => ToolKind::Think,
132        "fetch" => ToolKind::Fetch,
133        _ => ToolKind::Other,
134    }
135}
136
137fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
138    let policy = map
139        .get("policy")
140        .and_then(|value| value.as_object())
141        .cloned()
142        .unwrap_or_default();
143
144    let capabilities = policy
145        .get("capabilities")
146        .and_then(|value| value.as_object())
147        .map(|caps| {
148            caps.iter()
149                .map(|(capability, ops)| {
150                    let values = ops
151                        .as_array()
152                        .map(|items| {
153                            items
154                                .iter()
155                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
156                                .collect::<Vec<_>>()
157                        })
158                        .unwrap_or_default();
159                    (capability.clone(), values)
160                })
161                .collect::<BTreeMap<_, _>>()
162        })
163        .unwrap_or_default();
164
165    // Accept both the structured `policy.arg_schema` object and the legacy
166    // flat fields on `policy` so pipelines can migrate gradually.
167    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
168        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
169    } else {
170        ToolArgSchema {
171            path_params: policy
172                .get("path_params")
173                .and_then(|value| value.as_array())
174                .map(|items| {
175                    items
176                        .iter()
177                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
178                        .collect::<Vec<_>>()
179                })
180                .unwrap_or_default(),
181            arg_aliases: policy
182                .get("arg_aliases")
183                .and_then(|value| value.as_object())
184                .map(|aliases| {
185                    aliases
186                        .iter()
187                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
188                        .collect::<BTreeMap<_, _>>()
189                })
190                .unwrap_or_default(),
191            required: policy
192                .get("required")
193                .and_then(|value| value.as_array())
194                .map(|items| {
195                    items
196                        .iter()
197                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
198                        .collect::<Vec<_>>()
199                })
200                .unwrap_or_default(),
201        }
202    };
203
204    let kind = parse_tool_kind(policy.get("kind"));
205    let side_effect_level = policy
206        .get("side_effect_level")
207        .and_then(|value| value.as_str())
208        .map(SideEffectLevel::parse)
209        .unwrap_or_default();
210
211    ToolAnnotations {
212        kind,
213        side_effect_level,
214        arg_schema,
215        capabilities,
216    }
217}
218
219pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
220    match value {
221        serde_json::Value::Null => BTreeMap::new(),
222        serde_json::Value::Array(items) => items
223            .iter()
224            .filter_map(|item| match item {
225                serde_json::Value::Object(map) => map
226                    .get("name")
227                    .and_then(|value| value.as_str())
228                    .filter(|name| !name.is_empty())
229                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
230                _ => None,
231            })
232            .collect(),
233        serde_json::Value::Object(map) => {
234            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
235                return map
236                    .get("tools")
237                    .map(workflow_tool_annotations)
238                    .unwrap_or_default();
239            }
240            map.get("name")
241                .and_then(|value| value.as_str())
242                .filter(|name| !name.is_empty())
243                .map(|name| {
244                    let mut annotations = BTreeMap::new();
245                    annotations.insert(name.to_string(), parse_tool_annotations(map));
246                    annotations
247                })
248                .unwrap_or_default()
249        }
250        _ => BTreeMap::new(),
251    }
252}
253
254pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
255    let tools = workflow_tool_names(value);
256    let tool_annotations = workflow_tool_annotations(value);
257    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
258    for annotations in tool_annotations.values() {
259        for (capability, ops) in &annotations.capabilities {
260            let entry = capabilities.entry(capability.clone()).or_default();
261            for op in ops {
262                if !entry.contains(op) {
263                    entry.push(op.clone());
264                }
265            }
266            entry.sort();
267        }
268    }
269    let side_effect_level = max_side_effect_level(
270        tool_annotations
271            .values()
272            .map(|annotations| annotations.side_effect_level.as_str().to_string())
273            .filter(|level| level != "none"),
274    );
275    CapabilityPolicy {
276        tools,
277        capabilities,
278        workspace_roots: Vec::new(),
279        side_effect_level,
280        recursion_limit: None,
281        tool_arg_constraints: Vec::new(),
282        tool_annotations,
283    }
284}
285
/// A directed edge between two nodes of a [`WorkflowGraph`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch label. `validate_workflow` requires `"true"` and
    /// `"false"` branches on `condition` nodes; the legacy workflow shape
    /// generates `"failed"` and `"retry"` branches.
    pub branch: Option<String>,
    pub label: Option<String>,
}
294
/// A workflow: a set of named nodes, the edges between them and
/// graph-level policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; normalization sets it to `"workflow_graph"`
    /// when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; normalization generates one via `new_id("workflow")` when
    /// empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; normalization raises 0 to 1.
    pub version: usize,
    /// Id of the entry node. Normalization defaults it to `"act"` for the
    /// legacy shape, otherwise to the first node key.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; checked against the ceiling in
    /// `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
311
/// One record in a [`WorkflowGraph`]'s `audit_log`.
// NOTE(review): nothing in the visible part of this file writes these
// entries; field meanings below are taken from the names only — confirm
// against the code that appends to `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
322
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// True exactly when `errors` is empty.
    pub valid: bool,
    /// Problems that make the graph invalid.
    pub errors: Vec<String>,
    /// Non-fatal findings (unreachable nodes, under-connected joins, ...).
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the entry node, in sorted order.
    pub reachable_nodes: Vec<String>,
}
331
332pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
333    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
334    let dict = value.as_dict();
335    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
336    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
337    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
338    Ok(node)
339}
340
/// Parses a [`WorkflowNode`] from an already-converted JSON value by
/// delegating to `super::parse_json_payload` with the given `label`.
///
/// Unlike `parse_workflow_node_value`, no raw VM values are captured, so
/// the `raw_*` fields of the result stay `None`.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
347
/// Parses a [`WorkflowEdge`] from a JSON value by delegating to
/// `super::parse_json_payload` with the given `label`.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
354
355pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
356    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
357    let as_dict = value.as_dict().cloned().unwrap_or_default();
358
359    if graph.nodes.is_empty() {
360        for key in ["act", "verify", "repair"] {
361            if let Some(node_value) = as_dict.get(key) {
362                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
363                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
364                node.id = Some(key.to_string());
365                if node.kind.is_empty() {
366                    node.kind = if key == "verify" {
367                        "verify".to_string()
368                    } else {
369                        "stage".to_string()
370                    };
371                }
372                if node.model_policy.provider.is_none() {
373                    node.model_policy.provider = as_dict
374                        .get("provider")
375                        .map(|value| value.display())
376                        .filter(|value| !value.is_empty());
377                }
378                if node.model_policy.model.is_none() {
379                    node.model_policy.model = as_dict
380                        .get("model")
381                        .map(|value| value.display())
382                        .filter(|value| !value.is_empty());
383                }
384                if node.model_policy.model_tier.is_none() {
385                    node.model_policy.model_tier = as_dict
386                        .get("model_tier")
387                        .or_else(|| as_dict.get("tier"))
388                        .map(|value| value.display())
389                        .filter(|value| !value.is_empty());
390                }
391                if node.model_policy.temperature.is_none() {
392                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
393                        if let VmValue::Float(number) = value {
394                            Some(*number)
395                        } else {
396                            value.as_int().map(|number| number as f64)
397                        }
398                    });
399                }
400                if node.model_policy.max_tokens.is_none() {
401                    node.model_policy.max_tokens =
402                        as_dict.get("max_tokens").and_then(|value| value.as_int());
403                }
404                if node.mode.is_none() {
405                    node.mode = as_dict
406                        .get("mode")
407                        .map(|value| value.display())
408                        .filter(|value| !value.is_empty());
409                }
410                if node.done_sentinel.is_none() {
411                    node.done_sentinel = as_dict
412                        .get("done_sentinel")
413                        .map(|value| value.display())
414                        .filter(|value| !value.is_empty());
415                }
416                if key == "verify"
417                    && node.verify.is_none()
418                    && (raw_node.contains_key("assert_text")
419                        || raw_node.contains_key("command")
420                        || raw_node.contains_key("expect_status")
421                        || raw_node.contains_key("expect_text"))
422                {
423                    node.verify = Some(serde_json::json!({
424                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
425                        "command": raw_node.get("command").map(vm_value_to_json),
426                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
427                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
428                    }));
429                }
430                graph.nodes.insert(key.to_string(), node);
431            }
432        }
433        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
434            graph.entry = "act".to_string();
435        }
436        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
437            if graph.nodes.contains_key("verify") {
438                graph.edges.push(WorkflowEdge {
439                    from: "act".to_string(),
440                    to: "verify".to_string(),
441                    branch: None,
442                    label: None,
443                });
444            }
445            if graph.nodes.contains_key("repair") {
446                graph.edges.push(WorkflowEdge {
447                    from: "verify".to_string(),
448                    to: "repair".to_string(),
449                    branch: Some("failed".to_string()),
450                    label: None,
451                });
452                graph.edges.push(WorkflowEdge {
453                    from: "repair".to_string(),
454                    to: "verify".to_string(),
455                    branch: Some("retry".to_string()),
456                    label: None,
457                });
458            }
459        }
460    }
461
462    if graph.type_name.is_empty() {
463        graph.type_name = "workflow_graph".to_string();
464    }
465    if graph.id.is_empty() {
466        graph.id = new_id("workflow");
467    }
468    if graph.version == 0 {
469        graph.version = 1;
470    }
471    if graph.entry.is_empty() {
472        graph.entry = graph
473            .nodes
474            .keys()
475            .next()
476            .cloned()
477            .unwrap_or_else(|| "act".to_string());
478    }
479    for (node_id, node) in &mut graph.nodes {
480        if node.raw_tools.is_none() {
481            node.raw_tools = as_dict
482                .get("nodes")
483                .and_then(|nodes| nodes.as_dict())
484                .and_then(|nodes| nodes.get(node_id))
485                .and_then(|node_value| node_value.as_dict())
486                .and_then(|raw_node| raw_node.get("tools"))
487                .cloned();
488        }
489        if node.id.is_none() {
490            node.id = Some(node_id.clone());
491        }
492        if node.kind.is_empty() {
493            node.kind = "stage".to_string();
494        }
495        if node.join_policy.strategy.is_empty() {
496            node.join_policy.strategy = "all".to_string();
497        }
498        if node.reduce_policy.strategy.is_empty() {
499            node.reduce_policy.strategy = "concat".to_string();
500        }
501        if node.output_contract.output_kinds.is_empty() {
502            node.output_contract.output_kinds = vec![match node.kind.as_str() {
503                "verify" => "verification_result".to_string(),
504                "reduce" => node
505                    .reduce_policy
506                    .output_kind
507                    .clone()
508                    .unwrap_or_else(|| "summary".to_string()),
509                "map" => node
510                    .map_policy
511                    .output_kind
512                    .clone()
513                    .unwrap_or_else(|| "artifact".to_string()),
514                "escalation" => "plan".to_string(),
515                _ => "artifact".to_string(),
516            }];
517        }
518        if node.retry_policy.max_attempts == 0 {
519            node.retry_policy.max_attempts = 1;
520        }
521    }
522    Ok(graph)
523}
524
525pub fn validate_workflow(
526    graph: &WorkflowGraph,
527    ceiling: Option<&CapabilityPolicy>,
528) -> WorkflowValidationReport {
529    let mut errors = Vec::new();
530    let mut warnings = Vec::new();
531
532    if !graph.nodes.contains_key(&graph.entry) {
533        errors.push(format!("entry node does not exist: {}", graph.entry));
534    }
535
536    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
537    for edge in &graph.edges {
538        if !node_ids.contains(&edge.from) {
539            errors.push(format!("edge.from references unknown node: {}", edge.from));
540        }
541        if !node_ids.contains(&edge.to) {
542            errors.push(format!("edge.to references unknown node: {}", edge.to));
543        }
544    }
545
546    let reachable_nodes = reachable_nodes(graph);
547    for node_id in &node_ids {
548        if !reachable_nodes.contains(node_id) {
549            warnings.push(format!("node is unreachable: {node_id}"));
550        }
551    }
552
553    for (node_id, node) in &graph.nodes {
554        let incoming = graph
555            .edges
556            .iter()
557            .filter(|edge| edge.to == *node_id)
558            .count();
559        let outgoing: Vec<&WorkflowEdge> = graph
560            .edges
561            .iter()
562            .filter(|edge| edge.from == *node_id)
563            .collect();
564        if let Some(min_inputs) = node.input_contract.min_inputs {
565            if let Some(max_inputs) = node.input_contract.max_inputs {
566                if min_inputs > max_inputs {
567                    errors.push(format!(
568                        "node {node_id}: input contract min_inputs exceeds max_inputs"
569                    ));
570                }
571            }
572        }
573        match node.kind.as_str() {
574            "condition" => {
575                let has_true = outgoing
576                    .iter()
577                    .any(|edge| edge.branch.as_deref() == Some("true"));
578                let has_false = outgoing
579                    .iter()
580                    .any(|edge| edge.branch.as_deref() == Some("false"));
581                if !has_true || !has_false {
582                    errors.push(format!(
583                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
584                    ));
585                }
586            }
587            "fork" if outgoing.len() < 2 => {
588                errors.push(format!(
589                    "node {node_id}: fork nodes require at least two outgoing edges"
590                ));
591            }
592            "join" if incoming < 2 => {
593                warnings.push(format!(
594                    "node {node_id}: join node has fewer than two incoming edges"
595                ));
596            }
597            "map"
598                if node.map_policy.items.is_empty()
599                    && node.map_policy.item_artifact_kind.is_none()
600                    && node.input_contract.input_kinds.is_empty() =>
601            {
602                errors.push(format!(
603                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
604                ));
605            }
606            "reduce" if node.input_contract.input_kinds.is_empty() => {
607                warnings.push(format!(
608                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
609                ));
610            }
611            _ => {}
612        }
613    }
614
615    if let Some(ceiling) = ceiling {
616        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
617            errors.push(error);
618        }
619        for (node_id, node) in &graph.nodes {
620            if let Err(error) = ceiling.intersect(&node.capability_policy) {
621                errors.push(format!("node {node_id}: {error}"));
622            }
623        }
624    }
625
626    WorkflowValidationReport {
627        valid: errors.is_empty(),
628        errors,
629        warnings,
630        reachable_nodes: reachable_nodes.into_iter().collect(),
631    }
632}
633
634fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
635    let mut seen = BTreeSet::new();
636    let mut stack = vec![graph.entry.clone()];
637    while let Some(node_id) = stack.pop() {
638        if !seen.insert(node_id.clone()) {
639            continue;
640        }
641        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
642            stack.push(edge.to.clone());
643        }
644    }
645    seen
646}
647
648/// Pick the session id a stage should run under. Prefers an explicit
649/// `session_id` on the node's `model_policy` dict (so pipelines with
650/// `agent_session_open` / `agent_session_fork` flowing through a graph
651/// line up); falls back to a stable, node-derived id so multi-stage
652/// graphs with no explicit session share a conversation across stages.
653fn resolve_node_session_id(node: &WorkflowNode) -> String {
654    if let Some(explicit) = node
655        .raw_model_policy
656        .as_ref()
657        .and_then(|v| v.as_dict())
658        .and_then(|d| d.get("session_id"))
659        .and_then(|v| match v {
660            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
661            _ => None,
662        })
663    {
664        return explicit;
665    }
666    format!("workflow_stage_{}", uuid::Uuid::now_v7())
667}
668
669pub async fn execute_stage_node(
670    node_id: &str,
671    node: &WorkflowNode,
672    task: &str,
673    artifacts: &[ArtifactRecord],
674) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
675    let mut selection_policy = node.context_policy.clone();
676    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
677        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
678    }
679    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
680    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
681    let stage_session_id = resolve_node_session_id(node);
682    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
683        return Err(VmError::Runtime(format!(
684            "workflow stage {node_id} requires an existing session \
685             (call agent_session_open and feed session_id through model_policy \
686             before entering this stage)"
687        )));
688    }
689    if let Some(min_inputs) = node.input_contract.min_inputs {
690        if selected.len() < min_inputs {
691            return Err(VmError::Runtime(format!(
692                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
693            )));
694        }
695    }
696    if let Some(max_inputs) = node.input_contract.max_inputs {
697        if selected.len() > max_inputs {
698            return Err(VmError::Runtime(format!(
699                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
700            )));
701        }
702    }
703    let prompt = super::render_workflow_prompt(task, node.task_label.as_deref(), &rendered_context);
704
705    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
706        .ok()
707        .filter(|value| !value.trim().is_empty())
708        .unwrap_or_else(|| {
709            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
710            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
711            crate::llm_config::default_tool_format(&model, &provider)
712        });
713    let mut llm_result = if node.kind == "verify" {
714        if let Some(command) = node
715            .verify
716            .as_ref()
717            .and_then(|verify| verify.as_object())
718            .and_then(|verify| verify.get("command"))
719            .and_then(|value| value.as_str())
720            .map(str::trim)
721            .filter(|value| !value.is_empty())
722        {
723            let mut process = if cfg!(target_os = "windows") {
724                let mut cmd = tokio::process::Command::new("cmd");
725                cmd.arg("/C").arg(command);
726                cmd
727            } else {
728                let mut cmd = tokio::process::Command::new("/bin/sh");
729                cmd.arg("-lc").arg(command);
730                cmd
731            };
732            process.stdin(std::process::Stdio::null());
733            if let Some(context) = crate::stdlib::process::current_execution_context() {
734                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
735                    process.current_dir(cwd);
736                }
737                if !context.env.is_empty() {
738                    process.envs(context.env);
739                }
740            }
741            let output = process
742                .output()
743                .await
744                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
745            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
746            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
747            let combined = if stderr.is_empty() {
748                stdout.clone()
749            } else if stdout.is_empty() {
750                stderr.clone()
751            } else {
752                format!("{stdout}\n{stderr}")
753            };
754            serde_json::json!({
755                "status": "completed",
756                "text": combined,
757                "visible_text": combined,
758                "command": command,
759                "stdout": stdout,
760                "stderr": stderr,
761                "exit_status": output.status.code().unwrap_or(-1),
762                "success": output.status.success(),
763            })
764        } else {
765            serde_json::json!({
766                "status": "completed",
767                "text": "",
768                "visible_text": "",
769            })
770        }
771    } else {
772        let mut options = BTreeMap::new();
773        if let Some(provider) = &node.model_policy.provider {
774            options.insert(
775                "provider".to_string(),
776                VmValue::String(Rc::from(provider.clone())),
777            );
778        }
779        if let Some(model) = &node.model_policy.model {
780            options.insert(
781                "model".to_string(),
782                VmValue::String(Rc::from(model.clone())),
783            );
784        }
785        if let Some(model_tier) = &node.model_policy.model_tier {
786            options.insert(
787                "model_tier".to_string(),
788                VmValue::String(Rc::from(model_tier.clone())),
789            );
790        }
791        if let Some(temperature) = node.model_policy.temperature {
792            options.insert("temperature".to_string(), VmValue::Float(temperature));
793        }
794        if let Some(max_tokens) = node.model_policy.max_tokens {
795            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
796        }
797        let tool_names = workflow_tool_names(&node.tools);
798        let tools_value = node.raw_tools.clone().or_else(|| {
799            if matches!(node.tools, serde_json::Value::Null) {
800                None
801            } else {
802                Some(crate::stdlib::json_to_vm_value(&node.tools))
803            }
804        });
805        if tools_value.is_some() && !tool_names.is_empty() {
806            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
807        }
808        options.insert(
809            "session_id".to_string(),
810            VmValue::String(Rc::from(stage_session_id.clone())),
811        );
812
813        let args = vec![
814            VmValue::String(Rc::from(prompt.clone())),
815            node.system
816                .clone()
817                .map(|s| VmValue::String(Rc::from(s)))
818                .unwrap_or(VmValue::Nil),
819            VmValue::Dict(Rc::new(options)),
820        ];
821        let mut opts = extract_llm_options(&args)?;
822
823        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
824            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
825            let effective_policy = tool_policy
826                .intersect(&node.capability_policy)
827                .map_err(VmError::Runtime)?;
828            let auto_compact = if node.auto_compact.enabled {
829                let mut ac = crate::orchestration::AutoCompactConfig::default();
830                if let Some(v) = node.auto_compact.token_threshold {
831                    ac.token_threshold = v;
832                }
833                if let Some(v) = node.auto_compact.tool_output_max_chars {
834                    ac.tool_output_max_chars = v;
835                }
836                if let Some(ref strategy) = node.auto_compact.compact_strategy {
837                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
838                        ac.compact_strategy = s;
839                    }
840                }
841                if let Some(v) = node.auto_compact.hard_limit_tokens {
842                    ac.hard_limit_tokens = Some(v);
843                }
844                if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
845                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
846                        ac.hard_limit_strategy = s;
847                    }
848                }
849                // Closure fields can't round-trip through serde, so extract them
850                // directly from the raw VmValue dict.
851                if let Some(ref raw_ac) = node.raw_auto_compact {
852                    if let Some(dict) = raw_ac.as_dict() {
853                        if let Some(cb) = dict.get("compress_callback") {
854                            ac.compress_callback = Some(cb.clone());
855                        }
856                        if let Some(cb) = dict.get("mask_callback") {
857                            ac.mask_callback = Some(cb.clone());
858                        }
859                        if let Some(cb) = dict.get("custom_compactor") {
860                            ac.custom_compactor = Some(cb.clone());
861                        }
862                    }
863                }
864                {
865                    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
866                    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
867                    crate::llm::api::adapt_auto_compact_to_provider(
868                        &mut ac,
869                        user_specified_threshold,
870                        user_specified_hard_limit,
871                        &opts.provider,
872                        &opts.model,
873                        &opts.api_key,
874                    )
875                    .await;
876                }
877                Some(ac)
878            } else {
879                None
880            };
881            crate::llm::run_agent_loop_internal(
882                &mut opts,
883                crate::llm::AgentLoopConfig {
884                    persistent: true,
885                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
886                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
887                    nudge: node.model_policy.nudge.clone(),
888                    done_sentinel: node.done_sentinel.clone(),
889                    break_unless_phase: None,
890                    tool_retries: 0,
891                    tool_backoff_ms: 1000,
892                    tool_format: tool_format.clone(),
893                    auto_compact,
894                    policy: Some(effective_policy),
895                    approval_policy: Some(node.approval_policy.clone()),
896                    daemon: false,
897                    daemon_config: Default::default(),
898                    llm_retries: 2,
899                    llm_backoff_ms: 2000,
900                    exit_when_verified: node.exit_when_verified,
901                    loop_detect_warn: 2,
902                    loop_detect_block: 3,
903                    loop_detect_skip: 4,
904                    tool_examples: node.model_policy.tool_examples.clone(),
905                    turn_policy: node.model_policy.turn_policy.clone(),
906                    stop_after_successful_tools: node
907                        .model_policy
908                        .stop_after_successful_tools
909                        .clone(),
910                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
911                    // Use the same session id resolved for the stage so
912                    // agent_subscribe handlers keyed on it, and session
913                    // storage lookups in the agent loop, stay consistent.
914                    session_id: stage_session_id.clone(),
915                    event_sink: None,
916                    // Seed from the stage's explicit deliverables/ledger so the
917                    // graph carries a task-wide plan through map branches and
918                    // nested stages. Empty ledger means no gate.
919                    task_ledger: node
920                        .raw_model_policy
921                        .as_ref()
922                        .and_then(|v| v.as_dict())
923                        .and_then(|d| d.get("task_ledger"))
924                        .map(crate::llm::helpers::vm_value_to_json)
925                        .and_then(|json| serde_json::from_value(json).ok())
926                        .unwrap_or_default(),
927                    post_turn_callback: node
928                        .raw_model_policy
929                        .as_ref()
930                        .and_then(|v| v.as_dict())
931                        .and_then(|d| d.get("post_turn_callback"))
932                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
933                        .cloned(),
934                    skill_registry: None,
935                    skill_match: Default::default(),
936                    working_files: Vec::new(),
937                },
938            )
939            .await?
940        } else {
941            let result = vm_call_llm_full(&opts).await?;
942            crate::llm::agent_loop_result_from_llm(&result, opts)
943        }
944    };
945    if let Some(payload) = llm_result.as_object_mut() {
946        payload.insert("prompt".to_string(), serde_json::json!(prompt));
947        payload.insert(
948            "system_prompt".to_string(),
949            serde_json::json!(node.system.clone().unwrap_or_default()),
950        );
951        payload.insert(
952            "rendered_context".to_string(),
953            serde_json::json!(rendered_context),
954        );
955        payload.insert(
956            "selected_artifact_ids".to_string(),
957            serde_json::json!(selected
958                .iter()
959                .map(|artifact| artifact.id.clone())
960                .collect::<Vec<_>>()),
961        );
962        payload.insert(
963            "selected_artifact_titles".to_string(),
964            serde_json::json!(selected
965                .iter()
966                .map(|artifact| artifact.title.clone())
967                .collect::<Vec<_>>()),
968        );
969        payload.insert(
970            "tool_calling_mode".to_string(),
971            serde_json::json!(tool_format.clone()),
972        );
973    }
974
975    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
976    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
977    // a "transcript" field; fall back to the input so cross-stage conversation
978    // state survives transitions.
979    let result_transcript = llm_result
980        .get("transcript")
981        .cloned()
982        .map(|value| crate::stdlib::json_to_vm_value(&value));
983    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
984    let transcript = result_transcript
985        .or(session_transcript)
986        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
987    let output_kind = node
988        .output_contract
989        .output_kinds
990        .first()
991        .cloned()
992        .unwrap_or_else(|| {
993            if node.kind == "verify" {
994                "verification_result".to_string()
995            } else {
996                "artifact".to_string()
997            }
998        });
999    let mut metadata = BTreeMap::new();
1000    metadata.insert(
1001        "input_artifact_ids".to_string(),
1002        serde_json::json!(selected
1003            .iter()
1004            .map(|artifact| artifact.id.clone())
1005            .collect::<Vec<_>>()),
1006    );
1007    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1008    let artifact = ArtifactRecord {
1009        type_name: "artifact".to_string(),
1010        id: new_id("artifact"),
1011        kind: output_kind,
1012        title: Some(format!("stage {node_id} output")),
1013        text: Some(visible_text),
1014        data: Some(llm_result.clone()),
1015        source: Some(node_id.to_string()),
1016        created_at: now_rfc3339(),
1017        freshness: Some("fresh".to_string()),
1018        priority: None,
1019        lineage: selected
1020            .iter()
1021            .map(|artifact| artifact.id.clone())
1022            .collect(),
1023        relevance: Some(1.0),
1024        estimated_tokens: None,
1025        stage: Some(node_id.to_string()),
1026        metadata,
1027    }
1028    .normalize();
1029
1030    Ok((llm_result, vec![artifact], transcript))
1031}
1032
1033pub fn next_nodes_for(
1034    graph: &WorkflowGraph,
1035    current: &str,
1036    branch: Option<&str>,
1037) -> Vec<WorkflowEdge> {
1038    let mut matching: Vec<WorkflowEdge> = graph
1039        .edges
1040        .iter()
1041        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1042        .cloned()
1043        .collect();
1044    if matching.is_empty() {
1045        matching = graph
1046            .edges
1047            .iter()
1048            .filter(|edge| edge.from == current && edge.branch.is_none())
1049            .cloned()
1050            .collect();
1051    }
1052    matching
1053}
1054
1055pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1056    next_nodes_for(graph, current, Some(branch))
1057        .into_iter()
1058        .next()
1059        .map(|edge| edge.to)
1060}
1061
1062pub fn append_audit_entry(
1063    graph: &mut WorkflowGraph,
1064    op: &str,
1065    node_id: Option<String>,
1066    reason: Option<String>,
1067    metadata: BTreeMap<String, serde_json::Value>,
1068) {
1069    graph.audit_log.push(WorkflowAuditEntry {
1070        id: new_id("audit"),
1071        op: op.to_string(),
1072        node_id,
1073        timestamp: now_rfc3339(),
1074        reason,
1075        metadata,
1076    });
1077}