Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// Key in `WorkflowNode::metadata` under which injected verification
/// contracts are stored (written by `inject_workflow_verification_contracts`,
/// read back by `stage_verification_contracts`).
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18
/// A single node of a workflow graph.
///
/// Every field falls back to its `Default` when absent from the input
/// (`#[serde(default)]`). The `raw_*` fields are skipped by serde and are
/// populated separately from the original VM value.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier; normalization fills it in from the node's key in
    /// the graph when it is missing.
    pub id: Option<String>,
    /// Node kind; normalization replaces an empty kind with a default.
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// Tool declarations in JSON form.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings. When this is a JSON object,
    /// `verification_contract_from_verify` derives a `VerificationContract`
    /// from it.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. Injected verification contracts live here under
    /// `WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY`.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue as supplied by the caller; never (de)serialized.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
}
73
74impl PartialEq for WorkflowNode {
75    fn eq(&self, other: &Self) -> bool {
76        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
77    }
78}
79
/// One individual check inside a `VerificationContract`.
///
/// Kinds produced in this file include `"visible_text_contains"`,
/// `"combined_output_contains"`, `"expect_status"`, `"identifier"`, `"path"`
/// and `"text"`; other kinds can arrive from user-supplied `checks` lists.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Category tag of the check.
    pub kind: String,
    /// The value the check applies to (text, path, identifier, status, ...).
    pub value: String,
    /// Optional human-readable explanation of the check.
    pub note: Option<String>,
}
87
/// Verification expectations gathered from a node's `verify` settings, an
/// inline `contract` object and/or a contract file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` settings produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    /// Verify command, when one is configured.
    pub command: Option<String>,
    /// Exit status the verify command must match exactly.
    pub expect_status: Option<i64>,
    /// Text the verify stage's visible output must contain.
    pub assert_text: Option<String>,
    /// Text the verify command's combined stdout/stderr must contain.
    pub expect_text: Option<String>,
    /// Identifiers whose exact spelling must be used.
    pub required_identifiers: Vec<String>,
    /// Paths that must be preserved exactly.
    pub required_paths: Vec<String>,
    /// Exact text or wiring snippets that are required.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// Flattened list of checks: user-supplied entries plus the ones derived
    /// from the fields above.
    pub checks: Vec<VerificationRequirement>,
}
103
104impl VerificationContract {
105    fn is_empty(&self) -> bool {
106        self.summary.is_none()
107            && self.command.is_none()
108            && self.expect_status.is_none()
109            && self.assert_text.is_none()
110            && self.expect_text.is_none()
111            && self.required_identifiers.is_empty()
112            && self.required_paths.is_empty()
113            && self.required_text.is_empty()
114            && self.notes.is_empty()
115            && self.checks.is_empty()
116    }
117}
118
/// Appends the trimmed form of `value` to `values`, skipping blank input and
/// entries that are already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    if candidate.is_empty() || values.iter().any(|existing| existing.as_str() == candidate) {
        return;
    }
    values.push(candidate.to_owned());
}
128
129fn push_unique_requirement(
130    values: &mut Vec<VerificationRequirement>,
131    kind: &str,
132    value: &str,
133    note: Option<&str>,
134) {
135    let trimmed_kind = kind.trim();
136    let trimmed_value = value.trim();
137    let trimmed_note = note
138        .map(str::trim)
139        .filter(|candidate| !candidate.is_empty())
140        .map(|candidate| candidate.to_string());
141    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
142        return;
143    }
144    let candidate = VerificationRequirement {
145        kind: trimmed_kind.to_string(),
146        value: trimmed_value.to_string(),
147        note: trimmed_note,
148    };
149    if !values.iter().any(|existing| existing == &candidate) {
150        values.push(candidate);
151    }
152}
153
154fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
155    match value {
156        Some(serde_json::Value::String(text)) => {
157            let mut values = Vec::new();
158            push_unique_string(&mut values, text);
159            values
160        }
161        Some(serde_json::Value::Array(items)) => {
162            let mut values = Vec::new();
163            for item in items {
164                if let Some(text) = item.as_str() {
165                    push_unique_string(&mut values, text);
166                }
167            }
168            values
169        }
170        _ => Vec::new(),
171    }
172}
173
174fn merge_verification_requirement_list(
175    target: &mut Vec<VerificationRequirement>,
176    value: Option<&serde_json::Value>,
177) {
178    let Some(items) = value.and_then(|raw| raw.as_array()) else {
179        return;
180    };
181    for item in items {
182        let Some(object) = item.as_object() else {
183            continue;
184        };
185        let kind = object
186            .get("kind")
187            .and_then(|value| value.as_str())
188            .unwrap_or_default();
189        let value = object
190            .get("value")
191            .and_then(|value| value.as_str())
192            .unwrap_or_default();
193        let note = object
194            .get("note")
195            .or_else(|| object.get("description"))
196            .or_else(|| object.get("reason"))
197            .and_then(|value| value.as_str());
198        push_unique_requirement(target, kind, value, note);
199    }
200}
201
202fn merge_verification_contract_fields(
203    target: &mut VerificationContract,
204    object: &serde_json::Map<String, serde_json::Value>,
205) {
206    if target.summary.is_none() {
207        target.summary = object
208            .get("summary")
209            .and_then(|value| value.as_str())
210            .map(str::trim)
211            .filter(|value| !value.is_empty())
212            .map(|value| value.to_string());
213    }
214    if target.command.is_none() {
215        target.command = object
216            .get("command")
217            .and_then(|value| value.as_str())
218            .map(str::trim)
219            .filter(|value| !value.is_empty())
220            .map(|value| value.to_string());
221    }
222    if target.expect_status.is_none() {
223        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
224    }
225    if target.assert_text.is_none() {
226        target.assert_text = object
227            .get("assert_text")
228            .and_then(|value| value.as_str())
229            .map(str::trim)
230            .filter(|value| !value.is_empty())
231            .map(|value| value.to_string());
232    }
233    if target.expect_text.is_none() {
234        target.expect_text = object
235            .get("expect_text")
236            .and_then(|value| value.as_str())
237            .map(str::trim)
238            .filter(|value| !value.is_empty())
239            .map(|value| value.to_string());
240    }
241
242    for value in json_string_list(
243        object
244            .get("required_identifiers")
245            .or_else(|| object.get("identifiers")),
246    ) {
247        push_unique_string(&mut target.required_identifiers, &value);
248    }
249    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
250        push_unique_string(&mut target.required_paths, &value);
251    }
252    for value in json_string_list(
253        object
254            .get("required_text")
255            .or_else(|| object.get("exact_text"))
256            .or_else(|| object.get("required_strings")),
257    ) {
258        push_unique_string(&mut target.required_text, &value);
259    }
260    for value in json_string_list(object.get("notes")) {
261        push_unique_string(&mut target.notes, &value);
262    }
263    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
264}
265
266fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
267    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
268    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
269        VmError::Runtime(format!(
270            "workflow verification contract read failed for {}: {error}",
271            resolved.display()
272        ))
273    })?;
274    serde_json::from_str(&contents).map_err(|error| {
275        VmError::Runtime(format!(
276            "workflow verification contract parse failed for {}: {error}",
277            resolved.display()
278        ))
279    })
280}
281
282fn resolve_verification_contract_path(
283    verify: &serde_json::Map<String, serde_json::Value>,
284) -> Result<Option<serde_json::Value>, VmError> {
285    let Some(path) = verify
286        .get("contract_path")
287        .or_else(|| verify.get("verification_contract_path"))
288        .and_then(|value| value.as_str())
289        .map(str::trim)
290        .filter(|value| !value.is_empty())
291    else {
292        return Ok(None);
293    };
294    Ok(Some(load_verification_contract_file(path)?))
295}
296
297pub fn verification_contract_from_verify(
298    node_id: &str,
299    verify: Option<&serde_json::Value>,
300) -> Result<Option<VerificationContract>, VmError> {
301    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
302        return Ok(None);
303    };
304
305    let mut contract = VerificationContract {
306        source_node: Some(node_id.to_string()),
307        ..Default::default()
308    };
309
310    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
311        let Some(object) = file_contract.as_object() else {
312            return Err(VmError::Runtime(
313                "workflow verification contract file must parse to a JSON object".to_string(),
314            ));
315        };
316        merge_verification_contract_fields(&mut contract, object);
317    }
318
319    if let Some(inline_contract) = verify_object.get("contract") {
320        let Some(object) = inline_contract.as_object() else {
321            return Err(VmError::Runtime(
322                "workflow verify.contract must be an object".to_string(),
323            ));
324        };
325        merge_verification_contract_fields(&mut contract, object);
326    }
327
328    merge_verification_contract_fields(&mut contract, verify_object);
329
330    if let Some(assert_text) = contract.assert_text.clone() {
331        push_unique_requirement(
332            &mut contract.checks,
333            "visible_text_contains",
334            &assert_text,
335            Some("verify stage requires visible output to contain this text"),
336        );
337    }
338    if let Some(expect_text) = contract.expect_text.clone() {
339        push_unique_requirement(
340            &mut contract.checks,
341            "combined_output_contains",
342            &expect_text,
343            Some("verify command requires combined stdout/stderr to contain this text"),
344        );
345    }
346    if let Some(expect_status) = contract.expect_status {
347        push_unique_requirement(
348            &mut contract.checks,
349            "expect_status",
350            &expect_status.to_string(),
351            Some("verify command exit status must match exactly"),
352        );
353    }
354    for identifier in contract.required_identifiers.clone() {
355        push_unique_requirement(
356            &mut contract.checks,
357            "identifier",
358            &identifier,
359            Some("use this exact identifier spelling"),
360        );
361    }
362    for path in contract.required_paths.clone() {
363        push_unique_requirement(
364            &mut contract.checks,
365            "path",
366            &path,
367            Some("preserve this exact path"),
368        );
369    }
370    for text in contract.required_text.clone() {
371        push_unique_requirement(
372            &mut contract.checks,
373            "text",
374            &text,
375            Some("required exact text or wiring snippet"),
376        );
377    }
378
379    if contract.is_empty() {
380        return Ok(None);
381    }
382    Ok(Some(contract))
383}
384
385fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
386    if !values.iter().any(|existing| existing == &candidate) {
387        values.push(candidate);
388    }
389}
390
391pub fn workflow_verification_contracts(
392    graph: &WorkflowGraph,
393) -> Result<Vec<VerificationContract>, VmError> {
394    let mut contracts = Vec::new();
395    for (node_id, node) in &graph.nodes {
396        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
397            push_unique_contract(&mut contracts, contract);
398        }
399    }
400    Ok(contracts)
401}
402
403pub fn inject_workflow_verification_contracts(
404    node: &mut WorkflowNode,
405    contracts: &[VerificationContract],
406) {
407    if contracts.is_empty() {
408        return;
409    }
410    node.metadata.insert(
411        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
412        serde_json::to_value(contracts).unwrap_or_default(),
413    );
414}
415
416pub fn stage_verification_contracts(
417    node_id: &str,
418    node: &WorkflowNode,
419) -> Result<Vec<VerificationContract>, VmError> {
420    let mut contracts = node
421        .metadata
422        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
423        .cloned()
424        .map(|value| {
425            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
426                VmError::Runtime(format!(
427                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
428                ))
429            })
430        })
431        .transpose()?
432        .unwrap_or_default();
433
434    if let Some(local_contract) = verification_contract_from_verify(node_id, node.verify.as_ref())?
435    {
436        push_unique_contract(&mut contracts, local_contract);
437    }
438    Ok(contracts)
439}
440
441pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
442    match value {
443        serde_json::Value::Null => Vec::new(),
444        serde_json::Value::Array(items) => items
445            .iter()
446            .filter_map(|item| match item {
447                serde_json::Value::Object(map) => map
448                    .get("name")
449                    .and_then(|value| value.as_str())
450                    .filter(|name| !name.is_empty())
451                    .map(|name| name.to_string()),
452                _ => None,
453            })
454            .collect(),
455        serde_json::Value::Object(map) => {
456            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
457                return map
458                    .get("tools")
459                    .map(workflow_tool_names)
460                    .unwrap_or_default();
461            }
462            map.get("name")
463                .and_then(|value| value.as_str())
464                .filter(|name| !name.is_empty())
465                .map(|name| vec![name.to_string()])
466                .unwrap_or_default()
467        }
468        _ => Vec::new(),
469    }
470}
471
/// Returns the most severe side-effect level in `levels`, or `None` when the
/// iterator is empty.
///
/// Known levels rank `none < read_only < workspace_write < process_exec <
/// network`; any unrecognized level ranks above all of them. When several
/// entries share the top rank, the last one encountered is returned.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let rank = |level: &str| {
        ORDERED
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDERED.len())
    };

    levels.fold(None, |best: Option<String>, level| match best {
        // Keep the current best only when it is strictly more severe, so that
        // ties resolve to the later element.
        Some(current) if rank(&current) > rank(&level) => Some(current),
        _ => Some(level),
    })
}
485
486fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
487    match value.and_then(|v| v.as_str()).unwrap_or("") {
488        "read" => ToolKind::Read,
489        "edit" => ToolKind::Edit,
490        "delete" => ToolKind::Delete,
491        "move" => ToolKind::Move,
492        "search" => ToolKind::Search,
493        "execute" => ToolKind::Execute,
494        "think" => ToolKind::Think,
495        "fetch" => ToolKind::Fetch,
496        _ => ToolKind::Other,
497    }
498}
499
500fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
501    let policy = map
502        .get("policy")
503        .and_then(|value| value.as_object())
504        .cloned()
505        .unwrap_or_default();
506
507    let capabilities = policy
508        .get("capabilities")
509        .and_then(|value| value.as_object())
510        .map(|caps| {
511            caps.iter()
512                .map(|(capability, ops)| {
513                    let values = ops
514                        .as_array()
515                        .map(|items| {
516                            items
517                                .iter()
518                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
519                                .collect::<Vec<_>>()
520                        })
521                        .unwrap_or_default();
522                    (capability.clone(), values)
523                })
524                .collect::<BTreeMap<_, _>>()
525        })
526        .unwrap_or_default();
527
528    // Accept both the structured `policy.arg_schema` object and the legacy
529    // flat fields on `policy` so pipelines can migrate gradually.
530    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
531        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
532    } else {
533        ToolArgSchema {
534            path_params: policy
535                .get("path_params")
536                .and_then(|value| value.as_array())
537                .map(|items| {
538                    items
539                        .iter()
540                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
541                        .collect::<Vec<_>>()
542                })
543                .unwrap_or_default(),
544            arg_aliases: policy
545                .get("arg_aliases")
546                .and_then(|value| value.as_object())
547                .map(|aliases| {
548                    aliases
549                        .iter()
550                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
551                        .collect::<BTreeMap<_, _>>()
552                })
553                .unwrap_or_default(),
554            required: policy
555                .get("required")
556                .and_then(|value| value.as_array())
557                .map(|items| {
558                    items
559                        .iter()
560                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
561                        .collect::<Vec<_>>()
562                })
563                .unwrap_or_default(),
564        }
565    };
566
567    let kind = parse_tool_kind(policy.get("kind"));
568    let side_effect_level = policy
569        .get("side_effect_level")
570        .and_then(|value| value.as_str())
571        .map(SideEffectLevel::parse)
572        .unwrap_or_default();
573
574    ToolAnnotations {
575        kind,
576        side_effect_level,
577        arg_schema,
578        capabilities,
579    }
580}
581
582pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
583    match value {
584        serde_json::Value::Null => BTreeMap::new(),
585        serde_json::Value::Array(items) => items
586            .iter()
587            .filter_map(|item| match item {
588                serde_json::Value::Object(map) => map
589                    .get("name")
590                    .and_then(|value| value.as_str())
591                    .filter(|name| !name.is_empty())
592                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
593                _ => None,
594            })
595            .collect(),
596        serde_json::Value::Object(map) => {
597            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
598                return map
599                    .get("tools")
600                    .map(workflow_tool_annotations)
601                    .unwrap_or_default();
602            }
603            map.get("name")
604                .and_then(|value| value.as_str())
605                .filter(|name| !name.is_empty())
606                .map(|name| {
607                    let mut annotations = BTreeMap::new();
608                    annotations.insert(name.to_string(), parse_tool_annotations(map));
609                    annotations
610                })
611                .unwrap_or_default()
612        }
613        _ => BTreeMap::new(),
614    }
615}
616
617pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
618    let tools = workflow_tool_names(value);
619    let tool_annotations = workflow_tool_annotations(value);
620    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
621    for annotations in tool_annotations.values() {
622        for (capability, ops) in &annotations.capabilities {
623            let entry = capabilities.entry(capability.clone()).or_default();
624            for op in ops {
625                if !entry.contains(op) {
626                    entry.push(op.clone());
627                }
628            }
629            entry.sort();
630        }
631    }
632    let side_effect_level = max_side_effect_level(
633        tool_annotations
634            .values()
635            .map(|annotations| annotations.side_effect_level.as_str().to_string())
636            .filter(|level| level != "none"),
637    );
638    CapabilityPolicy {
639        tools,
640        capabilities,
641        workspace_roots: Vec::new(),
642        side_effect_level,
643        recursion_limit: None,
644        tool_arg_constraints: Vec::new(),
645        tool_annotations,
646    }
647}
648
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name; the edges generated during normalization use
    /// `"failed"` and `"retry"`.
    pub branch: Option<String>,
    pub label: Option<String>,
}
657
/// A workflow: nodes keyed by id, the edges between them, and graph-level
/// policies and metadata.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; normalization defaults it to `"workflow_graph"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; normalization generates one when it is empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; normalization turns `0` into `1`.
    pub version: usize,
    /// Id of the entry node.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
