Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
/// Node-metadata key under which `inject_workflow_verification_contracts`
/// stores the serialized list of verification contracts.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node-metadata key selecting the verification scope. The value
/// `"local_only"` makes `stage_verification_contracts` return only the
/// stage's own contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
20#[derive(Clone, Debug, Default, Serialize, Deserialize)]
21#[serde(default)]
22pub struct WorkflowNode {
23    pub id: Option<String>,
24    pub kind: String,
25    pub mode: Option<String>,
26    pub prompt: Option<String>,
27    pub system: Option<String>,
28    pub task_label: Option<String>,
29    pub done_sentinel: Option<String>,
30    pub tools: serde_json::Value,
31    pub model_policy: ModelPolicy,
32    /// Per-stage auto-compaction settings for the agent loop's context
33    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
34    /// expressible here — call the `agent_session_*` builtins before the
35    /// stage or in a prior stage.
36    pub auto_compact: AutoCompactPolicy,
37    /// Output visibility filter applied to the transcript after the
38    /// stage's agent loop exits. `"public"` / `"public_only"` drops
39    /// `tool_result` messages and non-public events. `None` or any
40    /// unknown string is a no-op.
41    #[serde(default)]
42    pub output_visibility: Option<String>,
43    pub context_policy: ContextPolicy,
44    pub retry_policy: RetryPolicy,
45    pub capability_policy: CapabilityPolicy,
46    pub approval_policy: super::ToolApprovalPolicy,
47    pub input_contract: StageContract,
48    pub output_contract: StageContract,
49    pub branch_semantics: BranchSemantics,
50    pub map_policy: MapPolicy,
51    pub join_policy: JoinPolicy,
52    pub reduce_policy: ReducePolicy,
53    pub escalation_policy: EscalationPolicy,
54    pub verify: Option<serde_json::Value>,
55    /// When true, the stage's agent loop gates the done sentinel on the most
56    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
57    /// persistent execute stages that fold verification into the loop via a
58    /// shell-exec tool the model invokes explicitly.
59    #[serde(default)]
60    pub exit_when_verified: bool,
61    pub metadata: BTreeMap<String, serde_json::Value>,
62    #[serde(skip)]
63    pub raw_tools: Option<VmValue>,
64    /// Raw auto_compact VmValue dict — preserved for extracting closure
65    /// fields (compress_callback, mask_callback, custom_compactor) that
66    /// can't go through serde.
67    #[serde(skip)]
68    pub raw_auto_compact: Option<VmValue>,
69    /// Raw model_policy VmValue dict — preserved for extracting closure
70    /// fields (post_turn_callback) that can't go through serde.
71    #[serde(skip)]
72    pub raw_model_policy: Option<VmValue>,
73    /// Raw context_assembler VmValue dict — when set, the stage's
74    /// artifact context is packed through `assemble_context` before
75    /// rendering the system prompt. Closure fields (`ranker_callback`)
76    /// are preserved here because they can't round-trip through serde.
77    #[serde(skip)]
78    pub raw_context_assembler: Option<VmValue>,
79}
80
81impl PartialEq for WorkflowNode {
82    fn eq(&self, other: &Self) -> bool {
83        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84    }
85}
86
87#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(default)]
89pub struct VerificationRequirement {
90    pub kind: String,
91    pub value: String,
92    pub note: Option<String>,
93}
94
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct VerificationContract {
98    pub source_node: Option<String>,
99    pub summary: Option<String>,
100    pub command: Option<String>,
101    pub expect_status: Option<i64>,
102    pub assert_text: Option<String>,
103    pub expect_text: Option<String>,
104    pub required_identifiers: Vec<String>,
105    pub required_paths: Vec<String>,
106    pub required_text: Vec<String>,
107    pub notes: Vec<String>,
108    pub checks: Vec<VerificationRequirement>,
109}
110
111impl VerificationContract {
112    fn is_empty(&self) -> bool {
113        self.summary.is_none()
114            && self.command.is_none()
115            && self.expect_status.is_none()
116            && self.assert_text.is_none()
117            && self.expect_text.is_none()
118            && self.required_identifiers.is_empty()
119            && self.required_paths.is_empty()
120            && self.required_text.is_empty()
121            && self.notes.is_empty()
122            && self.checks.is_empty()
123    }
124}
125
/// Appends the trimmed form of `value` to `values` unless it is blank or an
/// identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let is_new = !candidate.is_empty() && values.iter().all(|existing| existing != candidate);
    if is_new {
        values.push(candidate.to_owned());
    }
}
135
136fn push_unique_requirement(
137    values: &mut Vec<VerificationRequirement>,
138    kind: &str,
139    value: &str,
140    note: Option<&str>,
141) {
142    let trimmed_kind = kind.trim();
143    let trimmed_value = value.trim();
144    let trimmed_note = note
145        .map(str::trim)
146        .filter(|candidate| !candidate.is_empty())
147        .map(|candidate| candidate.to_string());
148    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149        return;
150    }
151    let candidate = VerificationRequirement {
152        kind: trimmed_kind.to_string(),
153        value: trimmed_value.to_string(),
154        note: trimmed_note,
155    };
156    if !values.iter().any(|existing| existing == &candidate) {
157        values.push(candidate);
158    }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162    match value {
163        Some(serde_json::Value::String(text)) => {
164            let mut values = Vec::new();
165            push_unique_string(&mut values, text);
166            values
167        }
168        Some(serde_json::Value::Array(items)) => {
169            let mut values = Vec::new();
170            for item in items {
171                if let Some(text) = item.as_str() {
172                    push_unique_string(&mut values, text);
173                }
174            }
175            values
176        }
177        _ => Vec::new(),
178    }
179}
180
181fn merge_verification_requirement_list(
182    target: &mut Vec<VerificationRequirement>,
183    value: Option<&serde_json::Value>,
184) {
185    let Some(items) = value.and_then(|raw| raw.as_array()) else {
186        return;
187    };
188    for item in items {
189        let Some(object) = item.as_object() else {
190            continue;
191        };
192        let kind = object
193            .get("kind")
194            .and_then(|value| value.as_str())
195            .unwrap_or_default();
196        let value = object
197            .get("value")
198            .and_then(|value| value.as_str())
199            .unwrap_or_default();
200        let note = object
201            .get("note")
202            .or_else(|| object.get("description"))
203            .or_else(|| object.get("reason"))
204            .and_then(|value| value.as_str());
205        push_unique_requirement(target, kind, value, note);
206    }
207}
208
209fn merge_verification_contract_fields(
210    target: &mut VerificationContract,
211    object: &serde_json::Map<String, serde_json::Value>,
212) {
213    if target.summary.is_none() {
214        target.summary = object
215            .get("summary")
216            .and_then(|value| value.as_str())
217            .map(str::trim)
218            .filter(|value| !value.is_empty())
219            .map(|value| value.to_string());
220    }
221    if target.command.is_none() {
222        target.command = object
223            .get("command")
224            .and_then(|value| value.as_str())
225            .map(str::trim)
226            .filter(|value| !value.is_empty())
227            .map(|value| value.to_string());
228    }
229    if target.expect_status.is_none() {
230        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231    }
232    if target.assert_text.is_none() {
233        target.assert_text = object
234            .get("assert_text")
235            .and_then(|value| value.as_str())
236            .map(str::trim)
237            .filter(|value| !value.is_empty())
238            .map(|value| value.to_string());
239    }
240    if target.expect_text.is_none() {
241        target.expect_text = object
242            .get("expect_text")
243            .and_then(|value| value.as_str())
244            .map(str::trim)
245            .filter(|value| !value.is_empty())
246            .map(|value| value.to_string());
247    }
248
249    for value in json_string_list(
250        object
251            .get("required_identifiers")
252            .or_else(|| object.get("identifiers")),
253    ) {
254        push_unique_string(&mut target.required_identifiers, &value);
255    }
256    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257        push_unique_string(&mut target.required_paths, &value);
258    }
259    for value in json_string_list(
260        object
261            .get("required_text")
262            .or_else(|| object.get("exact_text"))
263            .or_else(|| object.get("required_strings")),
264    ) {
265        push_unique_string(&mut target.required_text, &value);
266    }
267    for value in json_string_list(object.get("notes")) {
268        push_unique_string(&mut target.notes, &value);
269    }
270    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276        VmError::Runtime(format!(
277            "workflow verification contract read failed for {}: {error}",
278            resolved.display()
279        ))
280    })?;
281    serde_json::from_str(&contents).map_err(|error| {
282        VmError::Runtime(format!(
283            "workflow verification contract parse failed for {}: {error}",
284            resolved.display()
285        ))
286    })
287}
288
289fn resolve_verification_contract_path(
290    verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292    let Some(path) = verify
293        .get("contract_path")
294        .or_else(|| verify.get("verification_contract_path"))
295        .and_then(|value| value.as_str())
296        .map(str::trim)
297        .filter(|value| !value.is_empty())
298    else {
299        return Ok(None);
300    };
301    Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305    node_id: &str,
306    verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309        return Ok(None);
310    };
311
312    let mut contract = VerificationContract {
313        source_node: Some(node_id.to_string()),
314        ..Default::default()
315    };
316
317    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318        let Some(object) = file_contract.as_object() else {
319            return Err(VmError::Runtime(
320                "workflow verification contract file must parse to a JSON object".to_string(),
321            ));
322        };
323        merge_verification_contract_fields(&mut contract, object);
324    }
325
326    if let Some(inline_contract) = verify_object.get("contract") {
327        let Some(object) = inline_contract.as_object() else {
328            return Err(VmError::Runtime(
329                "workflow verify.contract must be an object".to_string(),
330            ));
331        };
332        merge_verification_contract_fields(&mut contract, object);
333    }
334
335    merge_verification_contract_fields(&mut contract, verify_object);
336
337    if let Some(assert_text) = contract.assert_text.clone() {
338        push_unique_requirement(
339            &mut contract.checks,
340            "visible_text_contains",
341            &assert_text,
342            Some("verify stage requires visible output to contain this text"),
343        );
344    }
345    if let Some(expect_text) = contract.expect_text.clone() {
346        push_unique_requirement(
347            &mut contract.checks,
348            "combined_output_contains",
349            &expect_text,
350            Some("verify command requires combined stdout/stderr to contain this text"),
351        );
352    }
353    if let Some(expect_status) = contract.expect_status {
354        push_unique_requirement(
355            &mut contract.checks,
356            "expect_status",
357            &expect_status.to_string(),
358            Some("verify command exit status must match exactly"),
359        );
360    }
361    for identifier in contract.required_identifiers.clone() {
362        push_unique_requirement(
363            &mut contract.checks,
364            "identifier",
365            &identifier,
366            Some("use this exact identifier spelling"),
367        );
368    }
369    for path in contract.required_paths.clone() {
370        push_unique_requirement(
371            &mut contract.checks,
372            "path",
373            &path,
374            Some("preserve this exact path"),
375        );
376    }
377    for text in contract.required_text.clone() {
378        push_unique_requirement(
379            &mut contract.checks,
380            "text",
381            &text,
382            Some("required exact text or wiring snippet"),
383        );
384    }
385
386    if contract.is_empty() {
387        return Ok(None);
388    }
389    Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393    if !values.iter().any(|existing| existing == &candidate) {
394        values.push(candidate);
395    }
396}
397
398pub fn workflow_verification_contracts(
399    graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401    let mut contracts = Vec::new();
402    for (node_id, node) in &graph.nodes {
403        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404            push_unique_contract(&mut contracts, contract);
405        }
406    }
407    Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411    node: &mut WorkflowNode,
412    contracts: &[VerificationContract],
413) {
414    if contracts.is_empty() {
415        return;
416    }
417    node.metadata.insert(
418        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419        serde_json::to_value(contracts).unwrap_or_default(),
420    );
421}
422
423pub fn stage_verification_contracts(
424    node_id: &str,
425    node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428    let local_only = matches!(
429        node.metadata
430            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431            .and_then(|value| value.as_str()),
432        Some("local_only")
433    );
434    if local_only {
435        return Ok(local_contract.into_iter().collect());
436    }
437
438    let mut contracts = node
439        .metadata
440        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441        .cloned()
442        .map(|value| {
443            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444                VmError::Runtime(format!(
445                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
446                ))
447            })
448        })
449        .transpose()?
450        .unwrap_or_default();
451
452    if let Some(local_contract) = local_contract {
453        push_unique_contract(&mut contracts, local_contract);
454    }
455    Ok(contracts)
456}
457
458pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
459    match value {
460        serde_json::Value::Null => Vec::new(),
461        serde_json::Value::Array(items) => items
462            .iter()
463            .filter_map(|item| match item {
464                serde_json::Value::Object(map) => map
465                    .get("name")
466                    .and_then(|value| value.as_str())
467                    .filter(|name| !name.is_empty())
468                    .map(|name| name.to_string()),
469                _ => None,
470            })
471            .collect(),
472        serde_json::Value::Object(map) => {
473            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
474                return map
475                    .get("tools")
476                    .map(workflow_tool_names)
477                    .unwrap_or_default();
478            }
479            map.get("name")
480                .and_then(|value| value.as_str())
481                .filter(|name| !name.is_empty())
482                .map(|name| vec![name.to_string()])
483                .unwrap_or_default()
484        }
485        _ => Vec::new(),
486    }
487}
488
/// Picks the most severe side-effect level from `levels`.
///
/// Known levels rank in the order `none < read_only < workspace_write <
/// process_exec < network`; any unrecognized label ranks above all of them.
/// When several entries share the top rank the last one is returned, and an
/// empty iterator yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ASCENDING: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let severity = |level: &String| {
        ASCENDING
            .iter()
            .position(|known| *known == level.as_str())
            .unwrap_or(ASCENDING.len())
    };
    levels.max_by_key(severity)
}
502
503fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
504    match value.and_then(|v| v.as_str()).unwrap_or("") {
505        "read" => ToolKind::Read,
506        "edit" => ToolKind::Edit,
507        "delete" => ToolKind::Delete,
508        "move" => ToolKind::Move,
509        "search" => ToolKind::Search,
510        "execute" => ToolKind::Execute,
511        "think" => ToolKind::Think,
512        "fetch" => ToolKind::Fetch,
513        _ => ToolKind::Other,
514    }
515}
516
517fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
518    let policy = map
519        .get("policy")
520        .and_then(|value| value.as_object())
521        .cloned()
522        .unwrap_or_default();
523
524    let capabilities = policy
525        .get("capabilities")
526        .and_then(|value| value.as_object())
527        .map(|caps| {
528            caps.iter()
529                .map(|(capability, ops)| {
530                    let values = ops
531                        .as_array()
532                        .map(|items| {
533                            items
534                                .iter()
535                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
536                                .collect::<Vec<_>>()
537                        })
538                        .unwrap_or_default();
539                    (capability.clone(), values)
540                })
541                .collect::<BTreeMap<_, _>>()
542        })
543        .unwrap_or_default();
544
545    // Accept both the structured `policy.arg_schema` object and the legacy
546    // flat fields on `policy` so pipelines can migrate gradually.
547    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
548        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
549    } else {
550        ToolArgSchema {
551            path_params: policy
552                .get("path_params")
553                .and_then(|value| value.as_array())
554                .map(|items| {
555                    items
556                        .iter()
557                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
558                        .collect::<Vec<_>>()
559                })
560                .unwrap_or_default(),
561            arg_aliases: policy
562                .get("arg_aliases")
563                .and_then(|value| value.as_object())
564                .map(|aliases| {
565                    aliases
566                        .iter()
567                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
568                        .collect::<BTreeMap<_, _>>()
569                })
570                .unwrap_or_default(),
571            required: policy
572                .get("required")
573                .and_then(|value| value.as_array())
574                .map(|items| {
575                    items
576                        .iter()
577                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
578                        .collect::<Vec<_>>()
579                })
580                .unwrap_or_default(),
581        }
582    };
583
584    let kind = parse_tool_kind(policy.get("kind"));
585    let side_effect_level = policy
586        .get("side_effect_level")
587        .and_then(|value| value.as_str())
588        .map(SideEffectLevel::parse)
589        .unwrap_or_default();
590
591    ToolAnnotations {
592        kind,
593        side_effect_level,
594        arg_schema,
595        capabilities,
596    }
597}
598
599pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
600    match value {
601        serde_json::Value::Null => BTreeMap::new(),
602        serde_json::Value::Array(items) => items
603            .iter()
604            .filter_map(|item| match item {
605                serde_json::Value::Object(map) => map
606                    .get("name")
607                    .and_then(|value| value.as_str())
608                    .filter(|name| !name.is_empty())
609                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
610                _ => None,
611            })
612            .collect(),
613        serde_json::Value::Object(map) => {
614            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
615                return map
616                    .get("tools")
617                    .map(workflow_tool_annotations)
618                    .unwrap_or_default();
619            }
620            map.get("name")
621                .and_then(|value| value.as_str())
622                .filter(|name| !name.is_empty())
623                .map(|name| {
624                    let mut annotations = BTreeMap::new();
625                    annotations.insert(name.to_string(), parse_tool_annotations(map));
626                    annotations
627                })
628                .unwrap_or_default()
629        }
630        _ => BTreeMap::new(),
631    }
632}
633
634pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
635    let tools = workflow_tool_names(value);
636    let tool_annotations = workflow_tool_annotations(value);
637    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
638    for annotations in tool_annotations.values() {
639        for (capability, ops) in &annotations.capabilities {
640            let entry = capabilities.entry(capability.clone()).or_default();
641            for op in ops {
642                if !entry.contains(op) {
643                    entry.push(op.clone());
644                }
645            }
646            entry.sort();
647        }
648    }
649    let side_effect_level = max_side_effect_level(
650        tool_annotations
651            .values()
652            .map(|annotations| annotations.side_effect_level.as_str().to_string())
653            .filter(|level| level != "none"),
654    );
655    CapabilityPolicy {
656        tools,
657        capabilities,
658        workspace_roots: Vec::new(),
659        side_effect_level,
660        recursion_limit: None,
661        tool_arg_constraints: Vec::new(),
662        tool_annotations,
663    }
664}
665
666#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
667#[serde(default)]
668pub struct WorkflowEdge {
669    pub from: String,
670    pub to: String,
671    pub branch: Option<String>,
672    pub label: Option<String>,
673}
674
675#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
676#[serde(default)]
677pub struct WorkflowGraph {
678    #[serde(rename = "_type")]
679    pub type_name: String,
680    pub id: String,
681    pub name: Option<String>,
682    pub version: usize,
683    pub entry: String,
684    pub nodes: BTreeMap<String, WorkflowNode>,
685    pub edges: Vec<WorkflowEdge>,
686    pub capability_policy: CapabilityPolicy,
687    pub approval_policy: super::ToolApprovalPolicy,
688    pub metadata: BTreeMap<String, serde_json::Value>,
689    pub audit_log: Vec<WorkflowAuditEntry>,
690}
691
692#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
693#[serde(default)]
694pub struct WorkflowAuditEntry {
695    pub id: String,
696    pub op: String,
697    pub node_id: Option<String>,
698    pub timestamp: String,
699    pub reason: Option<String>,
700    pub metadata: BTreeMap<String, serde_json::Value>,
701}
702
703#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
704#[serde(default)]
705pub struct WorkflowValidationReport {
706    pub valid: bool,
707    pub errors: Vec<String>,
708    pub warnings: Vec<String>,
709    pub reachable_nodes: Vec<String>,
710}
711
712pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
713    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
714    let dict = value.as_dict();
715    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
716    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
717    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
718    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
719    Ok(node)
720}
721
722pub fn parse_workflow_node_json(
723    json: serde_json::Value,
724    label: &str,
725) -> Result<WorkflowNode, VmError> {
726    super::parse_json_payload(json, label)
727}
728
729pub fn parse_workflow_edge_json(
730    json: serde_json::Value,
731    label: &str,
732) -> Result<WorkflowEdge, VmError> {
733    super::parse_json_payload(json, label)
734}
735
736pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
737    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
738    let as_dict = value.as_dict().cloned().unwrap_or_default();
739
740    if graph.nodes.is_empty() {
741        for key in ["act", "verify", "repair"] {
742            if let Some(node_value) = as_dict.get(key) {
743                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
744                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
745                node.id = Some(key.to_string());
746                if node.kind.is_empty() {
747                    node.kind = if key == "verify" {
748                        "verify".to_string()
749                    } else {
750                        "stage".to_string()
751                    };
752                }
753                if node.model_policy.provider.is_none() {
754                    node.model_policy.provider = as_dict
755                        .get("provider")
756                        .map(|value| value.display())
757                        .filter(|value| !value.is_empty());
758                }
759                if node.model_policy.model.is_none() {
760                    node.model_policy.model = as_dict
761                        .get("model")
762                        .map(|value| value.display())
763                        .filter(|value| !value.is_empty());
764                }
765                if node.model_policy.model_tier.is_none() {
766                    node.model_policy.model_tier = as_dict
767                        .get("model_tier")
768                        .or_else(|| as_dict.get("tier"))
769                        .map(|value| value.display())
770                        .filter(|value| !value.is_empty());
771                }
772                if node.model_policy.temperature.is_none() {
773                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
774                        if let VmValue::Float(number) = value {
775                            Some(*number)
776                        } else {
777                            value.as_int().map(|number| number as f64)
778                        }
779                    });
780                }
781                if node.model_policy.max_tokens.is_none() {
782                    node.model_policy.max_tokens =
783                        as_dict.get("max_tokens").and_then(|value| value.as_int());
784                }
785                if node.mode.is_none() {
786                    node.mode = as_dict
787                        .get("mode")
788                        .map(|value| value.display())
789                        .filter(|value| !value.is_empty());
790                }
791                if node.done_sentinel.is_none() {
792                    node.done_sentinel = as_dict
793                        .get("done_sentinel")
794                        .map(|value| value.display())
795                        .filter(|value| !value.is_empty());
796                }
797                if key == "verify"
798                    && node.verify.is_none()
799                    && (raw_node.contains_key("assert_text")
800                        || raw_node.contains_key("command")
801                        || raw_node.contains_key("expect_status")
802                        || raw_node.contains_key("expect_text"))
803                {
804                    node.verify = Some(serde_json::json!({
805                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
806                        "command": raw_node.get("command").map(vm_value_to_json),
807                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
808                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
809                    }));
810                }
811                graph.nodes.insert(key.to_string(), node);
812            }
813        }
814        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
815            graph.entry = "act".to_string();
816        }
817        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
818            if graph.nodes.contains_key("verify") {
819                graph.edges.push(WorkflowEdge {
820                    from: "act".to_string(),
821                    to: "verify".to_string(),
822                    branch: None,
823                    label: None,
824                });
825            }
826            if graph.nodes.contains_key("repair") {
827                graph.edges.push(WorkflowEdge {
828                    from: "verify".to_string(),
829                    to: "repair".to_string(),
830                    branch: Some("failed".to_string()),
831                    label: None,
832                });
833                graph.edges.push(WorkflowEdge {
834                    from: "repair".to_string(),
835                    to: "verify".to_string(),
836                    branch: Some("retry".to_string()),
837                    label: None,
838                });
839            }
840        }
841    }
842
843    if graph.type_name.is_empty() {
844        graph.type_name = "workflow_graph".to_string();
845    }
846    if graph.id.is_empty() {
847        graph.id = new_id("workflow");
848    }
849    if graph.version == 0 {
850        graph.version = 1;
851    }
852    if graph.entry.is_empty() {
853        graph.entry = graph
854            .nodes
855            .keys()
856            .next()
857            .cloned()
858            .unwrap_or_else(|| "act".to_string());
859    }
860    for (node_id, node) in &mut graph.nodes {
861        if node.raw_tools.is_none() {
862            node.raw_tools = as_dict
863                .get("nodes")
864                .and_then(|nodes| nodes.as_dict())
865                .and_then(|nodes| nodes.get(node_id))
866                .and_then(|node_value| node_value.as_dict())
867                .and_then(|raw_node| raw_node.get("tools"))
868                .cloned();
869        }
870        if node.id.is_none() {
871            node.id = Some(node_id.clone());
872        }
873        if node.kind.is_empty() {
874            node.kind = "stage".to_string();
875        }
876        if node.join_policy.strategy.is_empty() {
877            node.join_policy.strategy = "all".to_string();
878        }
879        if node.reduce_policy.strategy.is_empty() {
880            node.reduce_policy.strategy = "concat".to_string();
881        }
882        if node.output_contract.output_kinds.is_empty() {
883            node.output_contract.output_kinds = vec![match node.kind.as_str() {
884                "verify" => "verification_result".to_string(),
885                "reduce" => node
886                    .reduce_policy
887                    .output_kind
888                    .clone()
889                    .unwrap_or_else(|| "summary".to_string()),
890                "map" => node
891                    .map_policy
892                    .output_kind
893                    .clone()
894                    .unwrap_or_else(|| "artifact".to_string()),
895                "escalation" => "plan".to_string(),
896                _ => "artifact".to_string(),
897            }];
898        }
899        if node.retry_policy.max_attempts == 0 {
900            node.retry_policy.max_attempts = 1;
901        }
902    }
903    Ok(graph)
904}
905
906pub fn validate_workflow(
907    graph: &WorkflowGraph,
908    ceiling: Option<&CapabilityPolicy>,
909) -> WorkflowValidationReport {
910    let mut errors = Vec::new();
911    let mut warnings = Vec::new();
912
913    if !graph.nodes.contains_key(&graph.entry) {
914        errors.push(format!("entry node does not exist: {}", graph.entry));
915    }
916
917    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
918    for edge in &graph.edges {
919        if !node_ids.contains(&edge.from) {
920            errors.push(format!("edge.from references unknown node: {}", edge.from));
921        }
922        if !node_ids.contains(&edge.to) {
923            errors.push(format!("edge.to references unknown node: {}", edge.to));
924        }
925    }
926
927    let reachable_nodes = reachable_nodes(graph);
928    for node_id in &node_ids {
929        if !reachable_nodes.contains(node_id) {
930            warnings.push(format!("node is unreachable: {node_id}"));
931        }
932    }
933
934    for (node_id, node) in &graph.nodes {
935        let incoming = graph
936            .edges
937            .iter()
938            .filter(|edge| edge.to == *node_id)
939            .count();
940        let outgoing: Vec<&WorkflowEdge> = graph
941            .edges
942            .iter()
943            .filter(|edge| edge.from == *node_id)
944            .collect();
945        if let Some(min_inputs) = node.input_contract.min_inputs {
946            if let Some(max_inputs) = node.input_contract.max_inputs {
947                if min_inputs > max_inputs {
948                    errors.push(format!(
949                        "node {node_id}: input contract min_inputs exceeds max_inputs"
950                    ));
951                }
952            }
953        }
954        match node.kind.as_str() {
955            "condition" => {
956                let has_true = outgoing
957                    .iter()
958                    .any(|edge| edge.branch.as_deref() == Some("true"));
959                let has_false = outgoing
960                    .iter()
961                    .any(|edge| edge.branch.as_deref() == Some("false"));
962                if !has_true || !has_false {
963                    errors.push(format!(
964                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
965                    ));
966                }
967            }
968            "fork" if outgoing.len() < 2 => {
969                errors.push(format!(
970                    "node {node_id}: fork nodes require at least two outgoing edges"
971                ));
972            }
973            "join" if incoming < 2 => {
974                warnings.push(format!(
975                    "node {node_id}: join node has fewer than two incoming edges"
976                ));
977            }
978            "map"
979                if node.map_policy.items.is_empty()
980                    && node.map_policy.item_artifact_kind.is_none()
981                    && node.input_contract.input_kinds.is_empty() =>
982            {
983                errors.push(format!(
984                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
985                ));
986            }
987            "reduce" if node.input_contract.input_kinds.is_empty() => {
988                warnings.push(format!(
989                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
990                ));
991            }
992            _ => {}
993        }
994    }
995
996    if let Some(ceiling) = ceiling {
997        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
998            errors.push(error);
999        }
1000        for (node_id, node) in &graph.nodes {
1001            if let Err(error) = ceiling.intersect(&node.capability_policy) {
1002                errors.push(format!("node {node_id}: {error}"));
1003            }
1004        }
1005    }
1006
1007    WorkflowValidationReport {
1008        valid: errors.is_empty(),
1009        errors,
1010        warnings,
1011        reachable_nodes: reachable_nodes.into_iter().collect(),
1012    }
1013}
1014
1015fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1016    let mut seen = BTreeSet::new();
1017    let mut stack = vec![graph.entry.clone()];
1018    while let Some(node_id) = stack.pop() {
1019        if !seen.insert(node_id.clone()) {
1020            continue;
1021        }
1022        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1023            stack.push(edge.to.clone());
1024        }
1025    }
1026    seen
1027}
1028
1029/// Pick the session id a stage should run under. Prefers an explicit
1030/// `session_id` on the node's `model_policy` dict (so pipelines with
1031/// `agent_session_open` / `agent_session_fork` flowing through a graph
1032/// line up); falls back to a stable, node-derived id so multi-stage
1033/// graphs with no explicit session share a conversation across stages.
1034/// Per-stage skill registry. Per-node `model_policy.skills` takes
1035/// precedence over the workflow-level `run_options.skills` — authors
1036/// can scope a skill set to one stage without affecting siblings. When
1037/// neither is set, returns `None` so the agent loop runs without
1038/// skill matching (preserves pre-Gap-2 behavior for callers that
1039/// didn't opt in).
1040fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1041    let per_node = node
1042        .raw_model_policy
1043        .as_ref()
1044        .and_then(|v| v.as_dict())
1045        .and_then(|d| d.get("skills"))
1046        .cloned()
1047        .and_then(normalize_inline_registry);
1048    if per_node.is_some() {
1049        return per_node;
1050    }
1051    super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1052}
1053
1054/// Mirror of `resolve_stage_skill_registry` for the match config:
1055/// per-node `model_policy.skill_match` wins, falling back to the
1056/// workflow-level setting.
1057fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1058    let per_node = node
1059        .raw_model_policy
1060        .as_ref()
1061        .and_then(|v| v.as_dict())
1062        .and_then(|d| d.get("skill_match"))
1063        .and_then(|v| v.as_dict().cloned());
1064    if let Some(dict) = per_node {
1065        return crate::llm::parse_skill_match_config_dict(&dict);
1066    }
1067    super::current_workflow_skill_context()
1068        .and_then(|ctx| ctx.match_config)
1069        .and_then(|v| v.as_dict().cloned())
1070        .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1071        .unwrap_or_default()
1072}
1073
1074/// Accept both a validated `skill_registry` dict and a bare list of
1075/// skill entries. The workflow-level parser in `register.rs` does the
1076/// same — we duplicate here so per-node `model_policy.skills` settings
1077/// (not routed through that parser) also benefit.
1078fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1079    use std::collections::BTreeMap;
1080    use std::rc::Rc;
1081    match &value {
1082        VmValue::Dict(d)
1083            if d.get("_type")
1084                .map(|v| v.display() == "skill_registry")
1085                .unwrap_or(false) =>
1086        {
1087            Some(value)
1088        }
1089        VmValue::List(list) => {
1090            let mut dict = BTreeMap::new();
1091            dict.insert(
1092                "_type".to_string(),
1093                VmValue::String(Rc::from("skill_registry")),
1094            );
1095            dict.insert("skills".to_string(), VmValue::List(list.clone()));
1096            Some(VmValue::Dict(Rc::new(dict)))
1097        }
1098        _ => None,
1099    }
1100}
1101
1102fn resolve_node_session_id(node: &WorkflowNode) -> String {
1103    if let Some(explicit) = node
1104        .raw_model_policy
1105        .as_ref()
1106        .and_then(|v| v.as_dict())
1107        .and_then(|d| d.get("session_id"))
1108        .and_then(|v| match v {
1109            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1110            _ => None,
1111        })
1112    {
1113        return explicit;
1114    }
1115    if let Some(persisted) = node
1116        .metadata
1117        .get("worker_session_id")
1118        .and_then(|value| value.as_str())
1119        .filter(|value| !value.trim().is_empty())
1120    {
1121        return persisted.to_string();
1122    }
1123    format!("workflow_stage_{}", uuid::Uuid::now_v7())
1124}
1125
1126fn raw_auto_compact_dict(
1127    node: &WorkflowNode,
1128) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1129    node.raw_auto_compact
1130        .as_ref()
1131        .and_then(|value| value.as_dict())
1132}
1133
1134fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1135    raw_auto_compact_dict(node)
1136        .and_then(|dict| dict.get(key))
1137        .and_then(|value| value.as_int())
1138        .filter(|value| *value >= 0)
1139        .map(|value| value as usize)
1140}
1141
1142fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1143    raw_auto_compact_dict(node)
1144        .and_then(|dict| dict.get(key))
1145        .and_then(|value| match value {
1146            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1147            _ => None,
1148        })
1149}
1150
1151pub(crate) async fn resolve_stage_auto_compact(
1152    node: &WorkflowNode,
1153    opts: &crate::llm::api::LlmCallOptions,
1154) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1155    if !node.auto_compact.enabled {
1156        return Ok(None);
1157    }
1158
1159    let mut ac = crate::orchestration::AutoCompactConfig::default();
1160    if let Some(v) = node.auto_compact.token_threshold {
1161        ac.token_threshold = v;
1162    }
1163    if let Some(v) = node.auto_compact.tool_output_max_chars {
1164        ac.tool_output_max_chars = v;
1165    }
1166    if let Some(ref strategy) = node.auto_compact.compact_strategy {
1167        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1168            ac.compact_strategy = s;
1169        }
1170    }
1171    if let Some(v) = node.auto_compact.hard_limit_tokens {
1172        ac.hard_limit_tokens = Some(v);
1173    }
1174    if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1175        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1176            ac.hard_limit_strategy = s;
1177        }
1178    }
1179
1180    // Workflow nodes keep the richer agent-loop-only compaction knobs in the
1181    // raw dict because the typed policy shape intentionally models only the
1182    // common workflow fields.
1183    if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1184        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1185    {
1186        ac.keep_last = v;
1187    }
1188    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1189        ac.summarize_prompt = Some(prompt);
1190    }
1191
1192    // Closure fields can't round-trip through serde, so extract them directly
1193    // from the raw VmValue dict.
1194    if let Some(dict) = raw_auto_compact_dict(node) {
1195        if let Some(cb) = dict.get("compress_callback") {
1196            ac.compress_callback = Some(cb.clone());
1197        }
1198        if let Some(cb) = dict.get("mask_callback") {
1199            ac.mask_callback = Some(cb.clone());
1200        }
1201        if let Some(cb) = dict.get("custom_compactor") {
1202            ac.custom_compactor = Some(cb.clone());
1203        }
1204    }
1205
1206    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1207    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1208    crate::llm::api::adapt_auto_compact_to_provider(
1209        &mut ac,
1210        user_specified_threshold,
1211        user_specified_hard_limit,
1212        &opts.provider,
1213        &opts.model,
1214        &opts.api_key,
1215    )
1216    .await;
1217
1218    Ok(Some(ac))
1219}
1220
/// Execute a single workflow stage and package its output as an artifact.
///
/// Selects input artifacts according to the node's context policy, renders
/// them (plus any verification contracts) into the stage prompt, and then
/// runs one of:
/// - a shell command, for `verify` nodes whose `verify.command` is a
///   non-blank string;
/// - nothing (an empty completed result), for `verify` nodes without a
///   command;
/// - the agent loop, when `mode` is `"agent"` or `workflow_tool_names`
///   yields at least one tool name;
/// - a single LLM call otherwise.
///
/// Returns `(result, artifacts, transcript)`: the JSON result (annotated with
/// prompt, context and selection bookkeeping when it is an object), a
/// one-element vector holding the stage's output artifact, and the transcript
/// after it has been passed through `redact_transcript_visibility`.
///
/// # Errors
///
/// Returns `VmError::Runtime` when the node requires a transcript but the
/// resolved session does not exist, when the number of selected artifacts
/// violates `min_inputs` / `max_inputs`, or when the tool policy cannot be
/// intersected with the node's capability policy. Errors from context
/// assembly, verification-contract lookup, the sandboxed command, option
/// extraction, the agent loop and the LLM call are propagated.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // When the context policy names no kinds, restrict selection to the kinds
    // the input contract accepts.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    // A configured context assembler replaces the default artifact rendering.
    let rendered_context = if let Some(assembler) = node.raw_context_assembler.as_ref() {
        let assembled =
            crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
        super::render_assembled_chunks(&assembled)
    } else {
        super::render_artifacts_context(&selected, &node.context_policy)
    };
    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
    let rendered_verification = super::render_verification_context(&verification_contracts);
    let stage_session_id = resolve_node_session_id(node);
    // Enforce the input contract before doing any work.
    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires an existing session \
             (call agent_session_open and feed session_id through model_policy \
             before entering this stage)"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    let prompt = super::render_workflow_prompt(
        task,
        node.task_label.as_deref(),
        &rendered_verification,
        &rendered_context,
    );

    // Precedence for the tool-calling contract format:
    //   1. explicit `model_policy.tool_format` on the node
    //   2. `HARN_AGENT_TOOL_FORMAT` env override
    //   3. provider/model default
    // Mirrors the top-level agent_loop / llm_call resolution so workflow
    // authors can pin `tool_format: "native"` per-stage and have it
    // reach the inner agent loop.
    let tool_format = node
        .model_policy
        .tool_format
        .clone()
        .filter(|value| !value.trim().is_empty())
        .or_else(|| {
            std::env::var("HARN_AGENT_TOOL_FORMAT")
                .ok()
                .filter(|value| !value.trim().is_empty())
        })
        .unwrap_or_else(|| {
            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
            crate::llm_config::default_tool_format(&model, &provider)
        });
    // `verify` nodes never call the LLM: they either run the configured shell
    // command or yield an empty completed result.
    let mut llm_result = if node.kind == "verify" {
        if let Some(command) = node
            .verify
            .as_ref()
            .and_then(|verify| verify.as_object())
            .and_then(|verify| verify.get("command"))
            .and_then(|value| value.as_str())
            .map(str::trim)
            .filter(|value| !value.is_empty())
        {
            // Hand the command string to the platform shell.
            let (program, args) = if cfg!(target_os = "windows") {
                ("cmd", vec!["/C".to_string(), command.to_string()])
            } else {
                ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
            };
            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
                stdin_null: true,
                ..Default::default()
            };
            // Inherit the working directory (after the sandbox check) and
            // environment from the current execution context, when one exists.
            if let Some(context) = crate::stdlib::process::current_execution_context() {
                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
                }
                if !context.env.is_empty() {
                    process_config.env.extend(context.env);
                }
            }
            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
            // Merge both streams, newline-separated when both are non-empty.
            let combined = if stderr.is_empty() {
                stdout.clone()
            } else if stdout.is_empty() {
                stderr.clone()
            } else {
                format!("{stdout}\n{stderr}")
            };
            // `exit_status` is -1 when the process reports no exit code.
            serde_json::json!({
                "status": "completed",
                "text": combined,
                "visible_text": combined,
                "command": command,
                "stdout": stdout,
                "stderr": stderr,
                "exit_status": output.status.code().unwrap_or(-1),
                "success": output.status.success(),
            })
        } else {
            serde_json::json!({
                "status": "completed",
                "text": "",
                "visible_text": "",
            })
        }
    } else {
        // Forward only the model-policy fields that are actually set.
        let mut options = BTreeMap::new();
        if let Some(provider) = &node.model_policy.provider {
            options.insert(
                "provider".to_string(),
                VmValue::String(Rc::from(provider.clone())),
            );
        }
        if let Some(model) = &node.model_policy.model {
            options.insert(
                "model".to_string(),
                VmValue::String(Rc::from(model.clone())),
            );
        }
        if let Some(model_tier) = &node.model_policy.model_tier {
            options.insert(
                "model_tier".to_string(),
                VmValue::String(Rc::from(model_tier.clone())),
            );
        }
        if let Some(temperature) = node.model_policy.temperature {
            options.insert("temperature".to_string(), VmValue::Float(temperature));
        }
        if let Some(max_tokens) = node.model_policy.max_tokens {
            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
        }
        // Prefer the raw VM-value tools over the JSON form, and only pass
        // tools on when at least one tool name was found.
        let tool_names = workflow_tool_names(&node.tools);
        let tools_value = node.raw_tools.clone().or_else(|| {
            if matches!(node.tools, serde_json::Value::Null) {
                None
            } else {
                Some(crate::stdlib::json_to_vm_value(&node.tools))
            }
        });
        if tools_value.is_some() && !tool_names.is_empty() {
            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
        }
        options.insert(
            "session_id".to_string(),
            VmValue::String(Rc::from(stage_session_id.clone())),
        );

        // Positional arguments: prompt, system prompt (or nil), options dict.
        let args = vec![
            VmValue::String(Rc::from(prompt.clone())),
            node.system
                .clone()
                .map(|s| VmValue::String(Rc::from(s)))
                .unwrap_or(VmValue::Nil),
            VmValue::Dict(Rc::new(options)),
        ];
        let mut opts = extract_llm_options(&args)?;
        let auto_compact = resolve_stage_auto_compact(node, &opts).await?;

        // Agent mode, or any declared tool, routes through the agent loop;
        // everything else is a single LLM call.
        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
            // The effective policy is the tool-derived policy narrowed by the
            // node's own capability policy.
            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
            let effective_policy = tool_policy
                .intersect(&node.capability_policy)
                .map_err(VmError::Runtime)?;
            let permissions = crate::llm::permissions::parse_dynamic_permission_policy(
                node.raw_model_policy
                    .as_ref()
                    .and_then(|value| value.as_dict())
                    .and_then(|dict| dict.get("permissions")),
                "workflow model_policy",
            )?;
            crate::llm::run_agent_loop_internal(
                &mut opts,
                crate::llm::AgentLoopConfig {
                    persistent: true,
                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
                    nudge: node.model_policy.nudge.clone(),
                    done_sentinel: node.done_sentinel.clone(),
                    break_unless_phase: None,
                    tool_retries: 0,
                    tool_backoff_ms: 1000,
                    tool_format: tool_format.clone(),
                    native_tool_fallback: node.model_policy.native_tool_fallback,
                    auto_compact,
                    policy: Some(effective_policy),
                    permissions,
                    approval_policy: Some(node.approval_policy.clone()),
                    daemon: false,
                    daemon_config: Default::default(),
                    llm_retries: 2,
                    llm_backoff_ms: 2000,
                    token_budget: None,
                    exit_when_verified: node.exit_when_verified,
                    loop_detect_warn: 2,
                    loop_detect_block: 3,
                    loop_detect_skip: 4,
                    tool_examples: node.model_policy.tool_examples.clone(),
                    turn_policy: node.model_policy.turn_policy.clone(),
                    stop_after_successful_tools: node
                        .model_policy
                        .stop_after_successful_tools
                        .clone(),
                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
                    // Use the same session id resolved for the stage so
                    // agent_subscribe handlers keyed on it, and session
                    // storage lookups in the agent loop, stay consistent.
                    session_id: stage_session_id.clone(),
                    event_sink: None,
                    // Seed from the stage's explicit deliverables/ledger so the
                    // graph carries a task-wide plan through map branches and
                    // nested stages. Empty ledger means no gate.
                    task_ledger: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("task_ledger"))
                        .map(crate::llm::helpers::vm_value_to_json)
                        .and_then(|json| serde_json::from_value(json).ok())
                        .unwrap_or_default(),
                    post_turn_callback: node
                        .raw_model_policy
                        .as_ref()
                        .and_then(|v| v.as_dict())
                        .and_then(|d| d.get("post_turn_callback"))
                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
                        .cloned(),
                    // Inherit the workflow-level skill wiring installed
                    // by `workflow_execute`. Per-node `model_policy.skills`
                    // (optional) overrides, letting authors scope a skill
                    // set to one stage without affecting siblings. Empty
                    // thread-local = no skills configured (direct
                    // `execute_stage_node` callers outside a workflow).
                    skill_registry: resolve_stage_skill_registry(node),
                    skill_match: resolve_stage_skill_match(node),
                    working_files: Vec::new(),
                },
            )
            .await?
        } else {
            let result = vm_call_llm_full(&opts).await?;
            crate::llm::agent_loop_result_from_llm(&result, opts)
        }
    };
    // Annotate object-shaped results with prompt and selection bookkeeping.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        if !verification_contracts.is_empty() {
            payload.insert(
                "verification_contracts".to_string(),
                serde_json::to_value(&verification_contracts).unwrap_or_default(),
            );
            payload.insert(
                "rendered_verification_context".to_string(),
                serde_json::json!(rendered_verification),
            );
        }
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        // Record the resolved tool format under `tools.mode`, replacing a
        // non-object `tools` value if one is already present.
        match payload
            .entry("tools".to_string())
            .or_insert_with(|| serde_json::json!({}))
        {
            serde_json::Value::Object(tools) => {
                tools.insert("mode".to_string(), serde_json::json!(tool_format.clone()));
            }
            slot => {
                *slot = serde_json::json!({ "mode": tool_format.clone() });
            }
        }
    }

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Results without a "transcript" field (e.g. the verify-command path
    // above) fall back to a snapshot of the stage session, so cross-stage
    // conversation state survives transitions.
    let result_transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
    let transcript = result_transcript
        .or(session_transcript)
        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
    // The first declared output kind wins; otherwise derive one from the
    // node kind.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // NOTE(review): `changed_paths` is populated from the approval policy's
    // write allowlist, not from observed writes — confirm this is intended.
    if !node.approval_policy.write_path_allowlist.is_empty() {
        metadata.insert(
            "changed_paths".to_string(),
            serde_json::json!(node.approval_policy.write_path_allowlist),
        );
    }
    // The output artifact records the selected inputs as its lineage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
1595
1596pub fn next_nodes_for(
1597    graph: &WorkflowGraph,
1598    current: &str,
1599    branch: Option<&str>,
1600) -> Vec<WorkflowEdge> {
1601    let mut matching: Vec<WorkflowEdge> = graph
1602        .edges
1603        .iter()
1604        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1605        .cloned()
1606        .collect();
1607    if matching.is_empty() {
1608        matching = graph
1609            .edges
1610            .iter()
1611            .filter(|edge| edge.from == current && edge.branch.is_none())
1612            .cloned()
1613            .collect();
1614    }
1615    matching
1616}
1617
1618pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1619    next_nodes_for(graph, current, Some(branch))
1620        .into_iter()
1621        .next()
1622        .map(|edge| edge.to)
1623}
1624
1625pub fn append_audit_entry(
1626    graph: &mut WorkflowGraph,
1627    op: &str,
1628    node_id: Option<String>,
1629    reason: Option<String>,
1630    metadata: BTreeMap<String, serde_json::Value>,
1631) {
1632    graph.audit_log.push(WorkflowAuditEntry {
1633        id: new_id("audit"),
1634        op: op.to_string(),
1635        node_id,
1636        timestamp: now_rfc3339(),
1637        reason,
1638        metadata,
1639    });
1640}