Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use crate::value::VmDictExt;
4use std::collections::{BTreeMap, BTreeSet};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
15use crate::value::{VmError, VmValue};
16
/// Node-metadata key under which `inject_workflow_verification_contracts`
/// stores the serialized list of verification contracts.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node-metadata key read by `stage_verification_contracts`; the value
/// `"local_only"` limits a stage to the contract derived from its own `verify`.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a workflow graph.
///
/// The container-level `#[serde(default)]` makes every field optional in the
/// serialized form. The `raw_*` fields are `#[serde(skip)]`: they never pass
/// through serde and have to be copied from the originating `VmValue`
/// (as `parse_workflow_node_value` does).
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. Normalization fills a missing id from the node's key
    /// in the graph's `nodes` map.
    pub id: Option<String>,
    /// Node kind. Normalization replaces an empty kind with `"stage"`;
    /// validation has kind-specific rules for `"condition"`, `"fork"`,
    /// `"join"`, `"map"` and `"reduce"`.
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// JSON form of the node's tool spec; the original VM value is kept in
    /// `raw_tools`.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    /// Retry settings. Normalization raises `max_attempts == 0` to `1`.
    pub retry_policy: RetryPolicy,
    /// Per-node capability policy; validation intersects it with the
    /// optional ceiling policy and reports any error.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    /// Input contract. Validation rejects `min_inputs > max_inputs`.
    pub input_contract: StageContract,
    /// Output contract. Normalization fills empty `output_kinds` with a
    /// kind-dependent default.
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    /// Join settings. Normalization defaults an empty `strategy` to `"all"`.
    pub join_policy: JoinPolicy,
    /// Reduce settings. Normalization defaults an empty `strategy` to `"concat"`.
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings; when this is a JSON object it is turned into a
    /// `VerificationContract` by `verification_contract_from_verify`.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. This file reads and writes the
    /// `workflow_verification_contracts`, `workflow_verification_scope` and
    /// `worker_session_id` keys.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue as supplied by the caller; skipped by serde.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
    /// Raw context_assembler VmValue dict — when set, the stage's
    /// artifact context is packed through `assemble_context` before
    /// rendering the system prompt. Closure fields (`ranker_callback`)
    /// are preserved here because they can't round-trip through serde.
    #[serde(skip)]
    pub raw_context_assembler: Option<VmValue>,
}
80
81impl PartialEq for WorkflowNode {
82    fn eq(&self, other: &Self) -> bool {
83        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84    }
85}
86
/// One concrete check attached to a [`VerificationContract`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Check category, e.g. `"identifier"`, `"path"`, `"text"`,
    /// `"expect_status"`, `"visible_text_contains"` or
    /// `"combined_output_contains"` for checks derived from a contract.
    pub kind: String,
    /// The value the check is about (stored trimmed).
    pub value: String,
    /// Optional human-readable explanation of the check.
    pub note: Option<String>,
}
94
/// Verification requirements collected for a workflow node from its `verify`
/// settings, an inline `contract` object and/or a contract file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node whose `verify` settings produced this contract.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    pub command: Option<String>,
    /// Expected exit status; mirrored into `checks` as an `"expect_status"` entry.
    pub expect_status: Option<i64>,
    /// Mirrored into `checks` as a `"visible_text_contains"` entry.
    pub assert_text: Option<String>,
    /// Mirrored into `checks` as a `"combined_output_contains"` entry.
    pub expect_text: Option<String>,
    /// Mirrored into `checks` as `"identifier"` entries.
    pub required_identifiers: Vec<String>,
    /// Mirrored into `checks` as `"path"` entries.
    pub required_paths: Vec<String>,
    /// Mirrored into `checks` as `"text"` entries.
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// Explicit checks plus the entries derived from the fields above.
    pub checks: Vec<VerificationRequirement>,
}
110
111impl VerificationContract {
112    fn is_empty(&self) -> bool {
113        self.summary.is_none()
114            && self.command.is_none()
115            && self.expect_status.is_none()
116            && self.assert_text.is_none()
117            && self.expect_text.is_none()
118            && self.required_identifiers.is_empty()
119            && self.required_paths.is_empty()
120            && self.required_text.is_empty()
121            && self.notes.is_empty()
122            && self.checks.is_empty()
123    }
124}
125
/// Appends the trimmed form of `value` to `values` unless it is empty after
/// trimming or an identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values
        .iter()
        .any(|existing| existing.as_str() == candidate);
    if candidate.is_empty() || already_present {
        return;
    }
    values.push(candidate.to_owned());
}
135
136fn push_unique_requirement(
137    values: &mut Vec<VerificationRequirement>,
138    kind: &str,
139    value: &str,
140    note: Option<&str>,
141) {
142    let trimmed_kind = kind.trim();
143    let trimmed_value = value.trim();
144    let trimmed_note = note
145        .map(str::trim)
146        .filter(|candidate| !candidate.is_empty())
147        .map(|candidate| candidate.to_string());
148    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149        return;
150    }
151    let candidate = VerificationRequirement {
152        kind: trimmed_kind.to_string(),
153        value: trimmed_value.to_string(),
154        note: trimmed_note,
155    };
156    if !values.iter().any(|existing| existing == &candidate) {
157        values.push(candidate);
158    }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162    match value {
163        Some(serde_json::Value::String(text)) => {
164            let mut values = Vec::new();
165            push_unique_string(&mut values, text);
166            values
167        }
168        Some(serde_json::Value::Array(items)) => {
169            let mut values = Vec::new();
170            for item in items {
171                if let Some(text) = item.as_str() {
172                    push_unique_string(&mut values, text);
173                }
174            }
175            values
176        }
177        _ => Vec::new(),
178    }
179}
180
181fn merge_verification_requirement_list(
182    target: &mut Vec<VerificationRequirement>,
183    value: Option<&serde_json::Value>,
184) {
185    let Some(items) = value.and_then(|raw| raw.as_array()) else {
186        return;
187    };
188    for item in items {
189        let Some(object) = item.as_object() else {
190            continue;
191        };
192        let kind = object
193            .get("kind")
194            .and_then(|value| value.as_str())
195            .unwrap_or_default();
196        let value = object
197            .get("value")
198            .and_then(|value| value.as_str())
199            .unwrap_or_default();
200        let note = object
201            .get("note")
202            .or_else(|| object.get("description"))
203            .or_else(|| object.get("reason"))
204            .and_then(|value| value.as_str());
205        push_unique_requirement(target, kind, value, note);
206    }
207}
208
209fn merge_verification_contract_fields(
210    target: &mut VerificationContract,
211    object: &serde_json::Map<String, serde_json::Value>,
212) {
213    if target.summary.is_none() {
214        target.summary = object
215            .get("summary")
216            .and_then(|value| value.as_str())
217            .map(str::trim)
218            .filter(|value| !value.is_empty())
219            .map(|value| value.to_string());
220    }
221    if target.command.is_none() {
222        target.command = object
223            .get("command")
224            .and_then(|value| value.as_str())
225            .map(str::trim)
226            .filter(|value| !value.is_empty())
227            .map(|value| value.to_string());
228    }
229    if target.expect_status.is_none() {
230        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231    }
232    if target.assert_text.is_none() {
233        target.assert_text = object
234            .get("assert_text")
235            .and_then(|value| value.as_str())
236            .map(str::trim)
237            .filter(|value| !value.is_empty())
238            .map(|value| value.to_string());
239    }
240    if target.expect_text.is_none() {
241        target.expect_text = object
242            .get("expect_text")
243            .and_then(|value| value.as_str())
244            .map(str::trim)
245            .filter(|value| !value.is_empty())
246            .map(|value| value.to_string());
247    }
248
249    for value in json_string_list(
250        object
251            .get("required_identifiers")
252            .or_else(|| object.get("identifiers")),
253    ) {
254        push_unique_string(&mut target.required_identifiers, &value);
255    }
256    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257        push_unique_string(&mut target.required_paths, &value);
258    }
259    for value in json_string_list(
260        object
261            .get("required_text")
262            .or_else(|| object.get("exact_text"))
263            .or_else(|| object.get("required_strings")),
264    ) {
265        push_unique_string(&mut target.required_text, &value);
266    }
267    for value in json_string_list(object.get("notes")) {
268        push_unique_string(&mut target.notes, &value);
269    }
270    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276        VmError::Runtime(format!(
277            "workflow verification contract read failed for {}: {error}",
278            resolved.display()
279        ))
280    })?;
281    serde_json::from_str(&contents).map_err(|error| {
282        VmError::Runtime(format!(
283            "workflow verification contract parse failed for {}: {error}",
284            resolved.display()
285        ))
286    })
287}
288
289fn resolve_verification_contract_path(
290    verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292    let Some(path) = verify
293        .get("contract_path")
294        .or_else(|| verify.get("verification_contract_path"))
295        .and_then(|value| value.as_str())
296        .map(str::trim)
297        .filter(|value| !value.is_empty())
298    else {
299        return Ok(None);
300    };
301    Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305    node_id: &str,
306    verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309        return Ok(None);
310    };
311
312    let mut contract = VerificationContract {
313        source_node: Some(node_id.to_string()),
314        ..Default::default()
315    };
316
317    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318        let Some(object) = file_contract.as_object() else {
319            return Err(VmError::Runtime(
320                "workflow verification contract file must parse to a JSON object".to_string(),
321            ));
322        };
323        merge_verification_contract_fields(&mut contract, object);
324    }
325
326    if let Some(inline_contract) = verify_object.get("contract") {
327        let Some(object) = inline_contract.as_object() else {
328            return Err(VmError::Runtime(
329                "workflow verify.contract must be an object".to_string(),
330            ));
331        };
332        merge_verification_contract_fields(&mut contract, object);
333    }
334
335    merge_verification_contract_fields(&mut contract, verify_object);
336
337    if let Some(assert_text) = contract.assert_text.clone() {
338        push_unique_requirement(
339            &mut contract.checks,
340            "visible_text_contains",
341            &assert_text,
342            Some("verify stage requires visible output to contain this text"),
343        );
344    }
345    if let Some(expect_text) = contract.expect_text.clone() {
346        push_unique_requirement(
347            &mut contract.checks,
348            "combined_output_contains",
349            &expect_text,
350            Some("verify command requires combined stdout/stderr to contain this text"),
351        );
352    }
353    if let Some(expect_status) = contract.expect_status {
354        push_unique_requirement(
355            &mut contract.checks,
356            "expect_status",
357            &expect_status.to_string(),
358            Some("verify command exit status must match exactly"),
359        );
360    }
361    for identifier in contract.required_identifiers.clone() {
362        push_unique_requirement(
363            &mut contract.checks,
364            "identifier",
365            &identifier,
366            Some("use this exact identifier spelling"),
367        );
368    }
369    for path in contract.required_paths.clone() {
370        push_unique_requirement(
371            &mut contract.checks,
372            "path",
373            &path,
374            Some("preserve this exact path"),
375        );
376    }
377    for text in contract.required_text.clone() {
378        push_unique_requirement(
379            &mut contract.checks,
380            "text",
381            &text,
382            Some("required exact text or wiring snippet"),
383        );
384    }
385
386    if contract.is_empty() {
387        return Ok(None);
388    }
389    Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393    if !values.iter().any(|existing| existing == &candidate) {
394        values.push(candidate);
395    }
396}
397
398pub fn workflow_verification_contracts(
399    graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401    let mut contracts = Vec::new();
402    for (node_id, node) in &graph.nodes {
403        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404            push_unique_contract(&mut contracts, contract);
405        }
406    }
407    Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411    node: &mut WorkflowNode,
412    contracts: &[VerificationContract],
413) {
414    if contracts.is_empty() {
415        return;
416    }
417    node.metadata.insert(
418        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419        serde_json::to_value(contracts).unwrap_or_default(),
420    );
421}
422
423pub fn stage_verification_contracts(
424    node_id: &str,
425    node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428    let local_only = matches!(
429        node.metadata
430            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431            .and_then(|value| value.as_str()),
432        Some("local_only")
433    );
434    if local_only {
435        return Ok(local_contract.into_iter().collect());
436    }
437
438    let mut contracts = node
439        .metadata
440        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441        .cloned()
442        .map(|value| {
443            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444                VmError::Runtime(format!(
445                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
446                ))
447            })
448        })
449        .transpose()?
450        .unwrap_or_default();
451
452    if let Some(local_contract) = local_contract {
453        push_unique_contract(&mut contracts, local_contract);
454    }
455    Ok(contracts)
456}
457
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node.
    pub from: String,
    /// Id of the target node.
    pub to: String,
    /// Optional branch name; condition nodes require `"true"` and `"false"`
    /// branches, and the legacy act/verify/repair layout uses `"failed"` and
    /// `"retry"`.
    pub branch: Option<String>,
    pub label: Option<String>,
}
466
/// A workflow graph: nodes keyed by id, the edges between them, and
/// graph-level policies.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `_type`; normalization defaults it to `"workflow_graph"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; normalization generates one when it is empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; normalization raises `0` to `1`.
    pub version: usize,
    /// Id of the entry node; validation reports an error if it is not in `nodes`.
    pub entry: String,
    /// Nodes keyed by node id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; validation intersects it with the
    /// optional ceiling policy.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
