Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema, ToolKind};
15use crate::value::{VmError, VmValue};
16
17pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
20#[derive(Clone, Debug, Default, Serialize, Deserialize)]
21#[serde(default)]
22pub struct WorkflowNode {
23    pub id: Option<String>,
24    pub kind: String,
25    pub mode: Option<String>,
26    pub prompt: Option<String>,
27    pub system: Option<String>,
28    pub task_label: Option<String>,
29    pub done_sentinel: Option<String>,
30    pub tools: serde_json::Value,
31    pub model_policy: ModelPolicy,
32    /// Per-stage auto-compaction settings for the agent loop's context
33    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
34    /// expressible here — call the `agent_session_*` builtins before the
35    /// stage or in a prior stage.
36    pub auto_compact: AutoCompactPolicy,
37    /// Output visibility filter applied to the transcript after the
38    /// stage's agent loop exits. `"public"` / `"public_only"` drops
39    /// `tool_result` messages and non-public events. `None` or any
40    /// unknown string is a no-op.
41    #[serde(default)]
42    pub output_visibility: Option<String>,
43    pub context_policy: ContextPolicy,
44    pub retry_policy: RetryPolicy,
45    pub capability_policy: CapabilityPolicy,
46    pub approval_policy: super::ToolApprovalPolicy,
47    pub input_contract: StageContract,
48    pub output_contract: StageContract,
49    pub branch_semantics: BranchSemantics,
50    pub map_policy: MapPolicy,
51    pub join_policy: JoinPolicy,
52    pub reduce_policy: ReducePolicy,
53    pub escalation_policy: EscalationPolicy,
54    pub verify: Option<serde_json::Value>,
55    /// When true, the stage's agent loop gates the done sentinel on the most
56    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
57    /// persistent execute stages that fold verification into the loop via a
58    /// shell-exec tool the model invokes explicitly.
59    #[serde(default)]
60    pub exit_when_verified: bool,
61    pub metadata: BTreeMap<String, serde_json::Value>,
62    #[serde(skip)]
63    pub raw_tools: Option<VmValue>,
64    /// Raw auto_compact VmValue dict — preserved for extracting closure
65    /// fields (compress_callback, mask_callback, custom_compactor) that
66    /// can't go through serde.
67    #[serde(skip)]
68    pub raw_auto_compact: Option<VmValue>,
69    /// Raw model_policy VmValue dict — preserved for extracting closure
70    /// fields (post_turn_callback) that can't go through serde.
71    #[serde(skip)]
72    pub raw_model_policy: Option<VmValue>,
73    /// Raw context_assembler VmValue dict — when set, the stage's
74    /// artifact context is packed through `assemble_context` before
75    /// rendering the system prompt. Closure fields (`ranker_callback`)
76    /// are preserved here because they can't round-trip through serde.
77    #[serde(skip)]
78    pub raw_context_assembler: Option<VmValue>,
79}
80
81impl PartialEq for WorkflowNode {
82    fn eq(&self, other: &Self) -> bool {
83        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84    }
85}
86
87#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(default)]
89pub struct VerificationRequirement {
90    pub kind: String,
91    pub value: String,
92    pub note: Option<String>,
93}
94
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct VerificationContract {
98    pub source_node: Option<String>,
99    pub summary: Option<String>,
100    pub command: Option<String>,
101    pub expect_status: Option<i64>,
102    pub assert_text: Option<String>,
103    pub expect_text: Option<String>,
104    pub required_identifiers: Vec<String>,
105    pub required_paths: Vec<String>,
106    pub required_text: Vec<String>,
107    pub notes: Vec<String>,
108    pub checks: Vec<VerificationRequirement>,
109}
110
111impl VerificationContract {
112    fn is_empty(&self) -> bool {
113        self.summary.is_none()
114            && self.command.is_none()
115            && self.expect_status.is_none()
116            && self.assert_text.is_none()
117            && self.expect_text.is_none()
118            && self.required_identifiers.is_empty()
119            && self.required_paths.is_empty()
120            && self.required_text.is_empty()
121            && self.notes.is_empty()
122            && self.checks.is_empty()
123    }
124}
125
126fn push_unique_string(values: &mut Vec<String>, value: &str) {
127    let trimmed = value.trim();
128    if trimmed.is_empty() {
129        return;
130    }
131    if !values.iter().any(|existing| existing == trimmed) {
132        values.push(trimmed.to_string());
133    }
134}
135
136fn push_unique_requirement(
137    values: &mut Vec<VerificationRequirement>,
138    kind: &str,
139    value: &str,
140    note: Option<&str>,
141) {
142    let trimmed_kind = kind.trim();
143    let trimmed_value = value.trim();
144    let trimmed_note = note
145        .map(str::trim)
146        .filter(|candidate| !candidate.is_empty())
147        .map(|candidate| candidate.to_string());
148    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149        return;
150    }
151    let candidate = VerificationRequirement {
152        kind: trimmed_kind.to_string(),
153        value: trimmed_value.to_string(),
154        note: trimmed_note,
155    };
156    if !values.iter().any(|existing| existing == &candidate) {
157        values.push(candidate);
158    }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162    match value {
163        Some(serde_json::Value::String(text)) => {
164            let mut values = Vec::new();
165            push_unique_string(&mut values, text);
166            values
167        }
168        Some(serde_json::Value::Array(items)) => {
169            let mut values = Vec::new();
170            for item in items {
171                if let Some(text) = item.as_str() {
172                    push_unique_string(&mut values, text);
173                }
174            }
175            values
176        }
177        _ => Vec::new(),
178    }
179}
180
181fn merge_verification_requirement_list(
182    target: &mut Vec<VerificationRequirement>,
183    value: Option<&serde_json::Value>,
184) {
185    let Some(items) = value.and_then(|raw| raw.as_array()) else {
186        return;
187    };
188    for item in items {
189        let Some(object) = item.as_object() else {
190            continue;
191        };
192        let kind = object
193            .get("kind")
194            .and_then(|value| value.as_str())
195            .unwrap_or_default();
196        let value = object
197            .get("value")
198            .and_then(|value| value.as_str())
199            .unwrap_or_default();
200        let note = object
201            .get("note")
202            .or_else(|| object.get("description"))
203            .or_else(|| object.get("reason"))
204            .and_then(|value| value.as_str());
205        push_unique_requirement(target, kind, value, note);
206    }
207}
208
209fn merge_verification_contract_fields(
210    target: &mut VerificationContract,
211    object: &serde_json::Map<String, serde_json::Value>,
212) {
213    if target.summary.is_none() {
214        target.summary = object
215            .get("summary")
216            .and_then(|value| value.as_str())
217            .map(str::trim)
218            .filter(|value| !value.is_empty())
219            .map(|value| value.to_string());
220    }
221    if target.command.is_none() {
222        target.command = object
223            .get("command")
224            .and_then(|value| value.as_str())
225            .map(str::trim)
226            .filter(|value| !value.is_empty())
227            .map(|value| value.to_string());
228    }
229    if target.expect_status.is_none() {
230        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231    }
232    if target.assert_text.is_none() {
233        target.assert_text = object
234            .get("assert_text")
235            .and_then(|value| value.as_str())
236            .map(str::trim)
237            .filter(|value| !value.is_empty())
238            .map(|value| value.to_string());
239    }
240    if target.expect_text.is_none() {
241        target.expect_text = object
242            .get("expect_text")
243            .and_then(|value| value.as_str())
244            .map(str::trim)
245            .filter(|value| !value.is_empty())
246            .map(|value| value.to_string());
247    }
248
249    for value in json_string_list(
250        object
251            .get("required_identifiers")
252            .or_else(|| object.get("identifiers")),
253    ) {
254        push_unique_string(&mut target.required_identifiers, &value);
255    }
256    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257        push_unique_string(&mut target.required_paths, &value);
258    }
259    for value in json_string_list(
260        object
261            .get("required_text")
262            .or_else(|| object.get("exact_text"))
263            .or_else(|| object.get("required_strings")),
264    ) {
265        push_unique_string(&mut target.required_text, &value);
266    }
267    for value in json_string_list(object.get("notes")) {
268        push_unique_string(&mut target.notes, &value);
269    }
270    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276        VmError::Runtime(format!(
277            "workflow verification contract read failed for {}: {error}",
278            resolved.display()
279        ))
280    })?;
281    serde_json::from_str(&contents).map_err(|error| {
282        VmError::Runtime(format!(
283            "workflow verification contract parse failed for {}: {error}",
284            resolved.display()
285        ))
286    })
287}
288
289fn resolve_verification_contract_path(
290    verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292    let Some(path) = verify
293        .get("contract_path")
294        .or_else(|| verify.get("verification_contract_path"))
295        .and_then(|value| value.as_str())
296        .map(str::trim)
297        .filter(|value| !value.is_empty())
298    else {
299        return Ok(None);
300    };
301    Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305    node_id: &str,
306    verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309        return Ok(None);
310    };
311
312    let mut contract = VerificationContract {
313        source_node: Some(node_id.to_string()),
314        ..Default::default()
315    };
316
317    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318        let Some(object) = file_contract.as_object() else {
319            return Err(VmError::Runtime(
320                "workflow verification contract file must parse to a JSON object".to_string(),
321            ));
322        };
323        merge_verification_contract_fields(&mut contract, object);
324    }
325
326    if let Some(inline_contract) = verify_object.get("contract") {
327        let Some(object) = inline_contract.as_object() else {
328            return Err(VmError::Runtime(
329                "workflow verify.contract must be an object".to_string(),
330            ));
331        };
332        merge_verification_contract_fields(&mut contract, object);
333    }
334
335    merge_verification_contract_fields(&mut contract, verify_object);
336
337    if let Some(assert_text) = contract.assert_text.clone() {
338        push_unique_requirement(
339            &mut contract.checks,
340            "visible_text_contains",
341            &assert_text,
342            Some("verify stage requires visible output to contain this text"),
343        );
344    }
345    if let Some(expect_text) = contract.expect_text.clone() {
346        push_unique_requirement(
347            &mut contract.checks,
348            "combined_output_contains",
349            &expect_text,
350            Some("verify command requires combined stdout/stderr to contain this text"),
351        );
352    }
353    if let Some(expect_status) = contract.expect_status {
354        push_unique_requirement(
355            &mut contract.checks,
356            "expect_status",
357            &expect_status.to_string(),
358            Some("verify command exit status must match exactly"),
359        );
360    }
361    for identifier in contract.required_identifiers.clone() {
362        push_unique_requirement(
363            &mut contract.checks,
364            "identifier",
365            &identifier,
366            Some("use this exact identifier spelling"),
367        );
368    }
369    for path in contract.required_paths.clone() {
370        push_unique_requirement(
371            &mut contract.checks,
372            "path",
373            &path,
374            Some("preserve this exact path"),
375        );
376    }
377    for text in contract.required_text.clone() {
378        push_unique_requirement(
379            &mut contract.checks,
380            "text",
381            &text,
382            Some("required exact text or wiring snippet"),
383        );
384    }
385
386    if contract.is_empty() {
387        return Ok(None);
388    }
389    Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393    if !values.iter().any(|existing| existing == &candidate) {
394        values.push(candidate);
395    }
396}
397
398pub fn workflow_verification_contracts(
399    graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401    let mut contracts = Vec::new();
402    for (node_id, node) in &graph.nodes {
403        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404            push_unique_contract(&mut contracts, contract);
405        }
406    }
407    Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411    node: &mut WorkflowNode,
412    contracts: &[VerificationContract],
413) {
414    if contracts.is_empty() {
415        return;
416    }
417    node.metadata.insert(
418        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419        serde_json::to_value(contracts).unwrap_or_default(),
420    );
421}
422
423pub fn stage_verification_contracts(
424    node_id: &str,
425    node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428    let local_only = matches!(
429        node.metadata
430            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431            .and_then(|value| value.as_str()),
432        Some("local_only")
433    );
434    if local_only {
435        return Ok(local_contract.into_iter().collect());
436    }
437
438    let mut contracts = node
439        .metadata
440        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441        .cloned()
442        .map(|value| {
443            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444                VmError::Runtime(format!(
445                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
446                ))
447            })
448        })
449        .transpose()?
450        .unwrap_or_default();
451
452    if let Some(local_contract) = local_contract {
453        push_unique_contract(&mut contracts, local_contract);
454    }
455    Ok(contracts)
456}
457
458pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
459    match value {
460        serde_json::Value::Null => Vec::new(),
461        serde_json::Value::Array(items) => items
462            .iter()
463            .filter_map(|item| match item {
464                serde_json::Value::Object(map) => map
465                    .get("name")
466                    .and_then(|value| value.as_str())
467                    .filter(|name| !name.is_empty())
468                    .map(|name| name.to_string()),
469                _ => None,
470            })
471            .collect(),
472        serde_json::Value::Object(map) => {
473            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
474                return map
475                    .get("tools")
476                    .map(workflow_tool_names)
477                    .unwrap_or_default();
478            }
479            map.get("name")
480                .and_then(|value| value.as_str())
481                .filter(|name| !name.is_empty())
482                .map(|name| vec![name.to_string()])
483                .unwrap_or_default()
484        }
485        _ => Vec::new(),
486    }
487}
488
489fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
490    fn rank(v: &str) -> usize {
491        match v {
492            "none" => 0,
493            "read_only" => 1,
494            "workspace_write" => 2,
495            "process_exec" => 3,
496            "network" => 4,
497            _ => 5,
498        }
499    }
500    levels.max_by_key(|level| rank(level))
501}
502
503fn parse_tool_kind(value: Option<&serde_json::Value>) -> ToolKind {
504    match value.and_then(|v| v.as_str()).unwrap_or("") {
505        "read" => ToolKind::Read,
506        "edit" => ToolKind::Edit,
507        "delete" => ToolKind::Delete,
508        "move" => ToolKind::Move,
509        "search" => ToolKind::Search,
510        "execute" => ToolKind::Execute,
511        "think" => ToolKind::Think,
512        "fetch" => ToolKind::Fetch,
513        _ => ToolKind::Other,
514    }
515}
516
517fn parse_tool_annotations(map: &serde_json::Map<String, serde_json::Value>) -> ToolAnnotations {
518    let policy = map
519        .get("policy")
520        .and_then(|value| value.as_object())
521        .cloned()
522        .unwrap_or_default();
523
524    let capabilities = policy
525        .get("capabilities")
526        .and_then(|value| value.as_object())
527        .map(|caps| {
528            caps.iter()
529                .map(|(capability, ops)| {
530                    let values = ops
531                        .as_array()
532                        .map(|items| {
533                            items
534                                .iter()
535                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
536                                .collect::<Vec<_>>()
537                        })
538                        .unwrap_or_default();
539                    (capability.clone(), values)
540                })
541                .collect::<BTreeMap<_, _>>()
542        })
543        .unwrap_or_default();
544
545    // Accept both the structured `policy.arg_schema` object and the legacy
546    // flat fields on `policy` so pipelines can migrate gradually.
547    let arg_schema = if let Some(schema) = policy.get("arg_schema") {
548        serde_json::from_value::<ToolArgSchema>(schema.clone()).unwrap_or_default()
549    } else {
550        ToolArgSchema {
551            path_params: policy
552                .get("path_params")
553                .and_then(|value| value.as_array())
554                .map(|items| {
555                    items
556                        .iter()
557                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
558                        .collect::<Vec<_>>()
559                })
560                .unwrap_or_default(),
561            arg_aliases: policy
562                .get("arg_aliases")
563                .and_then(|value| value.as_object())
564                .map(|aliases| {
565                    aliases
566                        .iter()
567                        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
568                        .collect::<BTreeMap<_, _>>()
569                })
570                .unwrap_or_default(),
571            required: policy
572                .get("required")
573                .and_then(|value| value.as_array())
574                .map(|items| {
575                    items
576                        .iter()
577                        .filter_map(|item| item.as_str().map(|s| s.to_string()))
578                        .collect::<Vec<_>>()
579                })
580                .unwrap_or_default(),
581        }
582    };
583
584    let kind = parse_tool_kind(policy.get("kind"));
585    let side_effect_level = policy
586        .get("side_effect_level")
587        .and_then(|value| value.as_str())
588        .map(SideEffectLevel::parse)
589        .unwrap_or_default();
590
591    ToolAnnotations {
592        kind,
593        side_effect_level,
594        arg_schema,
595        capabilities,
596        emits_artifacts: policy
597            .get("emits_artifacts")
598            .and_then(|value| value.as_bool())
599            .unwrap_or(false),
600        result_readers: policy
601            .get("result_readers")
602            .or_else(|| policy.get("readable_result_routes"))
603            .and_then(|value| value.as_array())
604            .map(|items| {
605                items
606                    .iter()
607                    .filter_map(|item| item.as_str().map(|s| s.to_string()))
608                    .collect::<Vec<_>>()
609            })
610            .unwrap_or_default(),
611        inline_result: policy
612            .get("inline_result")
613            .and_then(|value| value.as_bool())
614            .unwrap_or(false),
615    }
616}
617
618pub fn workflow_tool_annotations(value: &serde_json::Value) -> BTreeMap<String, ToolAnnotations> {
619    match value {
620        serde_json::Value::Null => BTreeMap::new(),
621        serde_json::Value::Array(items) => items
622            .iter()
623            .filter_map(|item| match item {
624                serde_json::Value::Object(map) => map
625                    .get("name")
626                    .and_then(|value| value.as_str())
627                    .filter(|name| !name.is_empty())
628                    .map(|name| (name.to_string(), parse_tool_annotations(map))),
629                _ => None,
630            })
631            .collect(),
632        serde_json::Value::Object(map) => {
633            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
634                return map
635                    .get("tools")
636                    .map(workflow_tool_annotations)
637                    .unwrap_or_default();
638            }
639            map.get("name")
640                .and_then(|value| value.as_str())
641                .filter(|name| !name.is_empty())
642                .map(|name| {
643                    let mut annotations = BTreeMap::new();
644                    annotations.insert(name.to_string(), parse_tool_annotations(map));
645                    annotations
646                })
647                .unwrap_or_default()
648        }
649        _ => BTreeMap::new(),
650    }
651}
652
653pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
654    let tools = workflow_tool_names(value);
655    let tool_annotations = workflow_tool_annotations(value);
656    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
657    for annotations in tool_annotations.values() {
658        for (capability, ops) in &annotations.capabilities {
659            let entry = capabilities.entry(capability.clone()).or_default();
660            for op in ops {
661                if !entry.contains(op) {
662                    entry.push(op.clone());
663                }
664            }
665            entry.sort();
666        }
667    }
668    let side_effect_level = max_side_effect_level(
669        tool_annotations
670            .values()
671            .map(|annotations| annotations.side_effect_level.as_str().to_string())
672            .filter(|level| level != "none"),
673    );
674    CapabilityPolicy {
675        tools,
676        capabilities,
677        workspace_roots: Vec::new(),
678        side_effect_level,
679        recursion_limit: None,
680        tool_arg_constraints: Vec::new(),
681        tool_annotations,
682    }
683}
684
685#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
686#[serde(default)]
687pub struct WorkflowEdge {
688    pub from: String,
689    pub to: String,
690    pub branch: Option<String>,
691    pub label: Option<String>,
692}
693
694#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
695#[serde(default)]
696pub struct WorkflowGraph {
697    #[serde(rename = "_type")]
698    pub type_name: String,
699    pub id: String,
700    pub name: Option<String>,
701    pub version: usize,
702    pub entry: String,
703    pub nodes: BTreeMap<String, WorkflowNode>,
704    pub edges: Vec<WorkflowEdge>,
705    pub capability_policy: CapabilityPolicy,
706    pub approval_policy: super::ToolApprovalPolicy,
707    pub metadata: BTreeMap<String, serde_json::Value>,
708    pub audit_log: Vec<WorkflowAuditEntry>,
709}
710
711#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
712#[serde(default)]
713pub struct WorkflowAuditEntry {
714    pub id: String,
715    pub op: String,
716    pub node_id: Option<String>,
717    pub timestamp: String,
718    pub reason: Option<String>,
719    pub metadata: BTreeMap<String, serde_json::Value>,
720}
721
722#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
723#[serde(default)]
724pub struct WorkflowValidationReport {
725    pub valid: bool,
726    pub errors: Vec<String>,
727    pub warnings: Vec<String>,
728    pub reachable_nodes: Vec<String>,
729}
730
731pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
732    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
733    let dict = value.as_dict();
734    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
735    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
736    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
737    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
738    Ok(node)
739}
740
741pub fn parse_workflow_node_json(
742    json: serde_json::Value,
743    label: &str,
744) -> Result<WorkflowNode, VmError> {
745    super::parse_json_payload(json, label)
746}
747
748pub fn parse_workflow_edge_json(
749    json: serde_json::Value,
750    label: &str,
751) -> Result<WorkflowEdge, VmError> {
752    super::parse_json_payload(json, label)
753}
754
755pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
756    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
757    let as_dict = value.as_dict().cloned().unwrap_or_default();
758
759    if graph.nodes.is_empty() {
760        for key in ["act", "verify", "repair"] {
761            if let Some(node_value) = as_dict.get(key) {
762                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
763                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
764                node.id = Some(key.to_string());
765                if node.kind.is_empty() {
766                    node.kind = if key == "verify" {
767                        "verify".to_string()
768                    } else {
769                        "stage".to_string()
770                    };
771                }
772                if node.model_policy.provider.is_none() {
773                    node.model_policy.provider = as_dict
774                        .get("provider")
775                        .map(|value| value.display())
776                        .filter(|value| !value.is_empty());
777                }
778                if node.model_policy.model.is_none() {
779                    node.model_policy.model = as_dict
780                        .get("model")
781                        .map(|value| value.display())
782                        .filter(|value| !value.is_empty());
783                }
784                if node.model_policy.model_tier.is_none() {
785                    node.model_policy.model_tier = as_dict
786                        .get("model_tier")
787                        .or_else(|| as_dict.get("tier"))
788                        .map(|value| value.display())
789                        .filter(|value| !value.is_empty());
790                }
791                if node.model_policy.temperature.is_none() {
792                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
793                        if let VmValue::Float(number) = value {
794                            Some(*number)
795                        } else {
796                            value.as_int().map(|number| number as f64)
797                        }
798                    });
799                }
800                if node.model_policy.max_tokens.is_none() {
801                    node.model_policy.max_tokens =
802                        as_dict.get("max_tokens").and_then(|value| value.as_int());
803                }
804                if node.mode.is_none() {
805                    node.mode = as_dict
806                        .get("mode")
807                        .map(|value| value.display())
808                        .filter(|value| !value.is_empty());
809                }
810                if node.done_sentinel.is_none() {
811                    node.done_sentinel = as_dict
812                        .get("done_sentinel")
813                        .map(|value| value.display())
814                        .filter(|value| !value.is_empty());
815                }
816                if key == "verify"
817                    && node.verify.is_none()
818                    && (raw_node.contains_key("assert_text")
819                        || raw_node.contains_key("command")
820                        || raw_node.contains_key("expect_status")
821                        || raw_node.contains_key("expect_text"))
822                {
823                    node.verify = Some(serde_json::json!({
824                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
825                        "command": raw_node.get("command").map(vm_value_to_json),
826                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
827                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
828                    }));
829                }
830                graph.nodes.insert(key.to_string(), node);
831            }
832        }
833        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
834            graph.entry = "act".to_string();
835        }
836        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
837            if graph.nodes.contains_key("verify") {
838                graph.edges.push(WorkflowEdge {
839                    from: "act".to_string(),
840                    to: "verify".to_string(),
841                    branch: None,
842                    label: None,
843                });
844            }
845            if graph.nodes.contains_key("repair") {
846                graph.edges.push(WorkflowEdge {
847                    from: "verify".to_string(),
848                    to: "repair".to_string(),
849                    branch: Some("failed".to_string()),
850                    label: None,
851                });
852                graph.edges.push(WorkflowEdge {
853                    from: "repair".to_string(),
854                    to: "verify".to_string(),
855                    branch: Some("retry".to_string()),
856                    label: None,
857                });
858            }
859        }
860    }
861
862    if graph.type_name.is_empty() {
863        graph.type_name = "workflow_graph".to_string();
864    }
865    if graph.id.is_empty() {
866        graph.id = new_id("workflow");
867    }
868    if graph.version == 0 {
869        graph.version = 1;
870    }
871    if graph.entry.is_empty() {
872        graph.entry = graph
873            .nodes
874            .keys()
875            .next()
876            .cloned()
877            .unwrap_or_else(|| "act".to_string());
878    }
879    for (node_id, node) in &mut graph.nodes {
880        if node.raw_tools.is_none() {
881            node.raw_tools = as_dict
882                .get("nodes")
883                .and_then(|nodes| nodes.as_dict())
884                .and_then(|nodes| nodes.get(node_id))
885                .and_then(|node_value| node_value.as_dict())
886                .and_then(|raw_node| raw_node.get("tools"))
887                .cloned();
888        }
889        if node.id.is_none() {
890            node.id = Some(node_id.clone());
891        }
892        if node.kind.is_empty() {
893            node.kind = "stage".to_string();
894        }
895        if node.join_policy.strategy.is_empty() {
896            node.join_policy.strategy = "all".to_string();
897        }
898        if node.reduce_policy.strategy.is_empty() {
899            node.reduce_policy.strategy = "concat".to_string();
900        }
901        if node.output_contract.output_kinds.is_empty() {
902            node.output_contract.output_kinds = vec![match node.kind.as_str() {
903                "verify" => "verification_result".to_string(),
904                "reduce" => node
905                    .reduce_policy
906                    .output_kind
907                    .clone()
908                    .unwrap_or_else(|| "summary".to_string()),
909                "map" => node
910                    .map_policy
911                    .output_kind
912                    .clone()
913                    .unwrap_or_else(|| "artifact".to_string()),
914                "escalation" => "plan".to_string(),
915                _ => "artifact".to_string(),
916            }];
917        }
918        if node.retry_policy.max_attempts == 0 {
919            node.retry_policy.max_attempts = 1;
920        }
921    }
922    Ok(graph)
923}
924
925pub fn validate_workflow(
926    graph: &WorkflowGraph,
927    ceiling: Option<&CapabilityPolicy>,
928) -> WorkflowValidationReport {
929    let mut errors = Vec::new();
930    let mut warnings = Vec::new();
931
932    if !graph.nodes.contains_key(&graph.entry) {
933        errors.push(format!("entry node does not exist: {}", graph.entry));
934    }
935
936    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
937    for edge in &graph.edges {
938        if !node_ids.contains(&edge.from) {
939            errors.push(format!("edge.from references unknown node: {}", edge.from));
940        }
941        if !node_ids.contains(&edge.to) {
942            errors.push(format!("edge.to references unknown node: {}", edge.to));
943        }
944    }
945
946    let reachable_nodes = reachable_nodes(graph);
947    for node_id in &node_ids {
948        if !reachable_nodes.contains(node_id) {
949            warnings.push(format!("node is unreachable: {node_id}"));
950        }
951    }
952
953    for (node_id, node) in &graph.nodes {
954        let incoming = graph
955            .edges
956            .iter()
957            .filter(|edge| edge.to == *node_id)
958            .count();
959        let outgoing: Vec<&WorkflowEdge> = graph
960            .edges
961            .iter()
962            .filter(|edge| edge.from == *node_id)
963            .collect();
964        if let Some(min_inputs) = node.input_contract.min_inputs {
965            if let Some(max_inputs) = node.input_contract.max_inputs {
966                if min_inputs > max_inputs {
967                    errors.push(format!(
968                        "node {node_id}: input contract min_inputs exceeds max_inputs"
969                    ));
970                }
971            }
972        }
973        match node.kind.as_str() {
974            "condition" => {
975                let has_true = outgoing
976                    .iter()
977                    .any(|edge| edge.branch.as_deref() == Some("true"));
978                let has_false = outgoing
979                    .iter()
980                    .any(|edge| edge.branch.as_deref() == Some("false"));
981                if !has_true || !has_false {
982                    errors.push(format!(
983                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
984                    ));
985                }
986            }
987            "fork" if outgoing.len() < 2 => {
988                errors.push(format!(
989                    "node {node_id}: fork nodes require at least two outgoing edges"
990                ));
991            }
992            "join" if incoming < 2 => {
993                warnings.push(format!(
994                    "node {node_id}: join node has fewer than two incoming edges"
995                ));
996            }
997            "map"
998                if node.map_policy.items.is_empty()
999                    && node.map_policy.item_artifact_kind.is_none()
1000                    && node.input_contract.input_kinds.is_empty() =>
1001            {
1002                errors.push(format!(
1003                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
1004                ));
1005            }
1006            "reduce" if node.input_contract.input_kinds.is_empty() => {
1007                warnings.push(format!(
1008                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
1009                ));
1010            }
1011            _ => {}
1012        }
1013    }
1014
1015    if let Some(ceiling) = ceiling {
1016        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
1017            errors.push(error);
1018        }
1019        for (node_id, node) in &graph.nodes {
1020            if let Err(error) = ceiling.intersect(&node.capability_policy) {
1021                errors.push(format!("node {node_id}: {error}"));
1022            }
1023        }
1024    }
1025
1026    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
1027        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
1028        match diagnostic.severity {
1029            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
1030            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
1031        }
1032    }
1033
1034    WorkflowValidationReport {
1035        valid: errors.is_empty(),
1036        errors,
1037        warnings,
1038        reachable_nodes: reachable_nodes.into_iter().collect(),
1039    }
1040}
1041
1042fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1043    let mut seen = BTreeSet::new();
1044    let mut stack = vec![graph.entry.clone()];
1045    while let Some(node_id) = stack.pop() {
1046        if !seen.insert(node_id.clone()) {
1047            continue;
1048        }
1049        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1050            stack.push(edge.to.clone());
1051        }
1052    }
1053    seen
1054}
1055
1056/// Pick the session id a stage should run under. Prefers an explicit
1057/// `session_id` on the node's `model_policy` dict (so pipelines with
1058/// `agent_session_open` / `agent_session_fork` flowing through a graph
1059/// line up); falls back to a stable, node-derived id so multi-stage
1060/// graphs with no explicit session share a conversation across stages.
1061/// Per-stage skill registry. Per-node `model_policy.skills` takes
1062/// precedence over the workflow-level `run_options.skills` — authors
1063/// can scope a skill set to one stage without affecting siblings. When
1064/// neither is set, returns `None` so the agent loop runs without
1065/// skill matching (preserves pre-Gap-2 behavior for callers that
1066/// didn't opt in).
1067fn resolve_stage_skill_registry(node: &WorkflowNode) -> Option<VmValue> {
1068    let per_node = node
1069        .raw_model_policy
1070        .as_ref()
1071        .and_then(|v| v.as_dict())
1072        .and_then(|d| d.get("skills"))
1073        .cloned()
1074        .and_then(normalize_inline_registry);
1075    if per_node.is_some() {
1076        return per_node;
1077    }
1078    super::current_workflow_skill_context().and_then(|ctx| ctx.registry)
1079}
1080
1081/// Mirror of `resolve_stage_skill_registry` for the match config:
1082/// per-node `model_policy.skill_match` wins, falling back to the
1083/// workflow-level setting.
1084fn resolve_stage_skill_match(node: &WorkflowNode) -> crate::llm::SkillMatchConfig {
1085    let per_node = node
1086        .raw_model_policy
1087        .as_ref()
1088        .and_then(|v| v.as_dict())
1089        .and_then(|d| d.get("skill_match"))
1090        .and_then(|v| v.as_dict().cloned());
1091    if let Some(dict) = per_node {
1092        return crate::llm::parse_skill_match_config_dict(&dict);
1093    }
1094    super::current_workflow_skill_context()
1095        .and_then(|ctx| ctx.match_config)
1096        .and_then(|v| v.as_dict().cloned())
1097        .map(|d| crate::llm::parse_skill_match_config_dict(&d))
1098        .unwrap_or_default()
1099}
1100
1101/// Accept both a validated `skill_registry` dict and a bare list of
1102/// skill entries. The workflow-level parser in `register.rs` does the
1103/// same — we duplicate here so per-node `model_policy.skills` settings
1104/// (not routed through that parser) also benefit.
1105fn normalize_inline_registry(value: VmValue) -> Option<VmValue> {
1106    use std::collections::BTreeMap;
1107    use std::rc::Rc;
1108    match &value {
1109        VmValue::Dict(d)
1110            if d.get("_type")
1111                .map(|v| v.display() == "skill_registry")
1112                .unwrap_or(false) =>
1113        {
1114            Some(value)
1115        }
1116        VmValue::List(list) => {
1117            let mut dict = BTreeMap::new();
1118            dict.insert(
1119                "_type".to_string(),
1120                VmValue::String(Rc::from("skill_registry")),
1121            );
1122            dict.insert("skills".to_string(), VmValue::List(list.clone()));
1123            Some(VmValue::Dict(Rc::new(dict)))
1124        }
1125        _ => None,
1126    }
1127}
1128
1129fn resolve_node_session_id(node: &WorkflowNode) -> String {
1130    if let Some(explicit) = node
1131        .raw_model_policy
1132        .as_ref()
1133        .and_then(|v| v.as_dict())
1134        .and_then(|d| d.get("session_id"))
1135        .and_then(|v| match v {
1136            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
1137            _ => None,
1138        })
1139    {
1140        return explicit;
1141    }
1142    if let Some(persisted) = node
1143        .metadata
1144        .get("worker_session_id")
1145        .and_then(|value| value.as_str())
1146        .filter(|value| !value.trim().is_empty())
1147    {
1148        return persisted.to_string();
1149    }
1150    format!("workflow_stage_{}", uuid::Uuid::now_v7())
1151}
1152
1153fn raw_auto_compact_dict(
1154    node: &WorkflowNode,
1155) -> Option<&std::collections::BTreeMap<String, VmValue>> {
1156    node.raw_auto_compact
1157        .as_ref()
1158        .and_then(|value| value.as_dict())
1159}
1160
1161fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
1162    raw_auto_compact_dict(node)
1163        .and_then(|dict| dict.get(key))
1164        .and_then(|value| value.as_int())
1165        .filter(|value| *value >= 0)
1166        .map(|value| value as usize)
1167}
1168
1169fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
1170    raw_auto_compact_dict(node)
1171        .and_then(|dict| dict.get(key))
1172        .and_then(|value| match value {
1173            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
1174            _ => None,
1175        })
1176}
1177
1178pub(crate) async fn resolve_stage_auto_compact(
1179    node: &WorkflowNode,
1180    opts: &crate::llm::api::LlmCallOptions,
1181) -> Result<Option<crate::orchestration::AutoCompactConfig>, VmError> {
1182    if !node.auto_compact.enabled {
1183        return Ok(None);
1184    }
1185
1186    let mut ac = crate::orchestration::AutoCompactConfig::default();
1187    if let Some(v) = node.auto_compact.token_threshold {
1188        ac.token_threshold = v;
1189    }
1190    if let Some(v) = node.auto_compact.tool_output_max_chars {
1191        ac.tool_output_max_chars = v;
1192    }
1193    if let Some(ref strategy) = node.auto_compact.compact_strategy {
1194        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1195            ac.compact_strategy = s;
1196        }
1197    }
1198    if let Some(v) = node.auto_compact.hard_limit_tokens {
1199        ac.hard_limit_tokens = Some(v);
1200    }
1201    if let Some(ref strategy) = node.auto_compact.hard_limit_strategy {
1202        if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
1203            ac.hard_limit_strategy = s;
1204        }
1205    }
1206
1207    // Workflow nodes keep the richer agent-loop-only compaction knobs in the
1208    // raw dict because the typed policy shape intentionally models only the
1209    // common workflow fields.
1210    if let Some(v) = raw_auto_compact_int(node, "compact_keep_last")
1211        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1212    {
1213        ac.keep_last = v;
1214    }
1215    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1216        ac.summarize_prompt = Some(prompt);
1217    }
1218
1219    // Closure fields can't round-trip through serde, so extract them directly
1220    // from the raw VmValue dict.
1221    if let Some(dict) = raw_auto_compact_dict(node) {
1222        if let Some(cb) = dict.get("compress_callback") {
1223            ac.compress_callback = Some(cb.clone());
1224        }
1225        if let Some(cb) = dict.get("mask_callback") {
1226            ac.mask_callback = Some(cb.clone());
1227        }
1228        if let Some(cb) = dict.get("custom_compactor") {
1229            ac.custom_compactor = Some(cb.clone());
1230        }
1231    }
1232
1233    let user_specified_threshold = node.auto_compact.token_threshold.is_some();
1234    let user_specified_hard_limit = node.auto_compact.hard_limit_tokens.is_some();
1235    crate::llm::api::adapt_auto_compact_to_provider(
1236        &mut ac,
1237        user_specified_threshold,
1238        user_specified_hard_limit,
1239        &opts.provider,
1240        &opts.model,
1241        &opts.api_key,
1242    )
1243    .await;
1244
1245    Ok(Some(ac))
1246}
1247
1248pub async fn execute_stage_node(
1249    node_id: &str,
1250    node: &WorkflowNode,
1251    task: &str,
1252    artifacts: &[ArtifactRecord],
1253) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1254    let mut selection_policy = node.context_policy.clone();
1255    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
1256        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
1257    }
1258    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
1259    let rendered_context = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1260        let assembled =
1261            crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
1262        super::render_assembled_chunks(&assembled)
1263    } else {
1264        super::render_artifacts_context(&selected, &node.context_policy)
1265    };
1266    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1267    let rendered_verification = super::render_verification_context(&verification_contracts);
1268    let stage_session_id = resolve_node_session_id(node);
1269    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1270        return Err(VmError::Runtime(format!(
1271            "workflow stage {node_id} requires an existing session \
1272             (call agent_session_open and feed session_id through model_policy \
1273             before entering this stage)"
1274        )));
1275    }
1276    if let Some(min_inputs) = node.input_contract.min_inputs {
1277        if selected.len() < min_inputs {
1278            return Err(VmError::Runtime(format!(
1279                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1280            )));
1281        }
1282    }
1283    if let Some(max_inputs) = node.input_contract.max_inputs {
1284        if selected.len() > max_inputs {
1285            return Err(VmError::Runtime(format!(
1286                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1287            )));
1288        }
1289    }
1290    let prompt = super::render_workflow_prompt(
1291        task,
1292        node.task_label.as_deref(),
1293        &rendered_verification,
1294        &rendered_context,
1295    );
1296
1297    // Precedence for the tool-calling contract format:
1298    //   1. explicit `model_policy.tool_format` on the node
1299    //   2. `HARN_AGENT_TOOL_FORMAT` env override
1300    //   3. provider/model default
1301    // Mirrors the top-level agent_loop / llm_call resolution so workflow
1302    // authors can pin `tool_format: "native"` per-stage and have it
1303    // reach the inner agent loop.
1304    let tool_format = node
1305        .model_policy
1306        .tool_format
1307        .clone()
1308        .filter(|value| !value.trim().is_empty())
1309        .or_else(|| {
1310            std::env::var("HARN_AGENT_TOOL_FORMAT")
1311                .ok()
1312                .filter(|value| !value.trim().is_empty())
1313        })
1314        .unwrap_or_else(|| {
1315            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
1316            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
1317            crate::llm_config::default_tool_format(&model, &provider)
1318        });
1319    let mut llm_result = if node.kind == "verify" {
1320        if let Some(command) = node
1321            .verify
1322            .as_ref()
1323            .and_then(|verify| verify.as_object())
1324            .and_then(|verify| verify.get("command"))
1325            .and_then(|value| value.as_str())
1326            .map(str::trim)
1327            .filter(|value| !value.is_empty())
1328        {
1329            let (program, args) = if cfg!(target_os = "windows") {
1330                ("cmd", vec!["/C".to_string(), command.to_string()])
1331            } else {
1332                ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
1333            };
1334            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1335                stdin_null: true,
1336                ..Default::default()
1337            };
1338            if let Some(context) = crate::stdlib::process::current_execution_context() {
1339                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1340                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1341                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
1342                }
1343                if !context.env.is_empty() {
1344                    process_config.env.extend(context.env);
1345                }
1346            }
1347            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1348            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1349            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1350            let combined = if stderr.is_empty() {
1351                stdout.clone()
1352            } else if stdout.is_empty() {
1353                stderr.clone()
1354            } else {
1355                format!("{stdout}\n{stderr}")
1356            };
1357            serde_json::json!({
1358                "status": "completed",
1359                "text": combined,
1360                "visible_text": combined,
1361                "command": command,
1362                "stdout": stdout,
1363                "stderr": stderr,
1364                "exit_status": output.status.code().unwrap_or(-1),
1365                "success": output.status.success(),
1366            })
1367        } else {
1368            serde_json::json!({
1369                "status": "completed",
1370                "text": "",
1371                "visible_text": "",
1372            })
1373        }
1374    } else {
1375        let mut options = BTreeMap::new();
1376        if let Some(provider) = &node.model_policy.provider {
1377            options.insert(
1378                "provider".to_string(),
1379                VmValue::String(Rc::from(provider.clone())),
1380            );
1381        }
1382        if let Some(model) = &node.model_policy.model {
1383            options.insert(
1384                "model".to_string(),
1385                VmValue::String(Rc::from(model.clone())),
1386            );
1387        }
1388        if let Some(model_tier) = &node.model_policy.model_tier {
1389            options.insert(
1390                "model_tier".to_string(),
1391                VmValue::String(Rc::from(model_tier.clone())),
1392            );
1393        }
1394        if let Some(temperature) = node.model_policy.temperature {
1395            options.insert("temperature".to_string(), VmValue::Float(temperature));
1396        }
1397        if let Some(max_tokens) = node.model_policy.max_tokens {
1398            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
1399        }
1400        let tool_names = workflow_tool_names(&node.tools);
1401        let tools_value = node.raw_tools.clone().or_else(|| {
1402            if matches!(node.tools, serde_json::Value::Null) {
1403                None
1404            } else {
1405                Some(crate::stdlib::json_to_vm_value(&node.tools))
1406            }
1407        });
1408        if tools_value.is_some() && !tool_names.is_empty() {
1409            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
1410        }
1411        options.insert(
1412            "session_id".to_string(),
1413            VmValue::String(Rc::from(stage_session_id.clone())),
1414        );
1415
1416        let args = vec![
1417            VmValue::String(Rc::from(prompt.clone())),
1418            node.system
1419                .clone()
1420                .map(|s| VmValue::String(Rc::from(s)))
1421                .unwrap_or(VmValue::Nil),
1422            VmValue::Dict(Rc::new(options)),
1423        ];
1424        let mut opts = extract_llm_options(&args)?;
1425        let auto_compact = resolve_stage_auto_compact(node, &opts).await?;
1426
1427        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
1428            let tool_policy = workflow_tool_policy_from_tools(&node.tools);
1429            let effective_policy = tool_policy
1430                .intersect(&node.capability_policy)
1431                .map_err(VmError::Runtime)?;
1432            let permissions = crate::llm::permissions::parse_dynamic_permission_policy(
1433                node.raw_model_policy
1434                    .as_ref()
1435                    .and_then(|value| value.as_dict())
1436                    .and_then(|dict| dict.get("permissions")),
1437                "workflow model_policy",
1438            )?;
1439            crate::llm::run_agent_loop_internal(
1440                &mut opts,
1441                crate::llm::AgentLoopConfig {
1442                    persistent: true,
1443                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
1444                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
1445                    nudge: node.model_policy.nudge.clone(),
1446                    done_sentinel: node.done_sentinel.clone(),
1447                    break_unless_phase: None,
1448                    tool_retries: 0,
1449                    tool_backoff_ms: 1000,
1450                    tool_format: tool_format.clone(),
1451                    native_tool_fallback: node.model_policy.native_tool_fallback,
1452                    auto_compact,
1453                    policy: Some(effective_policy),
1454                    command_policy: crate::orchestration::parse_command_policy_value(
1455                        node.raw_model_policy
1456                            .as_ref()
1457                            .and_then(|value| value.as_dict())
1458                            .and_then(|dict| dict.get("command_policy"))
1459                            .or_else(|| {
1460                                node.raw_model_policy
1461                                    .as_ref()
1462                                    .and_then(|value| value.as_dict())
1463                                    .and_then(|dict| dict.get("policy"))
1464                                    .and_then(|value| value.as_dict())
1465                                    .and_then(|policy| policy.get("command_policy"))
1466                            }),
1467                        "workflow model_policy",
1468                    )?,
1469                    permissions,
1470                    approval_policy: Some(node.approval_policy.clone()),
1471                    daemon: false,
1472                    daemon_config: Default::default(),
1473                    llm_retries: 2,
1474                    llm_backoff_ms: 2000,
1475                    token_budget: None,
1476                    exit_when_verified: node.exit_when_verified,
1477                    loop_detect_warn: 2,
1478                    loop_detect_block: 3,
1479                    loop_detect_skip: 4,
1480                    tool_examples: node.model_policy.tool_examples.clone(),
1481                    turn_policy: node.model_policy.turn_policy.clone(),
1482                    stop_after_successful_tools: node
1483                        .model_policy
1484                        .stop_after_successful_tools
1485                        .clone(),
1486                    require_successful_tools: node.model_policy.require_successful_tools.clone(),
1487                    // Use the same session id resolved for the stage so
1488                    // agent_subscribe handlers keyed on it, and session
1489                    // storage lookups in the agent loop, stay consistent.
1490                    session_id: stage_session_id.clone(),
1491                    event_sink: None,
1492                    // Seed from the stage's explicit deliverables/ledger so the
1493                    // graph carries a task-wide plan through map branches and
1494                    // nested stages. Empty ledger means no gate.
1495                    task_ledger: node
1496                        .raw_model_policy
1497                        .as_ref()
1498                        .and_then(|v| v.as_dict())
1499                        .and_then(|d| d.get("task_ledger"))
1500                        .map(crate::llm::helpers::vm_value_to_json)
1501                        .and_then(|json| serde_json::from_value(json).ok())
1502                        .unwrap_or_default(),
1503                    post_turn_callback: node
1504                        .raw_model_policy
1505                        .as_ref()
1506                        .and_then(|v| v.as_dict())
1507                        .and_then(|d| d.get("post_turn_callback"))
1508                        .filter(|v| matches!(v, crate::value::VmValue::Closure(_)))
1509                        .cloned(),
1510                    // Inherit the workflow-level skill wiring installed
1511                    // by `workflow_execute`. Per-node `model_policy.skills`
1512                    // (optional) overrides, letting authors scope a skill
1513                    // set to one stage without affecting siblings. Empty
1514                    // thread-local = no skills configured (direct
1515                    // `execute_stage_node` callers outside a workflow).
1516                    skill_registry: resolve_stage_skill_registry(node),
1517                    skill_match: resolve_stage_skill_match(node),
1518                    working_files: Vec::new(),
1519                },
1520            )
1521            .await?
1522        } else {
1523            let result = vm_call_llm_full(&opts).await?;
1524            crate::llm::agent_loop_result_from_llm(&result, opts)
1525        }
1526    };
1527    if let Some(payload) = llm_result.as_object_mut() {
1528        payload.insert("prompt".to_string(), serde_json::json!(prompt));
1529        payload.insert(
1530            "system_prompt".to_string(),
1531            serde_json::json!(node.system.clone().unwrap_or_default()),
1532        );
1533        payload.insert(
1534            "rendered_context".to_string(),
1535            serde_json::json!(rendered_context),
1536        );
1537        if !verification_contracts.is_empty() {
1538            payload.insert(
1539                "verification_contracts".to_string(),
1540                serde_json::to_value(&verification_contracts).unwrap_or_default(),
1541            );
1542            payload.insert(
1543                "rendered_verification_context".to_string(),
1544                serde_json::json!(rendered_verification),
1545            );
1546        }
1547        payload.insert(
1548            "selected_artifact_ids".to_string(),
1549            serde_json::json!(selected
1550                .iter()
1551                .map(|artifact| artifact.id.clone())
1552                .collect::<Vec<_>>()),
1553        );
1554        payload.insert(
1555            "selected_artifact_titles".to_string(),
1556            serde_json::json!(selected
1557                .iter()
1558                .map(|artifact| artifact.title.clone())
1559                .collect::<Vec<_>>()),
1560        );
1561        match payload
1562            .entry("tools".to_string())
1563            .or_insert_with(|| serde_json::json!({}))
1564        {
1565            serde_json::Value::Object(tools) => {
1566                tools.insert("mode".to_string(), serde_json::json!(tool_format.clone()));
1567            }
1568            slot => {
1569                *slot = serde_json::json!({ "mode": tool_format.clone() });
1570            }
1571        }
1572    }
1573
1574    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1575    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1576    // a "transcript" field; fall back to the input so cross-stage conversation
1577    // state survives transitions.
1578    let result_transcript = llm_result
1579        .get("transcript")
1580        .cloned()
1581        .map(|value| crate::stdlib::json_to_vm_value(&value));
1582    let session_transcript = crate::agent_sessions::snapshot(&stage_session_id);
1583    let transcript = result_transcript
1584        .or(session_transcript)
1585        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1586    let output_kind = node
1587        .output_contract
1588        .output_kinds
1589        .first()
1590        .cloned()
1591        .unwrap_or_else(|| {
1592            if node.kind == "verify" {
1593                "verification_result".to_string()
1594            } else {
1595                "artifact".to_string()
1596            }
1597        });
1598    let mut metadata = BTreeMap::new();
1599    metadata.insert(
1600        "input_artifact_ids".to_string(),
1601        serde_json::json!(selected
1602            .iter()
1603            .map(|artifact| artifact.id.clone())
1604            .collect::<Vec<_>>()),
1605    );
1606    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1607    if !node.approval_policy.write_path_allowlist.is_empty() {
1608        metadata.insert(
1609            "changed_paths".to_string(),
1610            serde_json::json!(node.approval_policy.write_path_allowlist),
1611        );
1612    }
1613    let artifact = ArtifactRecord {
1614        type_name: "artifact".to_string(),
1615        id: new_id("artifact"),
1616        kind: output_kind,
1617        title: Some(format!("stage {node_id} output")),
1618        text: Some(visible_text),
1619        data: Some(llm_result.clone()),
1620        source: Some(node_id.to_string()),
1621        created_at: now_rfc3339(),
1622        freshness: Some("fresh".to_string()),
1623        priority: None,
1624        lineage: selected
1625            .iter()
1626            .map(|artifact| artifact.id.clone())
1627            .collect(),
1628        relevance: Some(1.0),
1629        estimated_tokens: None,
1630        stage: Some(node_id.to_string()),
1631        metadata,
1632    }
1633    .normalize();
1634
1635    Ok((llm_result, vec![artifact], transcript))
1636}
1637
1638pub fn next_nodes_for(
1639    graph: &WorkflowGraph,
1640    current: &str,
1641    branch: Option<&str>,
1642) -> Vec<WorkflowEdge> {
1643    let mut matching: Vec<WorkflowEdge> = graph
1644        .edges
1645        .iter()
1646        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1647        .cloned()
1648        .collect();
1649    if matching.is_empty() {
1650        matching = graph
1651            .edges
1652            .iter()
1653            .filter(|edge| edge.from == current && edge.branch.is_none())
1654            .cloned()
1655            .collect();
1656    }
1657    matching
1658}
1659
1660pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1661    next_nodes_for(graph, current, Some(branch))
1662        .into_iter()
1663        .next()
1664        .map(|edge| edge.to)
1665}
1666
1667pub fn append_audit_entry(
1668    graph: &mut WorkflowGraph,
1669    op: &str,
1670    node_id: Option<String>,
1671    reason: Option<String>,
1672    metadata: BTreeMap<String, serde_json::Value>,
1673) {
1674    graph.audit_log.push(WorkflowAuditEntry {
1675        id: new_id("audit"),
1676        op: op.to_string(),
1677        node_id,
1678        timestamp: now_rfc3339(),
1679        reason,
1680        metadata,
1681    });
1682}