674
/// One entry of `WorkflowGraph::audit_log`.
// NOTE(review): the code that writes these entries is not in this part of the
// file; the field meanings below are read off the names only — confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Presumably the name of the recorded operation.
    pub op: String,
    /// Presumably the node the operation applied to, if any.
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
685
/// Result type returned by `validate_workflow`.
// NOTE(review): how `valid` relates to `errors` is decided in
// `validate_workflow`, whose body is outside this view — confirm there.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    pub reachable_nodes: Vec<String>,
}
694
695pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
696    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
697    let dict = value.as_dict();
698    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
699    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
700    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
701    Ok(node)
702}
703
704pub fn parse_workflow_node_json(
705    json: serde_json::Value,
706    label: &str,
707) -> Result<WorkflowNode, VmError> {
708    super::parse_json_payload(json, label)
709}
710
711pub fn parse_workflow_edge_json(
712    json: serde_json::Value,
713    label: &str,
714) -> Result<WorkflowEdge, VmError> {
715    super::parse_json_payload(json, label)
716}
717
/// Parses a VM value into a `WorkflowGraph` and fills in defaults.
///
/// Two input shapes are handled:
///
/// * A graph with an explicit `nodes` map, which is used as parsed.
/// * A shorthand form without nodes, where top-level `act`, `verify` and
///   `repair` entries become the nodes. Those nodes inherit model settings,
///   `mode` and `done_sentinel` from the top-level dict when they do not set
///   them, and default edges `act → verify`, `verify → repair` (`failed`) and
///   `repair → verify` (`retry`) are generated when no edges were given.
///
/// Afterwards graph-level defaults (`_type`, `id`, `version`, `entry`) and
/// per-node defaults (id, kind, join/reduce strategy, output kinds, retry
/// attempts, raw tools) are applied.
///
/// # Errors
///
/// Propagates errors from `parse_json_value` and `parse_workflow_node_value`.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: build the nodes from the top-level act/verify/repair keys.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Settings the node leaves unset are inherited from the
                // top-level dict.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept both float and integer temperatures.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // A verify node may spell its verification settings as flat
                // fields; gather them into a `verify` object. Absent fields
                // become JSON null.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): these edges are added whenever `repair` exists,
            // even without a `verify` node, in which case they reference a
            // node that is not in the graph — confirm that validation is
            // meant to catch this.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node in key order, or "act" for an empty graph.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        if node.raw_tools.is_none() {
            // Recover the raw tools value from the original `nodes` dict.
            node.raw_tools = as_dict
                .get("nodes")
                .and_then(|nodes| nodes.as_dict())
                .and_then(|nodes| nodes.get(node_id))
                .and_then(|node_value| node_value.as_dict())
                .and_then(|raw_node| raw_node.get("tools"))
                .cloned();
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            // The default output kind depends on the node kind.
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
887
888pub fn validate_workflow(
889    graph: &WorkflowGraph,
890    ceiling: Option<&CapabilityPolicy>,
891) -> WorkflowValidationReport {
892    let mut errors = Vec::new();
893    let mut warnings = Vec::new();
894
895    if !graph.nodes.contains_key(&graph.entry) {
896        errors.push(format!("entry node does not exist: {}", graph.entry));
897    }
898
899    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
900    for edge in &graph.edges {
901        if !node_ids.contains(&edge.from) {
902            errors.push(format!("edge.from references unknown node: {}", edge.from));
903        }
904        if !node_ids.contains(&edge.to) {
905            errors.push(format!("edge.to references unknown node: {}", edge.to));
906        }
907    }
908
909    let reachable_nodes = reachable_nodes(graph);
910    for node_id in &node_ids {
911        if !reachable_nodes.contains(node_id) {
912            warnings.push(format!("node is unreachable: {node_id}"));
913        }
914    }
915
916    for (node_id, node) in &graph.nodes {
917        let incoming = graph
918            .edges
919            .iter()
920            .filter(|edge| edge.to == *node_id)
921            .count();
922        let outgoing: Vec<&WorkflowEdge> = graph
923            .edges
924            .iter()
925            .filter(|edge| edge.from == *node_id)
926            .collect();
927        if let Some(min_inputs) = node.input_contract.min_inputs {
928            if let Some(max_inputs) = node.input_contract.max_inputs {
929                if min_inputs > max_inputs {
930                    errors.push(format!(
931                        "node {node_id}: input contract min_inputs exceeds max_inputs"
932                    ));
933                }
934            }
935        }
936        match node.kind.as_str() {
937            "condition" => {
938                let has_true = outgoing
939                    .iter()
940                    .any(|edge| edge.branch.as_deref() == Some("true"));
941                let has_false = outgoing
942                    .iter()
943                    .any(|edge| edge.branch.as_deref() == Some("false"));
944                if !has_true || !has_false {
945                    errors.push(format!(
946                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
947                    ));
948                }
949            }
950            "fork" if outgoing.len() < 2 => {
951                errors.push(format!(
952                    "node {node_id}: fork nodes require at least two outgoing edges"
953                ));
954            }
955            "join" if incoming < 2 => {
956                warnings.push(format!(
957                    "node {node_id}: join node has fewer than two incoming edges"
958                ));
959            }
960            "map"
961                if node.map_policy.items.is_empty()
962                    && node.map_policy.item_artifact_kind.is_none()
963                    && node.input_contract.input_kinds.is_empty() =>
964            {
965                errors.push(format!(
966                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
967                ));
968            }
969            "reduce" if node.input_contract.input_kinds.is_empty() => {
970                warnings.push(format!(
971                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
972                ));
973            }
974            _ => {}
975        }
976    }
977
978    if let Some(ceiling) = ceiling {
979        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
980            errors.push(error);
981        }
982        for (node_id, node) in &graph.nodes {
983            if let Err(error) = ceiling.intersect(&node.capability_policy) {
984                errors.push(format!("node {node_id}: {error}"));
985            }
986        }
987    }
988
989    WorkflowValidationReport {
990        valid: errors.is_empty(),
991        errors,
992        warnings,
993        reachable_nodes: reachable_nodes.into_iter().collect(),
994    }
995}
996
997fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
998    let mut seen = BTreeSet::new();
999    let mut stack = vec![graph.entry.clone()];
1000    while let Some(node_id) = stack.pop() {
1001        if !seen.insert(node_id.clone()) {
1002            continue;
1003        }
1004        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1005            stack.push(edge.to.clone());
1006        }
1007    }
1008    seen
1009}
1010
1011/// Pick the session id a stage should run under. Prefers an explicit
1012/// `session_id` on the node's `model_policy` dict (so pipelines with
1013/// `agent_session_open` / `agent_session_fork` flowing through a graph
1014/// line up); falls back to a stable, node-derived id so multi-stage
1015/// graphs with no explicit session share a conversation across stages.
1016/// Per-stage skill registry. Per-node `model_policy.skills` takes
1017/// precedence over the workflow-level `run_options.skills` — authors
1018/// can scope a skill set to one stage without affecting siblings. When
1019/// neither is set, returns `None` so the agent loop runs without
1020/// skill matching (preserves pre-Gap-2 behavior for callers that
1021/// didn't opt in).
1022fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1023    let per_node = node
1024        .raw_model_policy
1025        .as_ref()
1026        .and_then(|v| v.as_dict())
1027        .and_then(|d| d.get("skills"))
1028        .cloned()
1029        .and_then(normalize_inline_registry);
1030    if per_node.is_some() {
1031        return per_node;
1032    }
1033    super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1034}
1035
1036/// Mirror of `resolve_stage_skill_registry` for the match config:
1037/// per-node `model_policy.skill_match` wins, falling back to the
1038/// workflow-level setting.
1039fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1040    let per_node = node
1041        .raw_model_policy
1042        .as_ref()
1043        .and_then(|v| v.as_dict())
1044        .and_then(|d| d.get("skill_match"))
1045        .and_then(|v| v.as_dict().cloned());
1046    if let Some(dict) = per_node {
1047        return crate::llm::parse_skill_match_config_dict(&dict);
1048    }
1049    super::current_workflow_skill_context()
1050        .and_then(|ctx| ctx.match_config)
1051        .and_then(|v| v.as_dict().cloned())
1052        .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1053        .unwrap_or_default()
1054}
1055
1056/// Accept both a validated `skill_registry` dict and a bare list of
1057/// skill entries. The workflow-level parser in `register.rs` does the
1058/// same — we duplicate here so per-node `model_policy.skills` settings
1059/// (not routed through that parser) also benefit.
1060fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1061    use std::collections::BTreeMap;
1062    use std::rc::Rc;
1063    match &value {
1064        VmValue::Dict(d)
1065            if d.get("_type")
1066                .map(|v| v.display() == "skill_registry")
1067                .unwrap_or(false) =>
1068        {
1069            Some(value)
1070        }
1071        VmValue::List(list) => {
1072            let mut dict = BTreeMap::new();
1073            dict.insert(
1074                "_type".to_string(),
1075                VmValue::String(Rc::from("skill_registry")),
1076            );
1077            dict.insert("skills".to_string(), VmValue::List(list.clone()));
1078            Some(VmValue::Dict(Rc::new(dict)))
1079        }
1080        _ => None,
1081    }
1082}
1083
1084fn resolve_node_session_id(node: &WorkflowNode) -> String {
1085    if let Some(explicit) = node
1086        .raw_model_policy
1087        .as_ref()
1088        .and_then(|v| v.as_dict())
1089        .and_then(|d| d.get("session_id"))
1090        .and_then(|v| match v {
1091            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1092            _ => None,
1093        })
1094    {
1095        return explicit;
1096    }
1097    format!("workflow_stage_{}", uuid::Uuid::now_v7())
1098}
1099
1100pub async fn execute_stage_node(
1101    node_id: &str,
1102    node: &WorkflowNode,
1103    task: &str,
1104    artifacts: &[ArtifactRecord],
1105) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1106    let mut selection_policy = node.context_policy.clone();
1107    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1108        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1109    }
1110    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1111    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
1112    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1113    let rendered_verification = super::render_verification_context(&verification_contracts);
1114    let stage_session_id = resolve_node_session_id(node);
1115    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1116        return Err(VmError::Runtime(format!(
1117            "workflow stage {node_id} requires an existing session \
1118             (call agent_session_open and feed session_id through model_policy \
1119             before entering this stage)"
1120        )));
1121    }
1122    if let Some(min_inputs) = node.input_contract.min_inputs {
1123        if selected.len() < min_inputs {
1124            return Err(VmError::Runtime(format!(
1125                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1126            )));
1127        }
1128    }
1129    if let Some(max_inputs) = node.input_contract.max_inputs {
1130        if selected.len() > max_inputs {
1131            return Err(VmError::Runtime(format!(
1132                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1133            )));
1134        }
1135    }
1136    let prompt = super::render_workflow_prompt(
1137        task,
1138        node.task_label.as_deref(),
1139        &rendered_verification,
1140        &rendered_context,
1141    );
1142
1143    // Precedence for the tool-calling contract format:
1144    //   1. explicit `model_policy.tool_format` on the node
1145    //   2. `HARN_AGENT_TOOL_FORMAT` env override
1146    //   3. provider/model default
1147    // Mirrors the top-level agent_loop / llm_call resolution so workflow
1148    // authors can pin `tool_format: "native"` per-stage and have it
1149    // reach the inner agent loop.
1150    let tool_format = node
1151        .model_policy
1152        .tool_format
1153        .clone()
1154        .filter(|value| !value.trim().is_empty())
1155        .or_else(|| {
1156            std::env::var("HARN_AGENT_TOOL_FORMAT")
1157                .ok()
1158                .filter(|value| !value.trim().is_empty())
1159        })
1160        .unwrap_or_else(|| {
1161            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1162            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1163            crate::llm_config::default_tool_format(&model, &provider)
1164        });
1165    let mut llm_result = if node.kind == "verify" {
1166        if let Some(command) = node
1167            .verify
1168            .as_ref()
1169            .and_then(|verify| verify.as_object())
1170            .and_then(|verify| verify.get("command"))
1171            .and_then(|value| value.as_str())
1172            .map(str::trim)
1173            .filter(|value| !value.is_empty())
1174        {
1175            let mut process = if cfg!(target_os = "windows") {
1176                let mut cmd = tokio::process::Command::new("cmd");
1177                cmd.arg("/C").arg(command);
1178                cmd
1179            } else {
1180                let mut cmd = tokio::process::Command::new("/bin/sh");
1181                cmd.arg("-lc").arg(command);
1182                cmd
1183            };
1184            process.stdin(std::process::Stdio::null());
1185            if let Some(context) = crate::stdlib::process::current_execution_context() {
1186                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1187                    process.current_dir(cwd);
1188                }
1189                if !context.env.is_empty() {
1190                    process.envs(context.env);
1191                }
1192            }
1193            let output = process
1194                .output()
1195                .await
1196                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
1197            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1198            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1199            let combined = if stderr.is_empty() {
1200                stdout.clone()
1201            } else if stdout.is_empty() {
1202                stderr.clone()
1203            } else {
1204                format!("{stdout}\n{stderr}")
1205            };
1206            serde_json::json!({
1207                "status": "completed",
1208                "text": combined,
1209                "visible_text": combined,
1210                "command": command,
1211                "stdout": stdout,
1212                "stderr": stderr,
1213                "exit_status": output.status.code().unwrap_or(-1),
1214                "success": output.status.success(),
1215            })
1216        } else {
1217            serde_json::json!({
1218                "status": "completed",
1219                "text": "",
1220                "visible_text": "",
1221            })
1222        }
1223    } else {
1224        let mut options = BTreeMap::new();
1225        if let Some(provider) = &node.model_policy.provider {
1226            options.insert(
1227                "provider".to_string(),
1228                VmValue::String(Rc::from(provider.clone())),
1229            );
1230        }
1231        if let Some(model) = &node.model_policy.model {
1232            options.insert(
1233                "model".to_string(),
1234                VmValue::String(Rc::from(model.clone())),
1235            );
1236        }
1237        if let Some(model_tier) = &node.model_policy.model_tier {
1238            options.insert(
1239                "model_tier".to_string(),
1240                VmValue::String(Rc::from(model_tier.clone())),
1241            );
1242        }
1243        if let Some(temperature) = node.model_policy.temperature {
1244            options.insert("temperature".to_string(), VmValue::Float(temperature));
1245        }
1246        if let Some(max_tokens) = node.model_policy.max_tokens {
1247            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1248        }
1249        let tool_names = workflow_tool_names(&node.tools);
1250        let tools_value = node.raw_tools.clone().or_else(|| {
1251            if matches!(node.tools, serde_json::Value::Null) {
1252                None
1253            } else {
1254                Some(crate::stdlib::json_to_vm_value(&node.tools))
1255            }
1256        });
1257        if tools_value.is_some() && !tool_names.is_empty() {
1258            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1259        }
1260        options.insert(
1261            "session_id".to_string(),
1262            VmValue::String(Rc::from(stage_session_id.clone())),
1263        );
1264
1265        let args = vec![
1266            VmValue::String(Rc::from(prompt.clone())),
1267            node.system
1268                .clone()
1269                .map(|s| VmValue::String(Rc::from(s)))
1270                .unwrap_or(VmValue::Nil),
1271            VmValue::Dict(Rc::new(options)),
1272        ];
1273        let mut opts = extract_llm_options(&args)?;
1274
1275        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1276            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1277            let effective_policy = tool_policy
1278                .intersect(&node.capability_policy)
1279                .map_err(VmError::Runtime)?;
1280            let auto_compact = if node.auto_compact.enabled {
1281                let mut ac = crate::orchestration::AutoCompactConfig::default();
1282                if let Some(v) = node.auto_compact.token_threshold {
1283                    ac.token_threshold = v;
1284                }
1285                if let Some(v) = node.auto_compact.tool_output_max_chars {
1286                    ac.tool_output_max_chars = v;
1287                }
1288                if let Some(ref strategy) = node.auto_compact.compact_strategy {
1289                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1290                        ac.compact_strategy = s;
1291                    }
1292                }
1293                if let Some(v) = node.auto_compact.hard_limit_tokens {
1294                    ac.hard_limit_tokens = Some(v);
1295                }
1296                if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1297                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1298                        ac.hard_limit_strategy = s;
1299                    }
1300                }
1301                // Closure fields can't round-trip through serde, so extract them
1302                // directly from the raw VmValue dict.
1303                if let Some(ref raw_ac) = node.raw_auto_compact {
1304                    if let Some(dict) = raw_ac.as_dict() {
1305                        if let Some(cb) = dict.get("compress_callback") {
1306                            ac.compress_callback = Some(cb.clone());
1307                        }
1308                        if let Some(cb) = dict.get("mask_callback") {
1309                            ac.mask_callback = Some(cb.clone());
1310                        }
1311                        if let Some(cb) = dict.get("custom_compactor") {
1312                            ac.custom_compactor = Some(cb.clone());
1313                        }
1314                    }
1315                }
1316                {
1317                    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1318                    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1319                    crate::llm::api::adapt_auto_compact_to_provider(
1320                        &mut ac,
1321                        user_specified_threshold,
1322                        user_specified_hard_limit,
1323                        &opts.provider,
1324                        &opts.model,
1325                        &opts.api_key,
1326                    )
1327                    .await;
1328                }
1329                Some(ac)
1330            } else {
1331                None
1332            };
1333            crate::llm::run_agent_loop_internal(
1334                &mut opts,
1335                crate::llm::AgentLoopConfig {
1336                    persistent: true,
1337                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1338                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1339                    nudge: node.model_policy.nudge.clone(),
1340                    done_sentinel: node.done_sentinel.clone(),
1341                    break_unless_phase: None,
1342                    tool_retries: 0,
1343                    tool_backoff_ms: 1000,
1344                    tool_format: tool_format.clone(),
1345                    auto_compact,
1346                    policy: Some(effective_policy),
1347                    approval_policy: Some(node.approval_policy.clone()),
1348                    daemon: false,
1349                    daemon_config: Default::default(),
1350                    llm_retries: 2,
1351                    llm_backoff_ms: 2000,
1352                    token_budget: None,
1353                    exit_when_verified: node.exit_when_verified,
1354                    loop_detect_warn: 2,
1355                    loop_detect_block: 3,
1356                    loop_detect_skip: 4,
1357                    tool_examples: node.model_policy.tool_examples.clone(),
1358                    turn_policy: node.model_policy.turn_policy.clone(),
1359                    stop_after_successful_tools: node
1360                        .model_policy
1361                        .stop_after_successful_tools
1362                        .clone(),
1363                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
1364                    // Use the same session id resolved for the stage so
1365                    // agent_subscribe handlers keyed on it, and session
1366                    // storage lookups in the agent loop, stay consistent.
1367                    session_id: stage_session_id.clone(),
1368                    event_sink: None,
1369                    // Seed from the stage's explicit deliverables/ledger so the
1370                    // graph carries a task-wide plan through map branches and
1371                    // nested stages. Empty ledger means no gate.
1372                    task_ledger: node
1373                        .raw_model_policy
1374                        .as_ref()
1375                        .and_then(|v| v.as_dict())
1376                        .and_then(|d| d.get("task_ledger"))
1377                        .map(crate::llm::helpers::vm_value_to_json)
1378                        .and_then(|json| serde_json::from_value(json).ok())
1379                        .unwrap_or_default(),
1380                    post_turn_callback: node
1381                        .raw_model_policy
1382                        .as_ref()
1383                        .and_then(|v| v.as_dict())
1384                        .and_then(|d| d.get("post_turn_callback"))
1385                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1386                        .cloned(),
1387                    // Inherit the workflow-level skill wiring installed
1388                    // by `workflow_execute`. Per-node `model_policy.skills`
1389                    // (optional) overrides, letting authors scope a skill
1390                    // set to one stage without affecting siblings. Empty
1391                    // thread-local = no skills configured (direct
1392                    // `execute_stage_node` callers outside a workflow).
1393                    skill_registry: resolve_stage_skill_registry(node),
1394                    skill_match: resolve_stage_skill_match(node),
1395                    working_files: Vec::new(),
1396                },
1397            )
1398            .await?
1399        } else {
1400            let result = vm_call_llm_full(&opts).await?;
1401            crate::llm::agent_loop_result_from_llm(&result, opts)
1402        }
1403    };
1404    if let Some(payload) = llm_result.as_object_mut() {
1405        payload.insert("prompt".to_string(), serde_json::json!(prompt));
1406        payload.insert(
1407            "system_prompt".to_string(),
1408            serde_json::json!(node.system.clone().unwrap_or_default()),
1409        );
1410        payload.insert(
1411            "rendered_context".to_string(),
1412            serde_json::json!(rendered_context),
1413        );
1414        if !verification_contracts.is_empty() {
1415            payload.insert(
1416                "verification_contracts".to_string(),
1417                serde_json::to_value(&verification_contracts).unwrap_or_default(),
1418            );
1419            payload.insert(
1420                "rendered_verification_context".to_string(),
1421                serde_json::json!(rendered_verification),
1422            );
1423        }
1424        payload.insert(
1425            "selected_artifact_ids".to_string(),
1426            serde_json::json!(selected
1427                .iter()
1428                .map(|artifact| artifact.id.clone())
1429                .collect::<Vec<_>>()),
1430        );
1431        payload.insert(
1432            "selected_artifact_titles".to_string(),
1433            serde_json::json!(selected
1434                .iter()
1435                .map(|artifact| artifact.title.clone())
1436                .collect::<Vec<_>>()),
1437        );
1438        payload.insert(
1439            "tool_calling_mode".to_string(),
1440            serde_json::json!(tool_format.clone()),
1441        );
1442    }
1443
1444    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1445    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1446    // a "transcript" field; fall back to the input so cross-stage conversation
1447    // state survives transitions.
1448    let result_transcript = llm_result
1449        .get("transcript")
1450        .cloned()
1451        .map(|value| crate::stdlib::json_to_vm_value(&value));
1452    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1453    let transcript = result_transcript
1454        .or(session_transcript)
1455        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1456    let output_kind = node
1457        .output_contract
1458        .output_kinds
1459        .first()
1460        .cloned()
1461        .unwrap_or_else(|| {
1462            if node.kind == "verify" {
1463                "verification_result".to_string()
1464            } else {
1465                "artifact".to_string()
1466            }
1467        });
1468    let mut metadata = BTreeMap::new();
1469    metadata.insert(
1470        "input_artifact_ids".to_string(),
1471        serde_json::json!(selected
1472            .iter()
1473            .map(|artifact| artifact.id.clone())
1474            .collect::<Vec<_>>()),
1475    );
1476    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1477    let artifact = ArtifactRecord {
1478        type_name: "artifact".to_string(),
1479        id: new_id("artifact"),
1480        kind: output_kind,
1481        title: Some(format!("stage {node_id} output")),
1482        text: Some(visible_text),
1483        data: Some(llm_result.clone()),
1484        source: Some(node_id.to_string()),
1485        created_at: now_rfc3339(),
1486        freshness: Some("fresh".to_string()),
1487        priority: None,
1488        lineage: selected
1489            .iter()
1490            .map(|artifact| artifact.id.clone())
1491            .collect(),
1492        relevance: Some(1.0),
1493        estimated_tokens: None,
1494        stage: Some(node_id.to_string()),
1495        metadata,
1496    }
1497    .normalize();
1498
1499    Ok((llm_result, vec![artifact], transcript))
1500}
1501
1502pub fn next_nodes_for(
1503    graph: &WorkflowGraph,
1504    current: &str,
1505    branch: Option<&str>,
1506) -> Vec<WorkflowEdge> {
1507    let mut matching: Vec<WorkflowEdge> = graph
1508        .edges
1509        .iter()
1510        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1511        .cloned()
1512        .collect();
1513    if matching.is_empty() {
1514        matching = graph
1515            .edges
1516            .iter()
1517            .filter(|edge| edge.from == current && edge.branch.is_none())
1518            .cloned()
1519            .collect();
1520    }
1521    matching
1522}
1523
1524pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1525    next_nodes_for(graph, current, Some(branch))
1526        .into_iter()
1527        .next()
1528        .map(|edge| edge.to)
1529}
1530
1531pub fn append_audit_entry(
1532    graph: &mut WorkflowGraph,
1533    op: &str,
1534    node_id: Option<String>,
1535    reason: Option<String>,
1536    metadata: BTreeMap<String, serde_json::Value>,
1537) {
1538    graph.audit_log.push(WorkflowAuditEntry {
1539        id: new_id("audit"),
1540        op: op.to_string(),
1541        node_id,
1542        timestamp: now_rfc3339(),
1543        reason,
1544        metadata,
1545    });
1546}