Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// Node metadata key holding the serialized list of verification contracts.
/// Written by `inject_workflow_verification_contracts` and read back by
/// `stage_verification_contracts`.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node metadata key selecting the verification scope of a stage. When the
/// stored value is the string `"local_only"`, `stage_verification_contracts`
/// returns only the stage's own contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a workflow graph.
///
/// The container-level `#[serde(default)]` means every field missing from
/// the serialized form takes its `Default` value. The `raw_*` fields are
/// skipped by serde entirely and are populated separately by
/// `parse_workflow_node_value`.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. `normalize_workflow_value` fills this with the
    /// node's key in the graph when it is unset.
    pub id: Option<String>,
    /// Node kind. `normalize_workflow_value` replaces an empty value with
    /// `"stage"`.
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// JSON form of the node's tool declarations.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings. When this is a JSON object,
    /// `verification_contract_from_verify` derives a
    /// `VerificationContract` from it.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. The `WORKFLOW_VERIFICATION_*_METADATA_KEY`
    /// entries defined in this file are stored here.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue captured by `parse_workflow_node_value`; never
    /// serialized or deserialized.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
74
75impl PartialEq for WorkflowNode {
76    fn eq(&self, other: &Self) -> bool {
77        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
78    }
79}
80
/// One individual check inside a `VerificationContract`.
///
/// Entries are created through `push_unique_requirement`, which trims all
/// three parts and rejects entries whose `kind` or `value` is blank.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Category of the check, e.g. `"identifier"`, `"path"`, `"text"`,
    /// `"expect_status"` as emitted by `verification_contract_from_verify`.
    pub kind: String,
    /// The value the check refers to (text, path, identifier, status, ...).
    pub value: String,
    /// Optional human-readable explanation of the check.
    pub note: Option<String>,
}
88
/// Verification expectations derived from a node's `verify` configuration.
///
/// Built by `verification_contract_from_verify`; missing fields take their
/// `Default` value when deserialized.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` configuration produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    pub command: Option<String>,
    /// Expected exit status; mirrored into `checks` as `"expect_status"`.
    pub expect_status: Option<i64>,
    /// Mirrored into `checks` as `"visible_text_contains"`.
    pub assert_text: Option<String>,
    /// Mirrored into `checks` as `"combined_output_contains"`.
    pub expect_text: Option<String>,
    /// Mirrored into `checks` as `"identifier"` entries.
    pub required_identifiers: Vec<String>,
    /// Mirrored into `checks` as `"path"` entries.
    pub required_paths: Vec<String>,
    /// Mirrored into `checks` as `"text"` entries.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// De-duplicated list of explicit and derived checks.
    pub checks: Vec<VerificationRequirement>,
}
104
105impl VerificationContract {
106    fn is_empty(&self) -> bool {
107        self.summary.is_none()
108            && self.command.is_none()
109            && self.expect_status.is_none()
110            && self.assert_text.is_none()
111            && self.expect_text.is_none()
112            && self.required_identifiers.is_empty()
113            && self.required_paths.is_empty()
114            && self.required_text.is_empty()
115            && self.notes.is_empty()
116            && self.checks.is_empty()
117    }
118}
119
/// Appends the trimmed form of `value` to `values` unless it is blank or an
/// equal string is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if !candidate.is_empty() && !already_present {
        values.push(candidate.to_owned());
    }
}
129
130fn push_unique_requirement(
131    values: &mut Vec<VerificationRequirement>,
132    kind: &str,
133    value: &str,
134    note: Option<&str>,
135) {
136    let trimmed_kind = kind.trim();
137    let trimmed_value = value.trim();
138    let trimmed_note = note
139        .map(str::trim)
140        .filter(|candidate| !candidate.is_empty())
141        .map(|candidate| candidate.to_string());
142    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
143        return;
144    }
145    let candidate = VerificationRequirement {
146        kind: trimmed_kind.to_string(),
147        value: trimmed_value.to_string(),
148        note: trimmed_note,
149    };
150    if !values.iter().any(|existing| existing == &candidate) {
151        values.push(candidate);
152    }
153}
154
155fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
156    match value {
157        Some(serde_json::Value::String(text)) => {
158            let mut values = Vec::new();
159            push_unique_string(&mut values, text);
160            values
161        }
162        Some(serde_json::Value::Array(items)) => {
163            let mut values = Vec::new();
164            for item in items {
165                if let Some(text) = item.as_str() {
166                    push_unique_string(&mut values, text);
167                }
168            }
169            values
170        }
171        _ => Vec::new(),
172    }
173}
174
175fn merge_verification_requirement_list(
176    target: &mut Vec<VerificationRequirement>,
177    value: Option<&serde_json::Value>,
178) {
179    let Some(items) = value.and_then(|raw| raw.as_array()) else {
180        return;
181    };
182    for item in items {
183        let Some(object) = item.as_object() else {
184            continue;
185        };
186        let kind = object
187            .get("kind")
188            .and_then(|value| value.as_str())
189            .unwrap_or_default();
190        let value = object
191            .get("value")
192            .and_then(|value| value.as_str())
193            .unwrap_or_default();
194        let note = object
195            .get("note")
196            .or_else(|| object.get("description"))
197            .or_else(|| object.get("reason"))
198            .and_then(|value| value.as_str());
199        push_unique_requirement(target, kind, value, note);
200    }
201}
202
203fn merge_verification_contract_fields(
204    target: &mut VerificationContract,
205    object: &serde_json::Map<String, serde_json::Value>,
206) {
207    if target.summary.is_none() {
208        target.summary = object
209            .get("summary")
210            .and_then(|value| value.as_str())
211            .map(str::trim)
212            .filter(|value| !value.is_empty())
213            .map(|value| value.to_string());
214    }
215    if target.command.is_none() {
216        target.command = object
217            .get("command")
218            .and_then(|value| value.as_str())
219            .map(str::trim)
220            .filter(|value| !value.is_empty())
221            .map(|value| value.to_string());
222    }
223    if target.expect_status.is_none() {
224        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
225    }
226    if target.assert_text.is_none() {
227        target.assert_text = object
228            .get("assert_text")
229            .and_then(|value| value.as_str())
230            .map(str::trim)
231            .filter(|value| !value.is_empty())
232            .map(|value| value.to_string());
233    }
234    if target.expect_text.is_none() {
235        target.expect_text = object
236            .get("expect_text")
237            .and_then(|value| value.as_str())
238            .map(str::trim)
239            .filter(|value| !value.is_empty())
240            .map(|value| value.to_string());
241    }
242
243    for value in json_string_list(
244        object
245            .get("required_identifiers")
246            .or_else(|| object.get("identifiers")),
247    ) {
248        push_unique_string(&mut target.required_identifiers, &value);
249    }
250    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
251        push_unique_string(&mut target.required_paths, &value);
252    }
253    for value in json_string_list(
254        object
255            .get("required_text")
256            .or_else(|| object.get("exact_text"))
257            .or_else(|| object.get("required_strings")),
258    ) {
259        push_unique_string(&mut target.required_text, &value);
260    }
261    for value in json_string_list(object.get("notes")) {
262        push_unique_string(&mut target.notes, &value);
263    }
264    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
265}
266
267fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
268    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
269    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
270        VmError::Runtime(format!(
271            "workflow verification contract read failed for {}: {error}",
272            resolved.display()
273        ))
274    })?;
275    serde_json::from_str(&contents).map_err(|error| {
276        VmError::Runtime(format!(
277            "workflow verification contract parse failed for {}: {error}",
278            resolved.display()
279        ))
280    })
281}
282
283fn resolve_verification_contract_path(
284    verify: &serde_json::Map<String, serde_json::Value>,
285) -> Result<Option<serde_json::Value>, VmError> {
286    let Some(path) = verify
287        .get("contract_path")
288        .or_else(|| verify.get("verification_contract_path"))
289        .and_then(|value| value.as_str())
290        .map(str::trim)
291        .filter(|value| !value.is_empty())
292    else {
293        return Ok(None);
294    };
295    Ok(Some(load_verification_contract_file(path)?))
296}
297
298pub fn verification_contract_from_verify(
299    node_id: &str,
300    verify: Option<&serde_json::Value>,
301) -> Result<Option<VerificationContract>, VmError> {
302    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
303        return Ok(None);
304    };
305
306    let mut contract = VerificationContract {
307        source_node: Some(node_id.to_string()),
308        ..Default::default()
309    };
310
311    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
312        let Some(object) = file_contract.as_object() else {
313            return Err(VmError::Runtime(
314                "workflow verification contract file must parse to a JSON object".to_string(),
315            ));
316        };
317        merge_verification_contract_fields(&mut contract, object);
318    }
319
320    if let Some(inline_contract) = verify_object.get("contract") {
321        let Some(object) = inline_contract.as_object() else {
322            return Err(VmError::Runtime(
323                "workflow verify.contract must be an object".to_string(),
324            ));
325        };
326        merge_verification_contract_fields(&mut contract, object);
327    }
328
329    merge_verification_contract_fields(&mut contract, verify_object);
330
331    if let Some(assert_text) = contract.assert_text.clone() {
332        push_unique_requirement(
333            &mut contract.checks,
334            "visible_text_contains",
335            &assert_text,
336            Some("verify stage requires visible output to contain this text"),
337        );
338    }
339    if let Some(expect_text) = contract.expect_text.clone() {
340        push_unique_requirement(
341            &mut contract.checks,
342            "combined_output_contains",
343            &expect_text,
344            Some("verify command requires combined stdout/stderr to contain this text"),
345        );
346    }
347    if let Some(expect_status) = contract.expect_status {
348        push_unique_requirement(
349            &mut contract.checks,
350            "expect_status",
351            &expect_status.to_string(),
352            Some("verify command exit status must match exactly"),
353        );
354    }
355    for identifier in contract.required_identifiers.clone() {
356        push_unique_requirement(
357            &mut contract.checks,
358            "identifier",
359            &identifier,
360            Some("use this exact identifier spelling"),
361        );
362    }
363    for path in contract.required_paths.clone() {
364        push_unique_requirement(
365            &mut contract.checks,
366            "path",
367            &path,
368            Some("preserve this exact path"),
369        );
370    }
371    for text in contract.required_text.clone() {
372        push_unique_requirement(
373            &mut contract.checks,
374            "text",
375            &text,
376            Some("required exact text or wiring snippet"),
377        );
378    }
379
380    if contract.is_empty() {
381        return Ok(None);
382    }
383    Ok(Some(contract))
384}
385
386fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
387    if !values.iter().any(|existing| existing == &candidate) {
388        values.push(candidate);
389    }
390}
391
392pub fn workflow_verification_contracts(
393    graph: &WorkflowGraph,
394) -> Result<Vec<VerificationContract>, VmError> {
395    let mut contracts = Vec::new();
396    for (node_id, node) in &graph.nodes {
397        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
398            push_unique_contract(&mut contracts, contract);
399        }
400    }
401    Ok(contracts)
402}
403
404pub fn inject_workflow_verification_contracts(
405    node: &mut WorkflowNode,
406    contracts: &[VerificationContract],
407) {
408    if contracts.is_empty() {
409        return;
410    }
411    node.metadata.insert(
412        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
413        serde_json::to_value(contracts).unwrap_or_default(),
414    );
415}
416
417pub fn stage_verification_contracts(
418    node_id: &str,
419    node: &WorkflowNode,
420) -> Result<Vec<VerificationContract>, VmError> {
421    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
422    let local_only = matches!(
423        node.metadata
424            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
425            .and_then(|value| value.as_str()),
426        Some("local_only")
427    );
428    if local_only {
429        return Ok(local_contract.into_iter().collect());
430    }
431
432    let mut contracts = node
433        .metadata
434        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
435        .cloned()
436        .map(|value| {
437            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
438                VmError::Runtime(format!(
439                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
440                ))
441            })
442        })
443        .transpose()?
444        .unwrap_or_default();
445
446    if let Some(local_contract) = local_contract {
447        push_unique_contract(&mut contracts, local_contract);
448    }
449    Ok(contracts)
450}
451
452pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
453    match value {
454        serde_json::Value::Null => Vec::new(),
455        serde_json::Value::Array(items) => items
456            .iter()
457            .filter_map(|item| match item {
458                serde_json::Value::Object(map) => map
459                    .get("name")
460                    .and_then(|value| value.as_str())
461                    .filter(|name| !name.is_empty())
462                    .map(|name| name.to_string()),
463                _ => None,
464            })
465            .collect(),
466        serde_json::Value::Object(map) => {
467            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
468                return map
469                    .get("tools")
470                    .map(workflow_tool_names)
471                    .unwrap_or_default();
472            }
473            map.get("name")
474                .and_then(|value| value.as_str())
475                .filter(|name| !name.is_empty())
476                .map(|name| vec![name.to_string()])
477                .unwrap_or_default()
478        }
479        _ => Vec::new(),
480    }
481}
482
/// Returns the most severe side-effect level among `levels`, or `None` when
/// the iterator is empty.
///
/// Known levels rank in the order listed in `ORDERED`; any unrecognized
/// level ranks above all of them. Among equally ranked levels the last one
/// yielded wins.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    levels.max_by_key(|level| {
        ORDERED
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(ORDERED.len())
    })
}
496
497fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
498    match value.and_then(|v| v.as_str()).unwrap_or("") {
499        "read" => ToolKind::Read,
500        "edit" => ToolKind::Edit,
501        "delete" => ToolKind::Delete,
502        "move" => ToolKind::Move,
503        "search" => ToolKind::Search,
504        "execute" => ToolKind::Execute,
505        "think" => ToolKind::Think,
506        "fetch" => ToolKind::Fetch,
507        _ => ToolKind::Other,
508    }
509}
510
511fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
512    let policy = map
513        .get("policy")
514        .and_then(|value| value.as_object())
515        .cloned()
516        .unwrap_or_default();
517
518    let capabilities = policy
519        .get("capabilities")
520        .and_then(|value| value.as_object())
521        .map(|caps| {
522            caps.iter()
523                .map(|(capability, ops)| {
524                    let values = ops
525                        .as_array()
526                        .map(|items| {
527                            items
528                                .iter()
529                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
530                                .collect::<Vec<_>>()
531                        })
532                        .unwrap_or_default();
533                    (capability.clone(), values)
534                })
535                .collect::<BTreeMap<_, _>>()
536        })
537        .unwrap_or_default();
538
539    // Accept both the structured `policy.arg_schema` object and the legacy
540    // flat fields on `policy` so pipelines can migrate gradually.
541    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
542        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
543    } else {
544        ToolArgSchema {
545            path_params: policy
546                .get("path_params")
547                .and_then(|value| value.as_array())
548                .map(|items| {
549                    items
550                        .iter()
551                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
552                        .collect::<Vec<_>>()
553                })
554                .unwrap_or_default(),
555            arg_aliases: policy
556                .get("arg_aliases")
557                .and_then(|value| value.as_object())
558                .map(|aliases| {
559                    aliases
560                        .iter()
561                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
562                        .collect::<BTreeMap<_, _>>()
563                })
564                .unwrap_or_default(),
565            required: policy
566                .get("required")
567                .and_then(|value| value.as_array())
568                .map(|items| {
569                    items
570                        .iter()
571                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
572                        .collect::<Vec<_>>()
573                })
574                .unwrap_or_default(),
575        }
576    };
577
578    let kind = parse_tool_kind(policy.get("kind"));
579    let side_effect_level = policy
580        .get("side_effect_level")
581        .and_then(|value| value.as_str())
582        .map(SideEffectLevel::parse)
583        .unwrap_or_default();
584
585    ToolAnnotations {
586        kind,
587        side_effect_level,
588        arg_schema,
589        capabilities,
590    }
591}
592
593pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
594    match value {
595        serde_json::Value::Null => BTreeMap::new(),
596        serde_json::Value::Array(items) => items
597            .iter()
598            .filter_map(|item| match item {
599                serde_json::Value::Object(map) => map
600                    .get("name")
601                    .and_then(|value| value.as_str())
602                    .filter(|name| !name.is_empty())
603                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
604                _ => None,
605            })
606            .collect(),
607        serde_json::Value::Object(map) => {
608            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
609                return map
610                    .get("tools")
611                    .map(workflow_tool_annotations)
612                    .unwrap_or_default();
613            }
614            map.get("name")
615                .and_then(|value| value.as_str())
616                .filter(|name| !name.is_empty())
617                .map(|name| {
618                    let mut annotations = BTreeMap::new();
619                    annotations.insert(name.to_string(), parse_tool_annotations(map));
620                    annotations
621                })
622                .unwrap_or_default()
623        }
624        _ => BTreeMap::new(),
625    }
626}
627
628pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
629    let tools = workflow_tool_names(value);
630    let tool_annotations = workflow_tool_annotations(value);
631    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
632    for annotations in tool_annotations.values() {
633        for (capability, ops) in &annotations.capabilities {
634            let entry = capabilities.entry(capability.clone()).or_default();
635            for op in ops {
636                if !entry.contains(op) {
637                    entry.push(op.clone());
638                }
639            }
640            entry.sort();
641        }
642    }
643    let side_effect_level = max_side_effect_level(
644        tool_annotations
645            .values()
646            .map(|annotations| annotations.side_effect_level.as_str().to_string())
647            .filter(|level| level != "none"),
648    );
649    CapabilityPolicy {
650        tools,
651        capabilities,
652        workspace_roots: Vec::new(),
653        side_effect_level,
654        recursion_limit: None,
655        tool_arg_constraints: Vec::new(),
656        tool_annotations,
657    }
658}
659
/// A directed edge between two nodes of a `WorkflowGraph`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the node the edge starts at.
    pub from: String,
    /// Id of the node the edge leads to.
    pub to: String,
    /// Optional branch name; `normalize_workflow_value` uses `"failed"` and
    /// `"retry"` for the edges it synthesizes around a repair node.
    pub branch: Option<String>,
    pub label: Option<String>,
}
668
/// A workflow graph: nodes keyed by id, the edges between them, and
/// graph-level policies and bookkeeping.
///
/// Missing fields take their `Default` value on deserialization;
/// `normalize_workflow_value` then fills in the defaults noted below.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; set to `"workflow_graph"` when empty.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; generated via `new_id("workflow")` when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; a value of `0` is normalized to `1`.
    pub version: usize,
    /// Id of the entry node; when empty it defaults to the first node key,
    /// or `"act"` if the graph has no nodes.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