483
/// One entry of a workflow graph's `audit_log`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Name of the recorded operation.
    pub op: String,
    /// Node the operation applied to, if any.
    pub node_id: Option<String>,
    // NOTE(review): presumably an RFC 3339 timestamp (the file imports
    // `now_rfc3339`) — confirm at the site that creates entries.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
494
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Ids reached by following edges from the graph's entry, in sorted order.
    pub reachable_nodes: Vec<String>,
}
503
504pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
505    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
506    let dict = value.as_dict();
507    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
508    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
509    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
510    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
511    Ok(node)
512}
513
514pub fn parse_workflow_node_json(
515    json: serde_json::Value,
516    label: &str,
517) -> Result<WorkflowNode, VmError> {
518    super::parse_json_payload(json, label)
519}
520
521pub fn parse_workflow_edge_json(
522    json: serde_json::Value,
523    label: &str,
524) -> Result<WorkflowEdge, VmError> {
525    super::parse_json_payload(json, label)
526}
527
528pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
529    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
530    let as_dict = value.as_dict().cloned().unwrap_or_default();
531
532    if graph.nodes.is_empty() {
533        for key in ["act", "verify", "repair"] {
534            if let Some(node_value) = as_dict.get(key) {
535                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
536                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
537                node.id = Some(key.to_string());
538                if node.kind.is_empty() {
539                    node.kind = if key == "verify" {
540                        "verify".to_string()
541                    } else {
542                        "stage".to_string()
543                    };
544                }
545                if node.model_policy.provider.is_none() {
546                    node.model_policy.provider = as_dict
547                        .get("provider")
548                        .map(|value| value.display())
549                        .filter(|value| !value.is_empty());
550                }
551                if node.model_policy.model.is_none() {
552                    node.model_policy.model = as_dict
553                        .get("model")
554                        .map(|value| value.display())
555                        .filter(|value| !value.is_empty());
556                }
557                if node.model_policy.model_tier.is_none() {
558                    node.model_policy.model_tier = as_dict
559                        .get("model_tier")
560                        .or_else(|| as_dict.get("tier"))
561                        .map(|value| value.display())
562                        .filter(|value| !value.is_empty());
563                }
564                if node.model_policy.temperature.is_none() {
565                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
566                        if let VmValue::Float(number) = value {
567                            Some(*number)
568                        } else {
569                            value.as_int().map(|number| number as f64)
570                        }
571                    });
572                }
573                if node.model_policy.max_tokens.is_none() {
574                    node.model_policy.max_tokens =
575                        as_dict.get("max_tokens").and_then(|value| value.as_int());
576                }
577                if node.mode.is_none() {
578                    node.mode = as_dict
579                        .get("mode")
580                        .map(|value| value.display())
581                        .filter(|value| !value.is_empty());
582                }
583                if node.done_sentinel.is_none() {
584                    node.done_sentinel = as_dict
585                        .get("done_sentinel")
586                        .map(|value| value.display())
587                        .filter(|value| !value.is_empty());
588                }
589                if key == "verify"
590                    && node.verify.is_none()
591                    && (raw_node.contains_key("assert_text")
592                        || raw_node.contains_key("command")
593                        || raw_node.contains_key("expect_status")
594                        || raw_node.contains_key("expect_text"))
595                {
596                    node.verify = Some(serde_json::json!({
597                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
598                        "command": raw_node.get("command").map(vm_value_to_json),
599                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
600                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
601                    }));
602                }
603                graph.nodes.insert(key.to_string(), node);
604            }
605        }
606        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
607            graph.entry = "act".to_string();
608        }
609        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
610            if graph.nodes.contains_key("verify") {
611                graph.edges.push(WorkflowEdge {
612                    from: "act".to_string(),
613                    to: "verify".to_string(),
614                    branch: None,
615                    label: None,
616                });
617            }
618            if graph.nodes.contains_key("repair") {
619                graph.edges.push(WorkflowEdge {
620                    from: "verify".to_string(),
621                    to: "repair".to_string(),
622                    branch: Some("failed".to_string()),
623                    label: None,
624                });
625                graph.edges.push(WorkflowEdge {
626                    from: "repair".to_string(),
627                    to: "verify".to_string(),
628                    branch: Some("retry".to_string()),
629                    label: None,
630                });
631            }
632        }
633    }
634
635    if graph.type_name.is_empty() {
636        graph.type_name = "workflow_graph".to_string();
637    }
638    if graph.id.is_empty() {
639        graph.id = new_id("workflow");
640    }
641    if graph.version == 0 {
642        graph.version = 1;
643    }
644    if graph.entry.is_empty() {
645        graph.entry = graph
646            .nodes
647            .keys()
648            .next()
649            .cloned()
650            .unwrap_or_else(|| "act".to_string());
651    }
652    for (node_id, node) in &mut graph.nodes {
653        if node.raw_tools.is_none() {
654            node.raw_tools = as_dict
655                .get("nodes")
656                .and_then(|nodes| nodes.as_dict())
657                .and_then(|nodes| nodes.get(node_id.as_str()))
658                .and_then(|node_value| node_value.as_dict())
659                .and_then(|raw_node| raw_node.get("tools"))
660                .cloned();
661        }
662        if node.id.is_none() {
663            node.id = Some(node_id.clone());
664        }
665        if node.kind.is_empty() {
666            node.kind = "stage".to_string();
667        }
668        if node.join_policy.strategy.is_empty() {
669            node.join_policy.strategy = "all".to_string();
670        }
671        if node.reduce_policy.strategy.is_empty() {
672            node.reduce_policy.strategy = "concat".to_string();
673        }
674        if node.output_contract.output_kinds.is_empty() {
675            node.output_contract.output_kinds = vec![match node.kind.as_str() {
676                "verify" => "verification_result".to_string(),
677                "reduce" => node
678                    .reduce_policy
679                    .output_kind
680                    .clone()
681                    .unwrap_or_else(|| "summary".to_string()),
682                "map" => node
683                    .map_policy
684                    .output_kind
685                    .clone()
686                    .unwrap_or_else(|| "artifact".to_string()),
687                "escalation" => "plan".to_string(),
688                _ => "artifact".to_string(),
689            }];
690        }
691        if node.retry_policy.max_attempts == 0 {
692            node.retry_policy.max_attempts = 1;
693        }
694    }
695    Ok(graph)
696}
697
698pub fn validate_workflow(
699    graph: &WorkflowGraph,
700    ceiling: Option<&CapabilityPolicy>,
701) -> WorkflowValidationReport {
702    let mut errors = Vec::new();
703    let mut warnings = Vec::new();
704
705    if !graph.nodes.contains_key(&graph.entry) {
706        errors.push(format!("entry node does not exist: {}", graph.entry));
707    }
708
709    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
710    for edge in &graph.edges {
711        if !node_ids.contains(&edge.from) {
712            errors.push(format!("edge.from references unknown node: {}", edge.from));
713        }
714        if !node_ids.contains(&edge.to) {
715            errors.push(format!("edge.to references unknown node: {}", edge.to));
716        }
717    }
718
719    let reachable_nodes = reachable_nodes(graph);
720    for node_id in &node_ids {
721        if !reachable_nodes.contains(node_id) {
722            warnings.push(format!("node is unreachable: {node_id}"));
723        }
724    }
725
726    for (node_id, node) in &graph.nodes {
727        let incoming = graph
728            .edges
729            .iter()
730            .filter(|edge| edge.to == *node_id)
731            .count();
732        let outgoing: Vec<&WorkflowEdge> = graph
733            .edges
734            .iter()
735            .filter(|edge| edge.from == *node_id)
736            .collect();
737        if let Some(min_inputs) = node.input_contract.min_inputs {
738            if let Some(max_inputs) = node.input_contract.max_inputs {
739                if min_inputs > max_inputs {
740                    errors.push(format!(
741                        "node {node_id}: input contract min_inputs exceeds max_inputs"
742                    ));
743                }
744            }
745        }
746        match node.kind.as_str() {
747            "condition" => {
748                let has_true = outgoing
749                    .iter()
750                    .any(|edge| edge.branch.as_deref() == Some("true"));
751                let has_false = outgoing
752                    .iter()
753                    .any(|edge| edge.branch.as_deref() == Some("false"));
754                if !has_true || !has_false {
755                    errors.push(format!(
756                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
757                    ));
758                }
759            }
760            "fork" if outgoing.len() < 2 => {
761                errors.push(format!(
762                    "node {node_id}: fork nodes require at least two outgoing edges"
763                ));
764            }
765            "join" if incoming < 2 => {
766                warnings.push(format!(
767                    "node {node_id}: join node has fewer than two incoming edges"
768                ));
769            }
770            "map"
771                if node.map_policy.items.is_empty()
772                    && node.map_policy.item_artifact_kind.is_none()
773                    && node.input_contract.input_kinds.is_empty() =>
774            {
775                errors.push(format!(
776                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
777                ));
778            }
779            "reduce" if node.input_contract.input_kinds.is_empty() => {
780                warnings.push(format!(
781                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
782                ));
783            }
784            _ => {}
785        }
786    }
787
788    if let Some(ceiling) = ceiling {
789        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
790            errors.push(error);
791        }
792        for (node_id, node) in &graph.nodes {
793            if let Err(error) = ceiling.intersect(&node.capability_policy) {
794                errors.push(format!("node {node_id}: {error}"));
795            }
796        }
797    }
798
799    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
800        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
801        match diagnostic.severity {
802            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
803            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
804        }
805    }
806
807    WorkflowValidationReport {
808        valid: errors.is_empty(),
809        errors,
810        warnings,
811        reachable_nodes: reachable_nodes.into_iter().collect(),
812    }
813}
814
815fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
816    let mut seen = BTreeSet::new();
817    let mut stack = vec![graph.entry.clone()];
818    while let Some(node_id) = stack.pop() {
819        if !seen.insert(node_id.clone()) {
820            continue;
821        }
822        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
823            stack.push(edge.to.clone());
824        }
825    }
826    seen
827}
828
829/// Pick the session id a stage should run under. Prefers an explicit
830/// `session_id` on the node's `model_policy` dict (so pipelines with
831/// `agent_session_open` / `agent_session_fork` flowing through a graph
832/// line up); falls back to a stable, node-derived id so multi-stage
833/// graphs with no explicit session share a conversation across stages.
834fn resolve_node_session_id(node: &WorkflowNode) -> String {
835    if let Some(explicit) = node
836        .raw_model_policy
837        .as_ref()
838        .and_then(|v| v.as_dict())
839        .and_then(|d| d.get("session_id"))
840        .and_then(|v| match v {
841            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
842            _ => None,
843        })
844    {
845        return explicit;
846    }
847    if let Some(persisted) = node
848        .metadata
849        .get("worker_session_id")
850        .and_then(|value| value.as_str())
851        .filter(|value| !value.trim().is_empty())
852    {
853        return persisted.to_string();
854    }
855    format!("workflow_stage_{}", uuid::Uuid::now_v7())
856}
857
858fn raw_auto_compact_dict(node: &WorkflowNode) -> Option<&crate::value::DictMap> {
859    node.raw_auto_compact
860        .as_ref()
861        .and_then(|value| value.as_dict())
862}
863
864fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
865    raw_auto_compact_dict(node)
866        .and_then(|dict| dict.get(key))
867        .and_then(|value| value.as_int())
868        .filter(|value| *value >= 0)
869        .map(|value| value as usize)
870}
871
872fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
873    raw_auto_compact_dict(node)
874        .and_then(|dict| dict.get(key))
875        .and_then(|value| match value {
876            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
877            _ => None,
878        })
879}
880
881fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&crate::value::DictMap> {
882    node.raw_model_policy
883        .as_ref()
884        .and_then(|value| value.as_dict())
885}
886
887fn insert_json_vm_option<T: Serialize>(
888    options: &mut crate::value::DictMap,
889    key: &str,
890    value: &T,
891) -> Result<(), VmError> {
892    let json = serde_json::to_value(value).map_err(|error| {
893        VmError::Runtime(format!("workflow stage option encode error: {error}"))
894    })?;
895    options.insert(
896        crate::value::intern_key(key),
897        crate::stdlib::json_to_vm_value(&json),
898    );
899    Ok(())
900}
901
902fn merge_raw_model_policy_options(options: &mut crate::value::DictMap, node: &WorkflowNode) {
903    if let Some(raw) = raw_model_policy_dict(node) {
904        for (key, value) in raw {
905            if !matches!(value, VmValue::Nil) {
906                options.insert(key.clone(), value.clone());
907            }
908        }
909    }
910}
911
912fn preserve_nested_command_policy(options: &mut crate::value::DictMap, node: &WorkflowNode) {
913    if options.contains_key("command_policy") {
914        return;
915    }
916    let Some(command_policy) = raw_model_policy_dict(node)
917        .and_then(|dict| dict.get("policy"))
918        .and_then(|value| value.as_dict())
919        .and_then(|policy| policy.get("command_policy"))
920    else {
921        return;
922    };
923    options.insert(
924        crate::value::intern_key("command_policy"),
925        command_policy.clone(),
926    );
927}
928
929fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
930    node.raw_tools.clone().or_else(|| {
931        if matches!(node.tools, serde_json::Value::Null) {
932            None
933        } else {
934            Some(crate::stdlib::json_to_vm_value(&node.tools))
935        }
936    })
937}
938
939fn add_stage_tools_option(
940    options: &mut crate::value::DictMap,
941    tools_value: &Option<VmValue>,
942    tool_names: &[String],
943) {
944    if !tool_names.is_empty() {
945        if let Some(value) = tools_value.clone() {
946            options.insert(crate::value::intern_key("tools"), value);
947        }
948    }
949}
950
951fn workflow_stage_llm_options(
952    node: &WorkflowNode,
953    stage_session_id: &str,
954    tools_value: &Option<VmValue>,
955    tool_names: &[String],
956    stage_agent_options: &super::WorkflowStageAgentOptions,
957) -> crate::value::DictMap {
958    let mut options = stage_agent_options.llm_options_vm_dict();
959    merge_raw_model_policy_options(&mut options, node);
960    options.put_str("session_id", stage_session_id);
961    options.put_str("tool_format", stage_agent_options.tool_format.clone());
962    add_stage_tools_option(&mut options, tools_value, tool_names);
963    options
964}
965
966fn add_workflow_agent_compaction_options(options: &mut crate::value::DictMap, node: &WorkflowNode) {
967    if !node.auto_compact.enabled {
968        options.insert(
969            crate::value::intern_key("auto_compact"),
970            VmValue::Bool(false),
971        );
972        return;
973    }
974    options.insert(
975        crate::value::intern_key("auto_compact"),
976        VmValue::Bool(true),
977    );
978    if let Some(value) = node.auto_compact.token_threshold {
979        options.insert(
980            crate::value::intern_key("compact_threshold"),
981            VmValue::Int(value as i64),
982        );
983    }
984    if let Some(value) = node.auto_compact.tool_output_max_chars {
985        options.insert(
986            crate::value::intern_key("tool_output_max_chars"),
987            VmValue::Int(value as i64),
988        );
989    }
990    if let Some(value) = node.auto_compact.hard_limit_tokens {
991        options.insert(
992            crate::value::intern_key("hard_limit_tokens"),
993            VmValue::Int(value as i64),
994        );
995    }
996    if let Some(strategy) = node.auto_compact.compact_strategy.as_ref() {
997        options.put_str("compact_strategy", strategy.clone());
998    }
999    if let Some(strategy) = node.auto_compact.hard_limit_strategy.as_ref() {
1000        options.put_str("hard_limit_strategy", strategy.clone());
1001    }
1002    if let Some(value) = raw_auto_compact_int(node, "compact_keep_last")
1003        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1004    {
1005        options.insert(
1006            crate::value::intern_key("compact_keep_last"),
1007            VmValue::Int(value as i64),
1008        );
1009    }
1010    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1011        options.put_str("summarize_prompt", prompt);
1012    }
1013    if let Some(dict) = raw_auto_compact_dict(node) {
1014        for key in ["compress_callback", "mask_callback"] {
1015            if let Some(callback) = dict.get(key) {
1016                options.insert(crate::value::intern_key(key), callback.clone());
1017            }
1018        }
1019        if let Some(callback) = dict.get("custom_compactor") {
1020            options.insert(
1021                crate::value::intern_key("compact_callback"),
1022                callback.clone(),
1023            );
1024        }
1025    }
1026}
1027
1028fn workflow_stage_agent_loop_options(
1029    node: &WorkflowNode,
1030    stage_session_id: &str,
1031    tools_value: &Option<VmValue>,
1032    tool_names: &[String],
1033    stage_agent_options: &super::WorkflowStageAgentOptions,
1034) -> Result<crate::value::DictMap, VmError> {
1035    let mut options = stage_agent_options.agent_loop_options_vm_dict();
1036    merge_raw_model_policy_options(&mut options, node);
1037    if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1038        if !options.contains_key("skills") {
1039            if let Some(registry) = context.registry {
1040                options.insert(crate::value::intern_key("skills"), registry);
1041            }
1042        }
1043        if !options.contains_key("skill_match") {
1044            if let Some(match_config) = context.match_config {
1045                options.insert(crate::value::intern_key("skill_match"), match_config);
1046            }
1047        }
1048    }
1049    preserve_nested_command_policy(&mut options, node);
1050    add_workflow_agent_compaction_options(&mut options, node);
1051    add_stage_tools_option(&mut options, tools_value, tool_names);
1052    let tool_policy = tool_capability_policy_from_spec(&node.tools);
1053    let effective_policy = tool_policy
1054        .intersect(&node.capability_policy)
1055        .map_err(VmError::Runtime)?;
1056    insert_json_vm_option(&mut options, "policy", &effective_policy)?;
1057    insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy)?;
1058    options.put_str("session_id", stage_session_id);
1059    options.put_str("tool_format", stage_agent_options.tool_format.clone());
1060    let stage_label = node
1061        .id
1062        .clone()
1063        .unwrap_or_else(|| stage_session_id.to_string());
1064    crate::orchestration::annotate_nested_execution_options(
1065        &mut options,
1066        crate::orchestration::NestedExecutionKind::WorkflowStage,
1067        &stage_label,
1068    );
1069    Ok(options)
1070}
1071
1072#[derive(Clone, Debug)]
1073pub struct PreparedWorkflowStageNode {
1074    pub prompt: String,
1075    pub system: Option<String>,
1076    pub run_agent_loop: bool,
1077    pub llm_options: crate::value::DictMap,
1078    pub agent_loop_options: crate::value::DictMap,
1079    pub result: Option<serde_json::Value>,
1080    pub selected: Vec<ArtifactRecord>,
1081    pub rendered_context: String,
1082    pub rendered_verification: String,
1083    pub verification_contracts: Vec<VerificationContract>,
1084    pub tool_format: String,
1085    pub stage_session_id: String,
1086}
1087
1088pub async fn prepare_stage_node(
1089    ctx: &crate::vm::AsyncBuiltinCtx,
1090    node_id: &str,
1091    node: &WorkflowNode,
1092    task: &str,
1093    artifacts: &[ArtifactRecord],
1094) -> Result<PreparedWorkflowStageNode, VmError> {
1095    let selected_stage = super::select_workflow_stage_artifacts(
1096        ctx,
1097        artifacts,
1098        &node.context_policy,
1099        &node.input_contract,
1100    )
1101    .await?;
1102    let selected = selected_stage.artifacts;
1103    let context_policy = selected_stage.context_policy;
1104    let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1105        let assembled =
1106            crate::stdlib::assemble::assemble_from_options(ctx, &selected, assembler).await?;
1107        Some(super::render_assembled_chunks(&assembled))
1108    } else {
1109        None
1110    };
1111    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1112    let stage_session_id = resolve_node_session_id(node);
1113    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1114        return Err(VmError::Runtime(format!(
1115            "workflow stage {node_id} requires an existing session \
1116             (call agent_session_open and feed session_id through model_policy \
1117             before entering this stage)"
1118        )));
1119    }
1120    if let Some(min_inputs) = node.input_contract.min_inputs {
1121        if selected.len() < min_inputs {
1122            return Err(VmError::Runtime(format!(
1123                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1124            )));
1125        }
1126    }
1127    if let Some(max_inputs) = node.input_contract.max_inputs {
1128        if selected.len() > max_inputs {
1129            return Err(VmError::Runtime(format!(
1130                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1131            )));
1132        }
1133    }
1134    let prepared_prompt = super::prepare_workflow_stage_prompt(
1135        ctx,
1136        task,
1137        node.task_label.as_deref(),
1138        &selected,
1139        &context_policy,
1140        rendered_context_override.as_deref(),
1141        &verification_contracts,
1142    )
1143    .await?;
1144    let prompt = prepared_prompt.prompt;
1145    let rendered_context = prepared_prompt.rendered_context;
1146    let rendered_verification = prepared_prompt.rendered_verification;
1147
1148    let tool_names = tool_names_from_spec(&node.tools);
1149    let stage_agent_options = super::prepare_workflow_stage_agent_options(
1150        ctx,
1151        node,
1152        &stage_session_id,
1153        !tool_names.is_empty(),
1154    )
1155    .await?;
1156    let tool_format = stage_agent_options.tool_format.clone();
1157    let result = if node.kind == "verify" {
1158        if let Some(command) = node
1159            .verify
1160            .as_ref()
1161            .and_then(|verify| verify.as_object())
1162            .and_then(|verify| verify.get("command"))
1163            .and_then(|value| value.as_str())
1164            .map(str::trim)
1165            .filter(|value| !value.is_empty())
1166        {
1167            let (program, args) = if cfg!(target_os = "windows") {
1168                ("cmd", vec!["/C".to_string(), command.to_string()])
1169            } else {
1170                // Do not use a login shell here. On macOS, `/bin/sh -l`
1171                // reads user dotfiles such as `~/.profile`, which makes
1172                // sandboxed verification depend on out-of-worktree state.
1173                ("/bin/sh", vec!["-c".to_string(), command.to_string()])
1174            };
1175            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1176                stdin_null: true,
1177                ..Default::default()
1178            };
1179            if let Some(context) = crate::stdlib::process::current_execution_context() {
1180                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1181                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1182                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
1183                }
1184                if !context.env.is_empty() {
1185                    process_config.env.extend(context.env);
1186                }
1187            }
1188            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1189            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1190            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1191            let combined = if stderr.is_empty() {
1192                stdout.clone()
1193            } else if stdout.is_empty() {
1194                stderr.clone()
1195            } else {
1196                format!("{stdout}\n{stderr}")
1197            };
1198            serde_json::json!({
1199                "status": "completed",
1200                "text": combined,
1201                "visible_text": combined,
1202                "command": command,
1203                "stdout": stdout,
1204                "stderr": stderr,
1205                "exit_status": output.status.code().unwrap_or(-1),
1206                "success": output.status.success(),
1207            })
1208        } else {
1209            serde_json::json!({
1210                "status": "completed",
1211                "text": "",
1212                "visible_text": "",
1213            })
1214        }
1215    } else {
1216        let tools_value = stage_tools_value(node);
1217        let llm_options = workflow_stage_llm_options(
1218            node,
1219            &stage_session_id,
1220            &tools_value,
1221            &tool_names,
1222            &stage_agent_options,
1223        );
1224        let agent_loop_options = if stage_agent_options.run_agent_loop {
1225            workflow_stage_agent_loop_options(
1226                node,
1227                &stage_session_id,
1228                &tools_value,
1229                &tool_names,
1230                &stage_agent_options,
1231            )?
1232        } else {
1233            crate::value::DictMap::new()
1234        };
1235        return Ok(PreparedWorkflowStageNode {
1236            prompt,
1237            system: node.system.clone(),
1238            run_agent_loop: stage_agent_options.run_agent_loop,
1239            llm_options,
1240            agent_loop_options,
1241            result: None,
1242            selected,
1243            rendered_context,
1244            rendered_verification,
1245            verification_contracts,
1246            tool_format,
1247            stage_session_id,
1248        });
1249    };
1250
1251    Ok(PreparedWorkflowStageNode {
1252        prompt,
1253        system: node.system.clone(),
1254        run_agent_loop: false,
1255        llm_options: crate::value::DictMap::new(),
1256        agent_loop_options: crate::value::DictMap::new(),
1257        result: Some(result),
1258        selected,
1259        rendered_context,
1260        rendered_verification,
1261        verification_contracts,
1262        tool_format,
1263        stage_session_id,
1264    })
1265}
1266
1267pub fn complete_prepared_stage_node(
1268    node_id: &str,
1269    node: &WorkflowNode,
1270    prepared: &PreparedWorkflowStageNode,
1271    mut llm_result: serde_json::Value,
1272) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1273    if let Some(payload) = llm_result.as_object_mut() {
1274        payload.insert(
1275            "prompt".to_string(),
1276            serde_json::json!(prepared.prompt.clone()),
1277        );
1278        payload.insert(
1279            "system_prompt".to_string(),
1280            serde_json::json!(node.system.clone().unwrap_or_default()),
1281        );
1282        payload.insert(
1283            "rendered_context".to_string(),
1284            serde_json::json!(prepared.rendered_context.clone()),
1285        );
1286        if !prepared.verification_contracts.is_empty() {
1287            payload.insert(
1288                "verification_contracts".to_string(),
1289                serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1290            );
1291            payload.insert(
1292                "rendered_verification_context".to_string(),
1293                serde_json::json!(prepared.rendered_verification.clone()),
1294            );
1295        }
1296        payload.insert(
1297            "selected_artifact_ids".to_string(),
1298            serde_json::json!(prepared
1299                .selected
1300                .iter()
1301                .map(|artifact| artifact.id.clone())
1302                .collect::<Vec<_>>()),
1303        );
1304        payload.insert(
1305            "selected_artifact_titles".to_string(),
1306            serde_json::json!(prepared
1307                .selected
1308                .iter()
1309                .map(|artifact| artifact.title.clone())
1310                .collect::<Vec<_>>()),
1311        );
1312        match payload
1313            .entry("tools".to_string())
1314            .or_insert_with(|| serde_json::json!({}))
1315        {
1316            serde_json::Value::Object(tools) => {
1317                tools.insert(
1318                    "mode".to_string(),
1319                    serde_json::json!(prepared.tool_format.clone()),
1320                );
1321            }
1322            slot => {
1323                *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1324            }
1325        }
1326    }
1327
1328    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1329    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1330    // a "transcript" field; fall back to the input so cross-stage conversation
1331    // state survives transitions.
1332    let result_transcript = llm_result
1333        .get("transcript")
1334        .cloned()
1335        .map(|value| crate::stdlib::json_to_vm_value(&value));
1336    let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1337    let transcript = result_transcript
1338        .or(session_transcript)
1339        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1340    let output_kind = node
1341        .output_contract
1342        .output_kinds
1343        .first()
1344        .cloned()
1345        .unwrap_or_else(|| {
1346            if node.kind == "verify" {
1347                "verification_result".to_string()
1348            } else {
1349                "artifact".to_string()
1350            }
1351        });
1352    let mut metadata = BTreeMap::new();
1353    metadata.insert(
1354        "input_artifact_ids".to_string(),
1355        serde_json::json!(prepared
1356            .selected
1357            .iter()
1358            .map(|artifact| artifact.id.clone())
1359            .collect::<Vec<_>>()),
1360    );
1361    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1362    if !node.approval_policy.write_path_allowlist.is_empty() {
1363        metadata.insert(
1364            "changed_paths".to_string(),
1365            serde_json::json!(node.approval_policy.write_path_allowlist),
1366        );
1367    }
1368    let artifact = ArtifactRecord {
1369        type_name: "artifact".to_string(),
1370        id: new_id("artifact"),
1371        kind: output_kind,
1372        title: Some(format!("stage {node_id} output")),
1373        text: Some(visible_text),
1374        data: Some(llm_result.clone()),
1375        source: Some(node_id.to_string()),
1376        created_at: now_rfc3339(),
1377        freshness: Some("fresh".to_string()),
1378        priority: None,
1379        lineage: prepared
1380            .selected
1381            .iter()
1382            .map(|artifact| artifact.id.clone())
1383            .collect(),
1384        relevance: Some(1.0),
1385        estimated_tokens: None,
1386        stage: Some(node_id.to_string()),
1387        metadata,
1388    }
1389    .normalize();
1390
1391    Ok((llm_result, vec![artifact], transcript))
1392}
1393
1394pub async fn execute_stage_node(
1395    ctx: &crate::vm::AsyncBuiltinCtx,
1396    node_id: &str,
1397    node: &WorkflowNode,
1398    task: &str,
1399    artifacts: &[ArtifactRecord],
1400) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1401    let prepared = prepare_stage_node(ctx, node_id, node, task, artifacts).await?;
1402    let llm_result = if let Some(result) = prepared.result.clone() {
1403        result
1404    } else if prepared.run_agent_loop {
1405        let result = crate::stdlib::harn_entry::call_agent_loop(
1406            ctx,
1407            prepared.prompt.clone(),
1408            prepared.system.clone(),
1409            prepared.agent_loop_options.clone(),
1410        )
1411        .await?;
1412        crate::llm::vm_value_to_json(&result)
1413    } else {
1414        let args = vec![
1415            VmValue::String(arcstr::ArcStr::from(prepared.prompt.clone())),
1416            prepared
1417                .system
1418                .clone()
1419                .map(|s| VmValue::String(arcstr::ArcStr::from(s)))
1420                .unwrap_or(VmValue::Nil),
1421            VmValue::dict(prepared.llm_options.clone()),
1422        ];
1423        let opts = extract_llm_options(&args)?;
1424        let result = vm_call_llm_full(&opts).await?;
1425        crate::llm::agent_loop_result_from_llm(&result, opts)
1426    };
1427    complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1428}
1429
1430pub fn append_audit_entry(
1431    graph: &mut WorkflowGraph,
1432    op: &str,
1433    node_id: Option<String>,
1434    reason: Option<String>,
1435    metadata: BTreeMap<String, serde_json::Value>,
1436) {
1437    graph.audit_log.push(WorkflowAuditEntry {
1438        id: new_id("audit"),
1439        op: op.to_string(),
1440        node_id,
1441        timestamp: now_rfc3339(),
1442        reason,
1443        metadata,
1444    });
1445}