685
/// One entry of a `WorkflowGraph`'s `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    // NOTE(review): presumably an RFC 3339 timestamp (this file imports
    // `now_rfc3339`) — confirm against the code that writes audit entries.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
696
/// Outcome of validating a workflow graph.
// NOTE(review): the validation routine that fills this report is not visible
// in this part of the file; the field notes below follow the field names
// only — confirm against the producer.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// Overall validation verdict.
    pub valid: bool,
    /// Error messages collected during validation.
    pub errors: Vec<String>,
    /// Warning messages collected during validation.
    pub warnings: Vec<String>,
    /// Ids of the nodes reported as reachable.
    pub reachable_nodes: Vec<String>,
}
705
706pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
707    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
708    let dict = value.as_dict();
709    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
710    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
711    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
712    Ok(node)
713}
714
715pub fn parse_workflow_node_json(
716    json: serde_json::Value,
717    label: &str,
718) -> Result<WorkflowNode, VmError> {
719    super::parse_json_payload(json, label)
720}
721
722pub fn parse_workflow_edge_json(
723    json: serde_json::Value,
724    label: &str,
725) -> Result<WorkflowEdge, VmError> {
726    super::parse_json_payload(json, label)
727}
728
729pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
730    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
731    let as_dict = value.as_dict().cloned().unwrap_or_default();
732
733    if graph.nodes.is_empty() {
734        for key in ["act", "verify", "repair"] {
735            if let Some(node_value) = as_dict.get(key) {
736                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
737                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
738                node.id = Some(key.to_string());
739                if node.kind.is_empty() {
740                    node.kind = if key == "verify" {
741                        "verify".to_string()
742                    } else {
743                        "stage".to_string()
744                    };
745                }
746                if node.model_policy.provider.is_none() {
747                    node.model_policy.provider = as_dict
748                        .get("provider")
749                        .map(|value| value.display())
750                        .filter(|value| !value.is_empty());
751                }
752                if node.model_policy.model.is_none() {
753                    node.model_policy.model = as_dict
754                        .get("model")
755                        .map(|value| value.display())
756                        .filter(|value| !value.is_empty());
757                }
758                if node.model_policy.model_tier.is_none() {
759                    node.model_policy.model_tier = as_dict
760                        .get("model_tier")
761                        .or_else(|| as_dict.get("tier"))
762                        .map(|value| value.display())
763                        .filter(|value| !value.is_empty());
764                }
765                if node.model_policy.temperature.is_none() {
766                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
767                        if let VmValue::Float(number) = value {
768                            Some(*number)
769                        } else {
770                            value.as_int().map(|number| number as f64)
771                        }
772                    });
773                }
774                if node.model_policy.max_tokens.is_none() {
775                    node.model_policy.max_tokens =
776                        as_dict.get("max_tokens").and_then(|value| value.as_int());
777                }
778                if node.mode.is_none() {
779                    node.mode = as_dict
780                        .get("mode")
781                        .map(|value| value.display())
782                        .filter(|value| !value.is_empty());
783                }
784                if node.done_sentinel.is_none() {
785                    node.done_sentinel = as_dict
786                        .get("done_sentinel")
787                        .map(|value| value.display())
788                        .filter(|value| !value.is_empty());
789                }
790                if key == "verify"
791                    && node.verify.is_none()
792                    && (raw_node.contains_key("assert_text")
793                        || raw_node.contains_key("command")
794                        || raw_node.contains_key("expect_status")
795                        || raw_node.contains_key("expect_text"))
796                {
797                    node.verify = Some(serde_json::json!({
798                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
799                        "command": raw_node.get("command").map(vm_value_to_json),
800                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
801                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
802                    }));
803                }
804                graph.nodes.insert(key.to_string(), node);
805            }
806        }
807        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
808            graph.entry = "act".to_string();
809        }
810        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
811            if graph.nodes.contains_key("verify") {
812                graph.edges.push(WorkflowEdge {
813                    from: "act".to_string(),
814                    to: "verify".to_string(),
815                    branch: None,
816                    label: None,
817                });
818            }
819            if graph.nodes.contains_key("repair") {
820                graph.edges.push(WorkflowEdge {
821                    from: "verify".to_string(),
822                    to: "repair".to_string(),
823                    branch: Some("failed".to_string()),
824                    label: None,
825                });
826                graph.edges.push(WorkflowEdge {
827                    from: "repair".to_string(),
828                    to: "verify".to_string(),
829                    branch: Some("retry".to_string()),
830                    label: None,
831                });
832            }
833        }
834    }
835
836    if graph.type_name.is_empty() {
837        graph.type_name = "workflow_graph".to_string();
838    }
839    if graph.id.is_empty() {
840        graph.id = new_id("workflow");
841    }
842    if graph.version == 0 {
843        graph.version = 1;
844    }
845    if graph.entry.is_empty() {
846        graph.entry = graph
847            .nodes
848            .keys()
849            .next()
850            .cloned()
851            .unwrap_or_else(|| "act".to_string());
852    }
853    for (node_id, node) in &mut graph.nodes {
854        if node.raw_tools.is_none() {
855            node.raw_tools = as_dict
856                .get("nodes")
857                .and_then(|nodes| nodes.as_dict())
858                .and_then(|nodes| nodes.get(node_id))
859                .and_then(|node_value| node_value.as_dict())
860                .and_then(|raw_node| raw_node.get("tools"))
861                .cloned();
862        }
863        if node.id.is_none() {
864            node.id = Some(node_id.clone());
865        }
866        if node.kind.is_empty() {
867            node.kind = "stage".to_string();
868        }
869        if node.join_policy.strategy.is_empty() {
870            node.join_policy.strategy = "all".to_string();
871        }
872        if node.reduce_policy.strategy.is_empty() {
873            node.reduce_policy.strategy = "concat".to_string();
874        }
875        if node.output_contract.output_kinds.is_empty() {
876            node.output_contract.output_kinds = vec![match node.kind.as_str() {
877                "verify" => "verification_result".to_string(),
878                "reduce" => node
879                    .reduce_policy
880                    .output_kind
881                    .clone()
882                    .unwrap_or_else(|| "summary".to_string()),
883                "map" => node
884                    .map_policy
885                    .output_kind
886                    .clone()
887                    .unwrap_or_else(|| "artifact".to_string()),
888                "escalation" => "plan".to_string(),
889                _ => "artifact".to_string(),
890            }];
891        }
892        if node.retry_policy.max_attempts == 0 {
893            node.retry_policy.max_attempts = 1;
894        }
895    }
896    Ok(graph)
897}
898
899pub fn validate_workflow(
900    graph: &WorkflowGraph,
901    ceiling: Option<&CapabilityPolicy>,
902) -> WorkflowValidationReport {
903    let mut errors = Vec::new();
904    let mut warnings = Vec::new();
905
906    if !graph.nodes.contains_key(&graph.entry) {
907        errors.push(format!("entry node does not exist: {}", graph.entry));
908    }
909
910    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
911    for edge in &graph.edges {
912        if !node_ids.contains(&edge.from) {
913            errors.push(format!("edge.from references unknown node: {}", edge.from));
914        }
915        if !node_ids.contains(&edge.to) {
916            errors.push(format!("edge.to references unknown node: {}", edge.to));
917        }
918    }
919
920    let reachable_nodes = reachable_nodes(graph);
921    for node_id in &node_ids {
922        if !reachable_nodes.contains(node_id) {
923            warnings.push(format!("node is unreachable: {node_id}"));
924        }
925    }
926
927    for (node_id, node) in &graph.nodes {
928        let incoming = graph
929            .edges
930            .iter()
931            .filter(|edge| edge.to == *node_id)
932            .count();
933        let outgoing: Vec<&WorkflowEdge> = graph
934            .edges
935            .iter()
936            .filter(|edge| edge.from == *node_id)
937            .collect();
938        if let Some(min_inputs) = node.input_contract.min_inputs {
939            if let Some(max_inputs) = node.input_contract.max_inputs {
940                if min_inputs > max_inputs {
941                    errors.push(format!(
942                        "node {node_id}: input contract min_inputs exceeds max_inputs"
943                    ));
944                }
945            }
946        }
947        match node.kind.as_str() {
948            "condition" => {
949                let has_true = outgoing
950                    .iter()
951                    .any(|edge| edge.branch.as_deref() == Some("true"));
952                let has_false = outgoing
953                    .iter()
954                    .any(|edge| edge.branch.as_deref() == Some("false"));
955                if !has_true || !has_false {
956                    errors.push(format!(
957                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
958                    ));
959                }
960            }
961            "fork" if outgoing.len() < 2 => {
962                errors.push(format!(
963                    "node {node_id}: fork nodes require at least two outgoing edges"
964                ));
965            }
966            "join" if incoming < 2 => {
967                warnings.push(format!(
968                    "node {node_id}: join node has fewer than two incoming edges"
969                ));
970            }
971            "map"
972                if node.map_policy.items.is_empty()
973                    && node.map_policy.item_artifact_kind.is_none()
974                    && node.input_contract.input_kinds.is_empty() =>
975            {
976                errors.push(format!(
977                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
978                ));
979            }
980            "reduce" if node.input_contract.input_kinds.is_empty() => {
981                warnings.push(format!(
982                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
983                ));
984            }
985            _ => {}
986        }
987    }
988
989    if let Some(ceiling) = ceiling {
990        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
991            errors.push(error);
992        }
993        for (node_id, node) in &graph.nodes {
994            if let Err(error) = ceiling.intersect(&node.capability_policy) {
995                errors.push(format!("node {node_id}: {error}"));
996            }
997        }
998    }
999
1000    WorkflowValidationReport {
1001        valid: errors.is_empty(),
1002        errors,
1003        warnings,
1004        reachable_nodes: reachable_nodes.into_iter().collect(),
1005    }
1006}
1007
1008fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1009    let mut seen = BTreeSet::new();
1010    let mut stack = vec![graph.entry.clone()];
1011    while let Some(node_id) = stack.pop() {
1012        if !seen.insert(node_id.clone()) {
1013            continue;
1014        }
1015        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1016            stack.push(edge.to.clone());
1017        }
1018    }
1019    seen
1020}
1021
1022/// Pick the session id a stage should run under. Prefers an explicit
1023/// `session_id` on the node's `model_policy` dict (so pipelines with
1024/// `agent_session_open` / `agent_session_fork` flowing through a graph
1025/// line up); falls back to a stable, node-derived id so multi-stage
1026/// graphs with no explicit session share a conversation across stages.
1027/// Per-stage skill registry. Per-node `model_policy.skills` takes
1028/// precedence over the workflow-level `run_options.skills` — authors
1029/// can scope a skill set to one stage without affecting siblings. When
1030/// neither is set, returns `None` so the agent loop runs without
1031/// skill matching (preserves pre-Gap-2 behavior for callers that
1032/// didn't opt in).
1033fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1034    let per_node = node
1035        .raw_model_policy
1036        .as_ref()
1037        .and_then(|v| v.as_dict())
1038        .and_then(|d| d.get("skills"))
1039        .cloned()
1040        .and_then(normalize_inline_registry);
1041    if per_node.is_some() {
1042        return per_node;
1043    }
1044    super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1045}
1046
1047/// Mirror of `resolve_stage_skill_registry` for the match config:
1048/// per-node `model_policy.skill_match` wins, falling back to the
1049/// workflow-level setting.
1050fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1051    let per_node = node
1052        .raw_model_policy
1053        .as_ref()
1054        .and_then(|v| v.as_dict())
1055        .and_then(|d| d.get("skill_match"))
1056        .and_then(|v| v.as_dict().cloned());
1057    if let Some(dict) = per_node {
1058        return crate::llm::parse_skill_match_config_dict(&dict);
1059    }
1060    super::current_workflow_skill_context()
1061        .and_then(|ctx| ctx.match_config)
1062        .and_then(|v| v.as_dict().cloned())
1063        .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1064        .unwrap_or_default()
1065}
1066
1067/// Accept both a validated `skill_registry` dict and a bare list of
1068/// skill entries. The workflow-level parser in `register.rs` does the
1069/// same — we duplicate here so per-node `model_policy.skills` settings
1070/// (not routed through that parser) also benefit.
1071fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1072    use std::collections::BTreeMap;
1073    use std::rc::Rc;
1074    match &value {
1075        VmValue::Dict(d)
1076            if d.get("_type")
1077                .map(|v| v.display() == "skill_registry")
1078                .unwrap_or(false) =>
1079        {
1080            Some(value)
1081        }
1082        VmValue::List(list) => {
1083            let mut dict = BTreeMap::new();
1084            dict.insert(
1085                "_type".to_string(),
1086                VmValue::String(Rc::from("skill_registry")),
1087            );
1088            dict.insert("skills".to_string(), VmValue::List(list.clone()));
1089            Some(VmValue::Dict(Rc::new(dict)))
1090        }
1091        _ => None,
1092    }
1093}
1094
1095fn resolve_node_session_id(node: &WorkflowNode) -> String {
1096    if let Some(explicit) = node
1097        .raw_model_policy
1098        .as_ref()
1099        .and_then(|v| v.as_dict())
1100        .and_then(|d| d.get("session_id"))
1101        .and_then(|v| match v {
1102            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1103            _ => None,
1104        })
1105    {
1106        return explicit;
1107    }
1108    if let Some(persisted) = node
1109        .metadata
1110        .get("worker_session_id")
1111        .and_then(|value| value.as_str())
1112        .filter(|value| !value.trim().is_empty())
1113    {
1114        return persisted.to_string();
1115    }
1116    format!("workflow_stage_{}", uuid::Uuid::now_v7())
1117}
1118
1119fn raw_auto_compact_dict(
1120    node: &WorkflowNode,
1121) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1122    node.raw_auto_compact
1123        .as_ref()
1124        .and_then(|value| value.as_dict())
1125}
1126
1127fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1128    raw_auto_compact_dict(node)
1129        .and_then(|dict| dict.get(key))
1130        .and_then(|value| value.as_int())
1131        .filter(|value| *value >= 0)
1132        .map(|value| value as usize)
1133}
1134
1135fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1136    raw_auto_compact_dict(node)
1137        .and_then(|dict| dict.get(key))
1138        .and_then(|value| match value {
1139            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1140            _ => None,
1141        })
1142}
1143
1144pub(crate) async fn resolve_stage_auto_compact(
1145    node: &WorkflowNode,
1146    opts: &crate::llm::api::LlmCallOptions,
1147) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1148    if !node.auto_compact.enabled {
1149        return Ok(None);
1150    }
1151
1152    let mut ac = crate::orchestration::AutoCompactConfig::default();
1153    if let Some(v) = node.auto_compact.token_threshold {
1154        ac.token_threshold = v;
1155    }
1156    if let Some(v) = node.auto_compact.tool_output_max_chars {
1157        ac.tool_output_max_chars = v;
1158    }
1159    if let Some(ref strategy) = node.auto_compact.compact_strategy {
1160        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1161            ac.compact_strategy = s;
1162        }
1163    }
1164    if let Some(v) = node.auto_compact.hard_limit_tokens {
1165        ac.hard_limit_tokens = Some(v);
1166    }
1167    if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1168        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1169            ac.hard_limit_strategy = s;
1170        }
1171    }
1172
1173    // Workflow nodes keep the richer agent-loop-only compaction knobs in the
1174    // raw dict because the typed policy shape intentionally models only the
1175    // common workflow fields.
1176    if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1177        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1178    {
1179        ac.keep_last = v;
1180    }
1181    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1182        ac.summarize_prompt = Some(prompt);
1183    }
1184
1185    // Closure fields can't round-trip through serde, so extract them directly
1186    // from the raw VmValue dict.
1187    if let Some(dict) = raw_auto_compact_dict(node) {
1188        if let Some(cb) = dict.get("compress_callback") {
1189            ac.compress_callback = Some(cb.clone());
1190        }
1191        if let Some(cb) = dict.get("mask_callback") {
1192            ac.mask_callback = Some(cb.clone());
1193        }
1194        if let Some(cb) = dict.get("custom_compactor") {
1195            ac.custom_compactor = Some(cb.clone());
1196        }
1197    }
1198
1199    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1200    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1201    crate::llm::api::adapt_auto_compact_to_provider(
1202        &mut ac,
1203        user_specified_threshold,
1204        user_specified_hard_limit,
1205        &opts.provider,
1206        &opts.model,
1207        &opts.api_key,
1208    )
1209    .await;
1210
1211    Ok(Some(ac))
1212}
1213
1214pub async fn execute_stage_node(
1215    node_id: &str,
1216    node: &WorkflowNode,
1217    task: &str,
1218    artifacts: &[ArtifactRecord],
1219) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1220    let mut selection_policy = node.context_policy.clone();
1221    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1222        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1223    }
1224    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1225    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
1226    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1227    let rendered_verification = super::render_verification_context(&verification_contracts);
1228    let stage_session_id = resolve_node_session_id(node);
1229    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1230        return Err(VmError::Runtime(format!(
1231            "workflow stage {node_id} requires an existing session \
1232             (call agent_session_open and feed session_id through model_policy \
1233             before entering this stage)"
1234        )));
1235    }
1236    if let Some(min_inputs) = node.input_contract.min_inputs {
1237        if selected.len() < min_inputs {
1238            return Err(VmError::Runtime(format!(
1239                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1240            )));
1241        }
1242    }
1243    if let Some(max_inputs) = node.input_contract.max_inputs {
1244        if selected.len() > max_inputs {
1245            return Err(VmError::Runtime(format!(
1246                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1247            )));
1248        }
1249    }
1250    let prompt = super::render_workflow_prompt(
1251        task,
1252        node.task_label.as_deref(),
1253        &rendered_verification,
1254        &rendered_context,
1255    );
1256
1257    // Precedence for the tool-calling contract format:
1258    //   1. explicit `model_policy.tool_format` on the node
1259    //   2. `HARN_AGENT_TOOL_FORMAT` env override
1260    //   3. provider/model default
1261    // Mirrors the top-level agent_loop / llm_call resolution so workflow
1262    // authors can pin `tool_format: "native"` per-stage and have it
1263    // reach the inner agent loop.
1264    let tool_format = node
1265        .model_policy
1266        .tool_format
1267        .clone()
1268        .filter(|value| !value.trim().is_empty())
1269        .or_else(|| {
1270            std::env::var("HARN_AGENT_TOOL_FORMAT")
1271                .ok()
1272                .filter(|value| !value.trim().is_empty())
1273        })
1274        .unwrap_or_else(|| {
1275            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1276            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1277            crate::llm_config::default_tool_format(&model, &provider)
1278        });
1279    let mut llm_result = if node.kind == "verify" {
1280        if let Some(command) = node
1281            .verify
1282            .as_ref()
1283            .and_then(|verify| verify.as_object())
1284            .and_then(|verify| verify.get("command"))
1285            .and_then(|value| value.as_str())
1286            .map(str::trim)
1287            .filter(|value| !value.is_empty())
1288        {
1289            let mut process = if cfg!(target_os = "windows") {
1290                crate::stdlib::sandbox::tokio_command_for(
1291                    "cmd",
1292                    &["/C".to_string(), command.to_string()],
1293                )?
1294            } else {
1295                crate::stdlib::sandbox::tokio_command_for(
1296                    "/bin/sh",
1297                    &["-lc".to_string(), command.to_string()],
1298                )?
1299            };
1300            process.stdin(std::process::Stdio::null());
1301            if let Some(context) = crate::stdlib::process::current_execution_context() {
1302                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1303                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1304                    process.current_dir(cwd);
1305                }
1306                if !context.env.is_empty() {
1307                    process.envs(context.env);
1308                }
1309            }
1310            let output = process.output().await.map_err(|e| {
1311                crate::stdlib::sandbox::process_spawn_error(&e).unwrap_or_else(|| {
1312                    VmError::Runtime(format!("workflow verify exec failed: {e}"))
1313                })
1314            })?;
1315            if let Some(error) = crate::stdlib::sandbox::process_violation_error(&output) {
1316                return Err(error);
1317            }
1318            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1319            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1320            let combined = if stderr.is_empty() {
1321                stdout.clone()
1322            } else if stdout.is_empty() {
1323                stderr.clone()
1324            } else {
1325                format!("{stdout}\n{stderr}")
1326            };
1327            serde_json::json!({
1328                "status": "completed",
1329                "text": combined,
1330                "visible_text": combined,
1331                "command": command,
1332                "stdout": stdout,
1333                "stderr": stderr,
1334                "exit_status": output.status.code().unwrap_or(-1),
1335                "success": output.status.success(),
1336            })
1337        } else {
1338            serde_json::json!({
1339                "status": "completed",
1340                "text": "",
1341                "visible_text": "",
1342            })
1343        }
1344    } else {
1345        let mut options = BTreeMap::new();
1346        if let Some(provider) = &node.model_policy.provider {
1347            options.insert(
1348                "provider".to_string(),
1349                VmValue::String(Rc::from(provider.clone())),
1350            );
1351        }
1352        if let Some(model) = &node.model_policy.model {
1353            options.insert(
1354                "model".to_string(),
1355                VmValue::String(Rc::from(model.clone())),
1356            );
1357        }
1358        if let Some(model_tier) = &node.model_policy.model_tier {
1359            options.insert(
1360                "model_tier".to_string(),
1361                VmValue::String(Rc::from(model_tier.clone())),
1362            );
1363        }
1364        if let Some(temperature) = node.model_policy.temperature {
1365            options.insert("temperature".to_string(), VmValue::Float(temperature));
1366        }
1367        if let Some(max_tokens) = node.model_policy.max_tokens {
1368            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1369        }
1370        let tool_names = workflow_tool_names(&node.tools);
1371        let tools_value = node.raw_tools.clone().or_else(|| {
1372            if matches!(node.tools, serde_json::Value::Null) {
1373                None
1374            } else {
1375                Some(crate::stdlib::json_to_vm_value(&node.tools))
1376            }
1377        });
1378        if tools_value.is_some() && !tool_names.is_empty() {
1379            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1380        }
1381        options.insert(
1382            "session_id".to_string(),
1383            VmValue::String(Rc::from(stage_session_id.clone())),
1384        );
1385
1386        let args = vec![
1387            VmValue::String(Rc::from(prompt.clone())),
1388            node.system
1389                .clone()
1390                .map(|s| VmValue::String(Rc::from(s)))
1391                .unwrap_or(VmValue::Nil),
1392            VmValue::Dict(Rc::new(options)),
1393        ];
1394        let mut opts = extract_llm_options(&args)?;
1395        let auto_compact = resolve_stage_auto_compact(node, &opts).await?;
1396
1397        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1398            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1399            let effective_policy = tool_policy
1400                .intersect(&node.capability_policy)
1401                .map_err(VmError::Runtime)?;
1402            crate::llm::run_agent_loop_internal(
1403                &mut opts,
1404                crate::llm::AgentLoopConfig {
1405                    persistent: true,
1406                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1407                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1408                    nudge: node.model_policy.nudge.clone(),
1409                    done_sentinel: node.done_sentinel.clone(),
1410                    break_unless_phase: None,
1411                    tool_retries: 0,
1412                    tool_backoff_ms: 1000,
1413                    tool_format: tool_format.clone(),
1414                    native_tool_fallback: node.model_policy.native_tool_fallback,
1415                    auto_compact,
1416                    policy: Some(effective_policy),
1417                    approval_policy: Some(node.approval_policy.clone()),
1418                    daemon: false,
1419                    daemon_config: Default::default(),
1420                    llm_retries: 2,
1421                    llm_backoff_ms: 2000,
1422                    token_budget: None,
1423                    exit_when_verified: node.exit_when_verified,
1424                    loop_detect_warn: 2,
1425                    loop_detect_block: 3,
1426                    loop_detect_skip: 4,
1427                    tool_examples: node.model_policy.tool_examples.clone(),
1428                    turn_policy: node.model_policy.turn_policy.clone(),
1429                    stop_after_successful_tools: node
1430                        .model_policy
1431                        .stop_after_successful_tools
1432                        .clone(),
1433                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
1434                    // Use the same session id resolved for the stage so
1435                    // agent_subscribe handlers keyed on it, and session
1436                    // storage lookups in the agent loop, stay consistent.
1437                    session_id: stage_session_id.clone(),
1438                    event_sink: None,
1439                    // Seed from the stage's explicit deliverables/ledger so the
1440                    // graph carries a task-wide plan through map branches and
1441                    // nested stages. Empty ledger means no gate.
1442                    task_ledger: node
1443                        .raw_model_policy
1444                        .as_ref()
1445                        .and_then(|v| v.as_dict())
1446                        .and_then(|d| d.get("task_ledger"))
1447                        .map(crate::llm::helpers::vm_value_to_json)
1448                        .and_then(|json| serde_json::from_value(json).ok())
1449                        .unwrap_or_default(),
1450                    post_turn_callback: node
1451                        .raw_model_policy
1452                        .as_ref()
1453                        .and_then(|v| v.as_dict())
1454                        .and_then(|d| d.get("post_turn_callback"))
1455                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1456                        .cloned(),
1457                    // Inherit the workflow-level skill wiring installed
1458                    // by `workflow_execute`. Per-node `model_policy.skills`
1459                    // (optional) overrides, letting authors scope a skill
1460                    // set to one stage without affecting siblings. Empty
1461                    // thread-local = no skills configured (direct
1462                    // `execute_stage_node` callers outside a workflow).
1463                    skill_registry: resolve_stage_skill_registry(node),
1464                    skill_match: resolve_stage_skill_match(node),
1465                    working_files: Vec::new(),
1466                },
1467            )
1468            .await?
1469        } else {
1470            let result = vm_call_llm_full(&opts).await?;
1471            crate::llm::agent_loop_result_from_llm(&result, opts)
1472        }
1473    };
1474    if let Some(payload) = llm_result.as_object_mut() {
1475        payload.insert("prompt".to_string(), serde_json::json!(prompt));
1476        payload.insert(
1477            "system_prompt".to_string(),
1478            serde_json::json!(node.system.clone().unwrap_or_default()),
1479        );
1480        payload.insert(
1481            "rendered_context".to_string(),
1482            serde_json::json!(rendered_context),
1483        );
1484        if !verification_contracts.is_empty() {
1485            payload.insert(
1486                "verification_contracts".to_string(),
1487                serde_json::to_value(&verification_contracts).unwrap_or_default(),
1488            );
1489            payload.insert(
1490                "rendered_verification_context".to_string(),
1491                serde_json::json!(rendered_verification),
1492            );
1493        }
1494        payload.insert(
1495            "selected_artifact_ids".to_string(),
1496            serde_json::json!(selected
1497                .iter()
1498                .map(|artifact| artifact.id.clone())
1499                .collect::<Vec<_>>()),
1500        );
1501        payload.insert(
1502            "selected_artifact_titles".to_string(),
1503            serde_json::json!(selected
1504                .iter()
1505                .map(|artifact| artifact.title.clone())
1506                .collect::<Vec<_>>()),
1507        );
1508        payload.insert(
1509            "tool_calling_mode".to_string(),
1510            serde_json::json!(tool_format.clone()),
1511        );
1512    }
1513
1514    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1515    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1516    // a "transcript" field; fall back to the input so cross-stage conversation
1517    // state survives transitions.
1518    let result_transcript = llm_result
1519        .get("transcript")
1520        .cloned()
1521        .map(|value| crate::stdlib::json_to_vm_value(&value));
1522    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1523    let transcript = result_transcript
1524        .or(session_transcript)
1525        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1526    let output_kind = node
1527        .output_contract
1528        .output_kinds
1529        .first()
1530        .cloned()
1531        .unwrap_or_else(|| {
1532            if node.kind == "verify" {
1533                "verification_result".to_string()
1534            } else {
1535                "artifact".to_string()
1536            }
1537        });
1538    let mut metadata = BTreeMap::new();
1539    metadata.insert(
1540        "input_artifact_ids".to_string(),
1541        serde_json::json!(selected
1542            .iter()
1543            .map(|artifact| artifact.id.clone())
1544            .collect::<Vec<_>>()),
1545    );
1546    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1547    if !node.approval_policy.write_path_allowlist.is_empty() {
1548        metadata.insert(
1549            "changed_paths".to_string(),
1550            serde_json::json!(node.approval_policy.write_path_allowlist),
1551        );
1552    }
1553    let artifact = ArtifactRecord {
1554        type_name: "artifact".to_string(),
1555        id: new_id("artifact"),
1556        kind: output_kind,
1557        title: Some(format!("stage {node_id} output")),
1558        text: Some(visible_text),
1559        data: Some(llm_result.clone()),
1560        source: Some(node_id.to_string()),
1561        created_at: now_rfc3339(),
1562        freshness: Some("fresh".to_string()),
1563        priority: None,
1564        lineage: selected
1565            .iter()
1566            .map(|artifact| artifact.id.clone())
1567            .collect(),
1568        relevance: Some(1.0),
1569        estimated_tokens: None,
1570        stage: Some(node_id.to_string()),
1571        metadata,
1572    }
1573    .normalize();
1574
1575    Ok((llm_result, vec![artifact], transcript))
1576}
1577
1578pub fn next_nodes_for(
1579    graph: &WorkflowGraph,
1580    current: &str,
1581    branch: Option<&str>,
1582) -> Vec<WorkflowEdge> {
1583    let mut matching: Vec<WorkflowEdge> = graph
1584        .edges
1585        .iter()
1586        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1587        .cloned()
1588        .collect();
1589    if matching.is_empty() {
1590        matching = graph
1591            .edges
1592            .iter()
1593            .filter(|edge| edge.from == current && edge.branch.is_none())
1594            .cloned()
1595            .collect();
1596    }
1597    matching
1598}
1599
1600pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1601    next_nodes_for(graph, current, Some(branch))
1602        .into_iter()
1603        .next()
1604        .map(|edge| edge.to)
1605}
1606
1607pub fn append_audit_entry(
1608    graph: &mut WorkflowGraph,
1609    op: &str,
1610    node_id: Option<String>,
1611    reason: Option<String>,
1612    metadata: BTreeMap<String, serde_json::Value>,
1613) {
1614    graph.audit_log.push(WorkflowAuditEntry {
1615        id: new_id("audit"),
1616        op: op.to_string(),
1617        node_id,
1618        timestamp: now_rfc3339(),
1619        reason,
1620        metadata,
1621    });
1622}