Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4
5use serde::{Deserialize, Serialize};
6
7use super::{
8    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
9    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
10    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
11};
12use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
13use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
14use crate::value::{VmError, VmValue};
15
/// Node-metadata key under which the workflow's verification contracts are
/// stored as a JSON array. Written by `inject_workflow_verification_contracts`
/// and read back by `stage_verification_contracts`.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Node-metadata key selecting the verification scope of a stage. The value
/// `"local_only"` makes `stage_verification_contracts` return only the stage's
/// own contract and ignore any injected ones.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
18
19#[derive(Clone, Debug, Default, Serialize, Deserialize)]
20#[serde(default)]
21pub struct WorkflowNode {
22    pub id: Option<String>,
23    pub kind: String,
24    pub mode: Option<String>,
25    pub prompt: Option<String>,
26    pub system: Option<String>,
27    pub task_label: Option<String>,
28    pub done_sentinel: Option<String>,
29    pub tools: serde_json::Value,
30    pub model_policy: ModelPolicy,
31    /// Per-stage auto-compaction settings for the agent loop's context
32    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
33    /// expressible here — call the `agent_session_*` builtins before the
34    /// stage or in a prior stage.
35    pub auto_compact: AutoCompactPolicy,
36    /// Output visibility filter applied to the transcript after the
37    /// stage's agent loop exits. `"public"` / `"public_only"` drops
38    /// `tool_result` messages and non-public events. `None` or any
39    /// unknown string is a no-op.
40    #[serde(default)]
41    pub output_visibility: Option<String>,
42    pub context_policy: ContextPolicy,
43    pub retry_policy: RetryPolicy,
44    pub capability_policy: CapabilityPolicy,
45    pub approval_policy: super::ToolApprovalPolicy,
46    pub input_contract: StageContract,
47    pub output_contract: StageContract,
48    pub branch_semantics: BranchSemantics,
49    pub map_policy: MapPolicy,
50    pub join_policy: JoinPolicy,
51    pub reduce_policy: ReducePolicy,
52    pub escalation_policy: EscalationPolicy,
53    pub verify: Option<serde_json::Value>,
54    /// When true, the stage's agent loop gates the done sentinel on the most
55    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
56    /// persistent execute stages that fold verification into the loop via a
57    /// shell-exec tool the model invokes explicitly.
58    #[serde(default)]
59    pub exit_when_verified: bool,
60    pub metadata: BTreeMap<String, serde_json::Value>,
61    #[serde(skip)]
62    pub raw_tools: Option<VmValue>,
63    /// Raw auto_compact VmValue dict — preserved for extracting closure
64    /// fields (compress_callback, mask_callback, custom_compactor) that
65    /// can't go through serde.
66    #[serde(skip)]
67    pub raw_auto_compact: Option<VmValue>,
68    /// Raw model_policy VmValue dict — preserved for extracting closure
69    /// fields (post_turn_callback) that can't go through serde.
70    #[serde(skip)]
71    pub raw_model_policy: Option<VmValue>,
72    /// Raw context_assembler VmValue dict — when set, the stage's
73    /// artifact context is packed through `assemble_context` before
74    /// rendering the system prompt. Closure fields (`ranker_callback`)
75    /// are preserved here because they can't round-trip through serde.
76    #[serde(skip)]
77    pub raw_context_assembler: Option<VmValue>,
78}
79
80impl PartialEq for WorkflowNode {
81    fn eq(&self, other: &Self) -> bool {
82        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
83    }
84}
85
86#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
87#[serde(default)]
88pub struct VerificationRequirement {
89    pub kind: String,
90    pub value: String,
91    pub note: Option<String>,
92}
93
94#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
95#[serde(default)]
96pub struct VerificationContract {
97    pub source_node: Option<String>,
98    pub summary: Option<String>,
99    pub command: Option<String>,
100    pub expect_status: Option<i64>,
101    pub assert_text: Option<String>,
102    pub expect_text: Option<String>,
103    pub required_identifiers: Vec<String>,
104    pub required_paths: Vec<String>,
105    pub required_text: Vec<String>,
106    pub notes: Vec<String>,
107    pub checks: Vec<VerificationRequirement>,
108}
109
110impl VerificationContract {
111    fn is_empty(&self) -> bool {
112        self.summary.is_none()
113            && self.command.is_none()
114            && self.expect_status.is_none()
115            && self.assert_text.is_none()
116            && self.expect_text.is_none()
117            && self.required_identifiers.is_empty()
118            && self.required_paths.is_empty()
119            && self.required_text.is_empty()
120            && self.notes.is_empty()
121            && self.checks.is_empty()
122    }
123}
124
/// Appends the trimmed form of `value` to `values` unless it is blank or an
/// identical entry is already present.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let value = value.trim();
    let already_present = values.iter().any(|existing| existing == value);
    if value.is_empty() || already_present {
        return;
    }
    values.push(value.to_owned());
}
134
135fn push_unique_requirement(
136    values: &mut Vec<VerificationRequirement>,
137    kind: &str,
138    value: &str,
139    note: Option<&str>,
140) {
141    let trimmed_kind = kind.trim();
142    let trimmed_value = value.trim();
143    let trimmed_note = note
144        .map(str::trim)
145        .filter(|candidate| !candidate.is_empty())
146        .map(|candidate| candidate.to_string());
147    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
148        return;
149    }
150    let candidate = VerificationRequirement {
151        kind: trimmed_kind.to_string(),
152        value: trimmed_value.to_string(),
153        note: trimmed_note,
154    };
155    if !values.iter().any(|existing| existing == &candidate) {
156        values.push(candidate);
157    }
158}
159
160fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
161    match value {
162        Some(serde_json::Value::String(text)) => {
163            let mut values = Vec::new();
164            push_unique_string(&mut values, text);
165            values
166        }
167        Some(serde_json::Value::Array(items)) => {
168            let mut values = Vec::new();
169            for item in items {
170                if let Some(text) = item.as_str() {
171                    push_unique_string(&mut values, text);
172                }
173            }
174            values
175        }
176        _ => Vec::new(),
177    }
178}
179
180fn merge_verification_requirement_list(
181    target: &mut Vec<VerificationRequirement>,
182    value: Option<&serde_json::Value>,
183) {
184    let Some(items) = value.and_then(|raw| raw.as_array()) else {
185        return;
186    };
187    for item in items {
188        let Some(object) = item.as_object() else {
189            continue;
190        };
191        let kind = object
192            .get("kind")
193            .and_then(|value| value.as_str())
194            .unwrap_or_default();
195        let value = object
196            .get("value")
197            .and_then(|value| value.as_str())
198            .unwrap_or_default();
199        let note = object
200            .get("note")
201            .or_else(|| object.get("description"))
202            .or_else(|| object.get("reason"))
203            .and_then(|value| value.as_str());
204        push_unique_requirement(target, kind, value, note);
205    }
206}
207
208fn merge_verification_contract_fields(
209    target: &mut VerificationContract,
210    object: &serde_json::Map<String, serde_json::Value>,
211) {
212    if target.summary.is_none() {
213        target.summary = object
214            .get("summary")
215            .and_then(|value| value.as_str())
216            .map(str::trim)
217            .filter(|value| !value.is_empty())
218            .map(|value| value.to_string());
219    }
220    if target.command.is_none() {
221        target.command = object
222            .get("command")
223            .and_then(|value| value.as_str())
224            .map(str::trim)
225            .filter(|value| !value.is_empty())
226            .map(|value| value.to_string());
227    }
228    if target.expect_status.is_none() {
229        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
230    }
231    if target.assert_text.is_none() {
232        target.assert_text = object
233            .get("assert_text")
234            .and_then(|value| value.as_str())
235            .map(str::trim)
236            .filter(|value| !value.is_empty())
237            .map(|value| value.to_string());
238    }
239    if target.expect_text.is_none() {
240        target.expect_text = object
241            .get("expect_text")
242            .and_then(|value| value.as_str())
243            .map(str::trim)
244            .filter(|value| !value.is_empty())
245            .map(|value| value.to_string());
246    }
247
248    for value in json_string_list(
249        object
250            .get("required_identifiers")
251            .or_else(|| object.get("identifiers")),
252    ) {
253        push_unique_string(&mut target.required_identifiers, &value);
254    }
255    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
256        push_unique_string(&mut target.required_paths, &value);
257    }
258    for value in json_string_list(
259        object
260            .get("required_text")
261            .or_else(|| object.get("exact_text"))
262            .or_else(|| object.get("required_strings")),
263    ) {
264        push_unique_string(&mut target.required_text, &value);
265    }
266    for value in json_string_list(object.get("notes")) {
267        push_unique_string(&mut target.notes, &value);
268    }
269    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
270}
271
272fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
273    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
274    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
275        VmError::Runtime(format!(
276            "workflow verification contract read failed for {}: {error}",
277            resolved.display()
278        ))
279    })?;
280    serde_json::from_str(&contents).map_err(|error| {
281        VmError::Runtime(format!(
282            "workflow verification contract parse failed for {}: {error}",
283            resolved.display()
284        ))
285    })
286}
287
288fn resolve_verification_contract_path(
289    verify: &serde_json::Map<String, serde_json::Value>,
290) -> Result<Option<serde_json::Value>, VmError> {
291    let Some(path) = verify
292        .get("contract_path")
293        .or_else(|| verify.get("verification_contract_path"))
294        .and_then(|value| value.as_str())
295        .map(str::trim)
296        .filter(|value| !value.is_empty())
297    else {
298        return Ok(None);
299    };
300    Ok(Some(load_verification_contract_file(path)?))
301}
302
303pub fn verification_contract_from_verify(
304    node_id: &str,
305    verify: Option<&serde_json::Value>,
306) -> Result<Option<VerificationContract>, VmError> {
307    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
308        return Ok(None);
309    };
310
311    let mut contract = VerificationContract {
312        source_node: Some(node_id.to_string()),
313        ..Default::default()
314    };
315
316    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
317        let Some(object) = file_contract.as_object() else {
318            return Err(VmError::Runtime(
319                "workflow verification contract file must parse to a JSON object".to_string(),
320            ));
321        };
322        merge_verification_contract_fields(&mut contract, object);
323    }
324
325    if let Some(inline_contract) = verify_object.get("contract") {
326        let Some(object) = inline_contract.as_object() else {
327            return Err(VmError::Runtime(
328                "workflow verify.contract must be an object".to_string(),
329            ));
330        };
331        merge_verification_contract_fields(&mut contract, object);
332    }
333
334    merge_verification_contract_fields(&mut contract, verify_object);
335
336    if let Some(assert_text) = contract.assert_text.clone() {
337        push_unique_requirement(
338            &mut contract.checks,
339            "visible_text_contains",
340            &assert_text,
341            Some("verify stage requires visible output to contain this text"),
342        );
343    }
344    if let Some(expect_text) = contract.expect_text.clone() {
345        push_unique_requirement(
346            &mut contract.checks,
347            "combined_output_contains",
348            &expect_text,
349            Some("verify command requires combined stdout/stderr to contain this text"),
350        );
351    }
352    if let Some(expect_status) = contract.expect_status {
353        push_unique_requirement(
354            &mut contract.checks,
355            "expect_status",
356            &expect_status.to_string(),
357            Some("verify command exit status must match exactly"),
358        );
359    }
360    for identifier in contract.required_identifiers.clone() {
361        push_unique_requirement(
362            &mut contract.checks,
363            "identifier",
364            &identifier,
365            Some("use this exact identifier spelling"),
366        );
367    }
368    for path in contract.required_paths.clone() {
369        push_unique_requirement(
370            &mut contract.checks,
371            "path",
372            &path,
373            Some("preserve this exact path"),
374        );
375    }
376    for text in contract.required_text.clone() {
377        push_unique_requirement(
378            &mut contract.checks,
379            "text",
380            &text,
381            Some("required exact text or wiring snippet"),
382        );
383    }
384
385    if contract.is_empty() {
386        return Ok(None);
387    }
388    Ok(Some(contract))
389}
390
391fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
392    if !values.iter().any(|existing| existing == &candidate) {
393        values.push(candidate);
394    }
395}
396
397pub fn workflow_verification_contracts(
398    graph: &WorkflowGraph,
399) -> Result<Vec<VerificationContract>, VmError> {
400    let mut contracts = Vec::new();
401    for (node_id, node) in &graph.nodes {
402        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
403            push_unique_contract(&mut contracts, contract);
404        }
405    }
406    Ok(contracts)
407}
408
409pub fn inject_workflow_verification_contracts(
410    node: &mut WorkflowNode,
411    contracts: &[VerificationContract],
412) {
413    if contracts.is_empty() {
414        return;
415    }
416    node.metadata.insert(
417        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
418        serde_json::to_value(contracts).unwrap_or_default(),
419    );
420}
421
422pub fn stage_verification_contracts(
423    node_id: &str,
424    node: &WorkflowNode,
425) -> Result<Vec<VerificationContract>, VmError> {
426    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
427    let local_only = matches!(
428        node.metadata
429            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
430            .and_then(|value| value.as_str()),
431        Some("local_only")
432    );
433    if local_only {
434        return Ok(local_contract.into_iter().collect());
435    }
436
437    let mut contracts = node
438        .metadata
439        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
440        .cloned()
441        .map(|value| {
442            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
443                VmError::Runtime(format!(
444                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
445                ))
446            })
447        })
448        .transpose()?
449        .unwrap_or_default();
450
451    if let Some(local_contract) = local_contract {
452        push_unique_contract(&mut contracts, local_contract);
453    }
454    Ok(contracts)
455}
456
457#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
458#[serde(default)]
459pub struct WorkflowEdge {
460    pub from: String,
461    pub to: String,
462    pub branch: Option<String>,
463    pub label: Option<String>,
464}
465
466#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
467#[serde(default)]
468pub struct WorkflowGraph {
469    #[serde(rename = "_type")]
470    pub type_name: String,
471    pub id: String,
472    pub name: Option<String>,
473    pub version: usize,
474    pub entry: String,
475    pub nodes: BTreeMap<String, WorkflowNode>,
476    pub edges: Vec<WorkflowEdge>,
477    pub capability_policy: CapabilityPolicy,
478    pub approval_policy: super::ToolApprovalPolicy,
479    pub metadata: BTreeMap<String, serde_json::Value>,
480    pub audit_log: Vec<WorkflowAuditEntry>,
481}
482
483#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
484#[serde(default)]
485pub struct WorkflowAuditEntry {
486    pub id: String,
487    pub op: String,
488    pub node_id: Option<String>,
489    pub timestamp: String,
490    pub reason: Option<String>,
491    pub metadata: BTreeMap<String, serde_json::Value>,
492}
493
494#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
495#[serde(default)]
496pub struct WorkflowValidationReport {
497    pub valid: bool,
498    pub errors: Vec<String>,
499    pub warnings: Vec<String>,
500    pub reachable_nodes: Vec<String>,
501}
502
503pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
504    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
505    let dict = value.as_dict();
506    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
507    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
508    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
509    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
510    Ok(node)
511}
512
513pub fn parse_workflow_node_json(
514    json: serde_json::Value,
515    label: &str,
516) -> Result<WorkflowNode, VmError> {
517    super::parse_json_payload(json, label)
518}
519
520pub fn parse_workflow_edge_json(
521    json: serde_json::Value,
522    label: &str,
523) -> Result<WorkflowEdge, VmError> {
524    super::parse_json_payload(json, label)
525}
526
527pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
528    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
529    let as_dict = value.as_dict().cloned().unwrap_or_default();
530
531    if graph.nodes.is_empty() {
532        for key in ["act", "verify", "repair"] {
533            if let Some(node_value) = as_dict.get(key) {
534                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
535                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
536                node.id = Some(key.to_string());
537                if node.kind.is_empty() {
538                    node.kind = if key == "verify" {
539                        "verify".to_string()
540                    } else {
541                        "stage".to_string()
542                    };
543                }
544                if node.model_policy.provider.is_none() {
545                    node.model_policy.provider = as_dict
546                        .get("provider")
547                        .map(|value| value.display())
548                        .filter(|value| !value.is_empty());
549                }
550                if node.model_policy.model.is_none() {
551                    node.model_policy.model = as_dict
552                        .get("model")
553                        .map(|value| value.display())
554                        .filter(|value| !value.is_empty());
555                }
556                if node.model_policy.model_tier.is_none() {
557                    node.model_policy.model_tier = as_dict
558                        .get("model_tier")
559                        .or_else(|| as_dict.get("tier"))
560                        .map(|value| value.display())
561                        .filter(|value| !value.is_empty());
562                }
563                if node.model_policy.temperature.is_none() {
564                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
565                        if let VmValue::Float(number) = value {
566                            Some(*number)
567                        } else {
568                            value.as_int().map(|number| number as f64)
569                        }
570                    });
571                }
572                if node.model_policy.max_tokens.is_none() {
573                    node.model_policy.max_tokens =
574                        as_dict.get("max_tokens").and_then(|value| value.as_int());
575                }
576                if node.mode.is_none() {
577                    node.mode = as_dict
578                        .get("mode")
579                        .map(|value| value.display())
580                        .filter(|value| !value.is_empty());
581                }
582                if node.done_sentinel.is_none() {
583                    node.done_sentinel = as_dict
584                        .get("done_sentinel")
585                        .map(|value| value.display())
586                        .filter(|value| !value.is_empty());
587                }
588                if key == "verify"
589                    && node.verify.is_none()
590                    && (raw_node.contains_key("assert_text")
591                        || raw_node.contains_key("command")
592                        || raw_node.contains_key("expect_status")
593                        || raw_node.contains_key("expect_text"))
594                {
595                    node.verify = Some(serde_json::json!({
596                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
597                        "command": raw_node.get("command").map(vm_value_to_json),
598                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
599                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
600                    }));
601                }
602                graph.nodes.insert(key.to_string(), node);
603            }
604        }
605        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
606            graph.entry = "act".to_string();
607        }
608        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
609            if graph.nodes.contains_key("verify") {
610                graph.edges.push(WorkflowEdge {
611                    from: "act".to_string(),
612                    to: "verify".to_string(),
613                    branch: None,
614                    label: None,
615                });
616            }
617            if graph.nodes.contains_key("repair") {
618                graph.edges.push(WorkflowEdge {
619                    from: "verify".to_string(),
620                    to: "repair".to_string(),
621                    branch: Some("failed".to_string()),
622                    label: None,
623                });
624                graph.edges.push(WorkflowEdge {
625                    from: "repair".to_string(),
626                    to: "verify".to_string(),
627                    branch: Some("retry".to_string()),
628                    label: None,
629                });
630            }
631        }
632    }
633
634    if graph.type_name.is_empty() {
635        graph.type_name = "workflow_graph".to_string();
636    }
637    if graph.id.is_empty() {
638        graph.id = new_id("workflow");
639    }
640    if graph.version == 0 {
641        graph.version = 1;
642    }
643    if graph.entry.is_empty() {
644        graph.entry = graph
645            .nodes
646            .keys()
647            .next()
648            .cloned()
649            .unwrap_or_else(|| "act".to_string());
650    }
651    for (node_id, node) in &mut graph.nodes {
652        if node.raw_tools.is_none() {
653            node.raw_tools = as_dict
654                .get("nodes")
655                .and_then(|nodes| nodes.as_dict())
656                .and_then(|nodes| nodes.get(node_id))
657                .and_then(|node_value| node_value.as_dict())
658                .and_then(|raw_node| raw_node.get("tools"))
659                .cloned();
660        }
661        if node.id.is_none() {
662            node.id = Some(node_id.clone());
663        }
664        if node.kind.is_empty() {
665            node.kind = "stage".to_string();
666        }
667        if node.join_policy.strategy.is_empty() {
668            node.join_policy.strategy = "all".to_string();
669        }
670        if node.reduce_policy.strategy.is_empty() {
671            node.reduce_policy.strategy = "concat".to_string();
672        }
673        if node.output_contract.output_kinds.is_empty() {
674            node.output_contract.output_kinds = vec![match node.kind.as_str() {
675                "verify" => "verification_result".to_string(),
676                "reduce" => node
677                    .reduce_policy
678                    .output_kind
679                    .clone()
680                    .unwrap_or_else(|| "summary".to_string()),
681                "map" => node
682                    .map_policy
683                    .output_kind
684                    .clone()
685                    .unwrap_or_else(|| "artifact".to_string()),
686                "escalation" => "plan".to_string(),
687                _ => "artifact".to_string(),
688            }];
689        }
690        if node.retry_policy.max_attempts == 0 {
691            node.retry_policy.max_attempts = 1;
692        }
693    }
694    Ok(graph)
695}
696
697pub fn validate_workflow(
698    graph: &WorkflowGraph,
699    ceiling: Option<&CapabilityPolicy>,
700) -> WorkflowValidationReport {
701    let mut errors = Vec::new();
702    let mut warnings = Vec::new();
703
704    if !graph.nodes.contains_key(&graph.entry) {
705        errors.push(format!("entry node does not exist: {}", graph.entry));
706    }
707
708    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
709    for edge in &graph.edges {
710        if !node_ids.contains(&edge.from) {
711            errors.push(format!("edge.from references unknown node: {}", edge.from));
712        }
713        if !node_ids.contains(&edge.to) {
714            errors.push(format!("edge.to references unknown node: {}", edge.to));
715        }
716    }
717
718    let reachable_nodes = reachable_nodes(graph);
719    for node_id in &node_ids {
720        if !reachable_nodes.contains(node_id) {
721            warnings.push(format!("node is unreachable: {node_id}"));
722        }
723    }
724
725    for (node_id, node) in &graph.nodes {
726        let incoming = graph
727            .edges
728            .iter()
729            .filter(|edge| edge.to == *node_id)
730            .count();
731        let outgoing: Vec<&WorkflowEdge> = graph
732            .edges
733            .iter()
734            .filter(|edge| edge.from == *node_id)
735            .collect();
736        if let Some(min_inputs) = node.input_contract.min_inputs {
737            if let Some(max_inputs) = node.input_contract.max_inputs {
738                if min_inputs > max_inputs {
739                    errors.push(format!(
740                        "node {node_id}: input contract min_inputs exceeds max_inputs"
741                    ));
742                }
743            }
744        }
745        match node.kind.as_str() {
746            "condition" => {
747                let has_true = outgoing
748                    .iter()
749                    .any(|edge| edge.branch.as_deref() == Some("true"));
750                let has_false = outgoing
751                    .iter()
752                    .any(|edge| edge.branch.as_deref() == Some("false"));
753                if !has_true || !has_false {
754                    errors.push(format!(
755                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
756                    ));
757                }
758            }
759            "fork" if outgoing.len() < 2 => {
760                errors.push(format!(
761                    "node {node_id}: fork nodes require at least two outgoing edges"
762                ));
763            }
764            "join" if incoming < 2 => {
765                warnings.push(format!(
766                    "node {node_id}: join node has fewer than two incoming edges"
767                ));
768            }
769            "map"
770                if node.map_policy.items.is_empty()
771                    && node.map_policy.item_artifact_kind.is_none()
772                    && node.input_contract.input_kinds.is_empty() =>
773            {
774                errors.push(format!(
775                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
776                ));
777            }
778            "reduce" if node.input_contract.input_kinds.is_empty() => {
779                warnings.push(format!(
780                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
781                ));
782            }
783            _ => {}
784        }
785    }
786
787    if let Some(ceiling) = ceiling {
788        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
789            errors.push(error);
790        }
791        for (node_id, node) in &graph.nodes {
792            if let Err(error) = ceiling.intersect(&node.capability_policy) {
793                errors.push(format!("node {node_id}: {error}"));
794            }
795        }
796    }
797
798    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
799        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
800        match diagnostic.severity {
801            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
802            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
803        }
804    }
805
806    WorkflowValidationReport {
807        valid: errors.is_empty(),
808        errors,
809        warnings,
810        reachable_nodes: reachable_nodes.into_iter().collect(),
811    }
812}
813
814fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
815    let mut seen = BTreeSet::new();
816    let mut stack = vec![graph.entry.clone()];
817    while let Some(node_id) = stack.pop() {
818        if !seen.insert(node_id.clone()) {
819            continue;
820        }
821        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
822            stack.push(edge.to.clone());
823        }
824    }
825    seen
826}
827
828/// Pick the session id a stage should run under. Prefers an explicit
829/// `session_id` on the node's `model_policy` dict (so pipelines with
830/// `agent_session_open` / `agent_session_fork` flowing through a graph
831/// line up); falls back to a stable, node-derived id so multi-stage
832/// graphs with no explicit session share a conversation across stages.
833fn resolve_node_session_id(node: &WorkflowNode) -> String {
834    if let Some(explicit) = node
835        .raw_model_policy
836        .as_ref()
837        .and_then(|v| v.as_dict())
838        .and_then(|d| d.get("session_id"))
839        .and_then(|v| match v {
840            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
841            _ => None,
842        })
843    {
844        return explicit;
845    }
846    if let Some(persisted) = node
847        .metadata
848        .get("worker_session_id")
849        .and_then(|value| value.as_str())
850        .filter(|value| !value.trim().is_empty())
851    {
852        return persisted.to_string();
853    }
854    format!("workflow_stage_{}", uuid::Uuid::now_v7())
855}
856
857fn raw_auto_compact_dict(
858    node: &WorkflowNode,
859) -> Option<&std::collections::BTreeMap<String, VmValue>> {
860    node.raw_auto_compact
861        .as_ref()
862        .and_then(|value| value.as_dict())
863}
864
865fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
866    raw_auto_compact_dict(node)
867        .and_then(|dict| dict.get(key))
868        .and_then(|value| value.as_int())
869        .filter(|value| *value >= 0)
870        .map(|value| value as usize)
871}
872
873fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
874    raw_auto_compact_dict(node)
875        .and_then(|dict| dict.get(key))
876        .and_then(|value| match value {
877            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
878            _ => None,
879        })
880}
881
882fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&BTreeMap<String, VmValue>> {
883    node.raw_model_policy
884        .as_ref()
885        .and_then(|value| value.as_dict())
886}
887
888fn insert_json_vm_option<T: Serialize>(
889    options: &mut BTreeMap<String, VmValue>,
890    key: &str,
891    value: &T,
892) -> Result<(), VmError> {
893    let json = serde_json::to_value(value).map_err(|error| {
894        VmError::Runtime(format!("workflow stage option encode error: {error}"))
895    })?;
896    options.insert(key.to_string(), crate::stdlib::json_to_vm_value(&json));
897    Ok(())
898}
899
900fn merge_raw_model_policy_options(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
901    if let Some(raw) = raw_model_policy_dict(node) {
902        for (key, value) in raw {
903            if !matches!(value, VmValue::Nil) {
904                options.insert(key.clone(), value.clone());
905            }
906        }
907    }
908}
909
910fn preserve_nested_command_policy(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
911    if options.contains_key("command_policy") {
912        return;
913    }
914    let Some(command_policy) = raw_model_policy_dict(node)
915        .and_then(|dict| dict.get("policy"))
916        .and_then(|value| value.as_dict())
917        .and_then(|policy| policy.get("command_policy"))
918    else {
919        return;
920    };
921    options.insert("command_policy".to_string(), command_policy.clone());
922}
923
924fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
925    node.raw_tools.clone().or_else(|| {
926        if matches!(node.tools, serde_json::Value::Null) {
927            None
928        } else {
929            Some(crate::stdlib::json_to_vm_value(&node.tools))
930        }
931    })
932}
933
934fn add_stage_tools_option(
935    options: &mut BTreeMap<String, VmValue>,
936    tools_value: &Option<VmValue>,
937    tool_names: &[String],
938) {
939    if !tool_names.is_empty() {
940        if let Some(value) = tools_value.clone() {
941            options.insert("tools".to_string(), value);
942        }
943    }
944}
945
946fn workflow_stage_llm_options(
947    node: &WorkflowNode,
948    stage_session_id: &str,
949    tools_value: &Option<VmValue>,
950    tool_names: &[String],
951    stage_agent_options: &super::WorkflowStageAgentOptions,
952) -> BTreeMap<String, VmValue> {
953    let mut options = stage_agent_options.llm_options_vm_dict();
954    merge_raw_model_policy_options(&mut options, node);
955    options.insert(
956        "session_id".to_string(),
957        VmValue::String(std::sync::Arc::from(stage_session_id.to_string())),
958    );
959    options.insert(
960        "tool_format".to_string(),
961        VmValue::String(std::sync::Arc::from(
962            stage_agent_options.tool_format.clone(),
963        )),
964    );
965    add_stage_tools_option(&mut options, tools_value, tool_names);
966    options
967}
968
969fn add_workflow_agent_compaction_options(
970    options: &mut BTreeMap<String, VmValue>,
971    node: &WorkflowNode,
972) {
973    if !node.auto_compact.enabled {
974        options.insert("auto_compact".to_string(), VmValue::Bool(false));
975        return;
976    }
977    options.insert("auto_compact".to_string(), VmValue::Bool(true));
978    if let Some(value) = node.auto_compact.token_threshold {
979        options.insert("compact_threshold".to_string(), VmValue::Int(value as i64));
980    }
981    if let Some(value) = node.auto_compact.tool_output_max_chars {
982        options.insert(
983            "tool_output_max_chars".to_string(),
984            VmValue::Int(value as i64),
985        );
986    }
987    if let Some(value) = node.auto_compact.hard_limit_tokens {
988        options.insert("hard_limit_tokens".to_string(), VmValue::Int(value as i64));
989    }
990    if let Some(strategy) = node.auto_compact.compact_strategy.as_ref() {
991        options.insert(
992            "compact_strategy".to_string(),
993            VmValue::String(std::sync::Arc::from(strategy.clone())),
994        );
995    }
996    if let Some(strategy) = node.auto_compact.hard_limit_strategy.as_ref() {
997        options.insert(
998            "hard_limit_strategy".to_string(),
999            VmValue::String(std::sync::Arc::from(strategy.clone())),
1000        );
1001    }
1002    if let Some(value) = raw_auto_compact_int(node, "compact_keep_last")
1003        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1004    {
1005        options.insert("compact_keep_last".to_string(), VmValue::Int(value as i64));
1006    }
1007    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1008        options.insert(
1009            "summarize_prompt".to_string(),
1010            VmValue::String(std::sync::Arc::from(prompt)),
1011        );
1012    }
1013    if let Some(dict) = raw_auto_compact_dict(node) {
1014        for key in ["compress_callback", "mask_callback"] {
1015            if let Some(callback) = dict.get(key) {
1016                options.insert(key.to_string(), callback.clone());
1017            }
1018        }
1019        if let Some(callback) = dict.get("custom_compactor") {
1020            options.insert("compact_callback".to_string(), callback.clone());
1021        }
1022    }
1023}
1024
1025fn workflow_stage_agent_loop_options(
1026    node: &WorkflowNode,
1027    stage_session_id: &str,
1028    tools_value: &Option<VmValue>,
1029    tool_names: &[String],
1030    stage_agent_options: &super::WorkflowStageAgentOptions,
1031) -> Result<BTreeMap<String, VmValue>, VmError> {
1032    let mut options = stage_agent_options.agent_loop_options_vm_dict();
1033    merge_raw_model_policy_options(&mut options, node);
1034    if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1035        if !options.contains_key("skills") {
1036            if let Some(registry) = context.registry {
1037                options.insert("skills".to_string(), registry);
1038            }
1039        }
1040        if !options.contains_key("skill_match") {
1041            if let Some(match_config) = context.match_config {
1042                options.insert("skill_match".to_string(), match_config);
1043            }
1044        }
1045    }
1046    preserve_nested_command_policy(&mut options, node);
1047    add_workflow_agent_compaction_options(&mut options, node);
1048    add_stage_tools_option(&mut options, tools_value, tool_names);
1049    let tool_policy = tool_capability_policy_from_spec(&node.tools);
1050    let effective_policy = tool_policy
1051        .intersect(&node.capability_policy)
1052        .map_err(VmError::Runtime)?;
1053    insert_json_vm_option(&mut options, "policy", &effective_policy)?;
1054    insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy)?;
1055    options.insert(
1056        "session_id".to_string(),
1057        VmValue::String(std::sync::Arc::from(stage_session_id.to_string())),
1058    );
1059    options.insert(
1060        "tool_format".to_string(),
1061        VmValue::String(std::sync::Arc::from(
1062            stage_agent_options.tool_format.clone(),
1063        )),
1064    );
1065    let stage_label = node
1066        .id
1067        .clone()
1068        .unwrap_or_else(|| stage_session_id.to_string());
1069    crate::orchestration::annotate_nested_execution_options(
1070        &mut options,
1071        crate::orchestration::NestedExecutionKind::WorkflowStage,
1072        &stage_label,
1073    );
1074    Ok(options)
1075}
1076
1077#[derive(Clone, Debug)]
1078pub struct PreparedWorkflowStageNode {
1079    pub prompt: String,
1080    pub system: Option<String>,
1081    pub run_agent_loop: bool,
1082    pub llm_options: BTreeMap<String, VmValue>,
1083    pub agent_loop_options: BTreeMap<String, VmValue>,
1084    pub result: Option<serde_json::Value>,
1085    pub selected: Vec<ArtifactRecord>,
1086    pub rendered_context: String,
1087    pub rendered_verification: String,
1088    pub verification_contracts: Vec<VerificationContract>,
1089    pub tool_format: String,
1090    pub stage_session_id: String,
1091}
1092
1093pub async fn prepare_stage_node(
1094    ctx: &crate::vm::AsyncBuiltinCtx,
1095    node_id: &str,
1096    node: &WorkflowNode,
1097    task: &str,
1098    artifacts: &[ArtifactRecord],
1099) -> Result<PreparedWorkflowStageNode, VmError> {
1100    let selected_stage = super::select_workflow_stage_artifacts(
1101        ctx,
1102        artifacts,
1103        &node.context_policy,
1104        &node.input_contract,
1105    )
1106    .await?;
1107    let selected = selected_stage.artifacts;
1108    let context_policy = selected_stage.context_policy;
1109    let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1110        let assembled =
1111            crate::stdlib::assemble::assemble_from_options(ctx, &selected, assembler).await?;
1112        Some(super::render_assembled_chunks(&assembled))
1113    } else {
1114        None
1115    };
1116    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1117    let stage_session_id = resolve_node_session_id(node);
1118    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1119        return Err(VmError::Runtime(format!(
1120            "workflow stage {node_id} requires an existing session \
1121             (call agent_session_open and feed session_id through model_policy \
1122             before entering this stage)"
1123        )));
1124    }
1125    if let Some(min_inputs) = node.input_contract.min_inputs {
1126        if selected.len() < min_inputs {
1127            return Err(VmError::Runtime(format!(
1128                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1129            )));
1130        }
1131    }
1132    if let Some(max_inputs) = node.input_contract.max_inputs {
1133        if selected.len() > max_inputs {
1134            return Err(VmError::Runtime(format!(
1135                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1136            )));
1137        }
1138    }
1139    let prepared_prompt = super::prepare_workflow_stage_prompt(
1140        ctx,
1141        task,
1142        node.task_label.as_deref(),
1143        &selected,
1144        &context_policy,
1145        rendered_context_override.as_deref(),
1146        &verification_contracts,
1147    )
1148    .await?;
1149    let prompt = prepared_prompt.prompt;
1150    let rendered_context = prepared_prompt.rendered_context;
1151    let rendered_verification = prepared_prompt.rendered_verification;
1152
1153    let tool_names = tool_names_from_spec(&node.tools);
1154    let stage_agent_options = super::prepare_workflow_stage_agent_options(
1155        ctx,
1156        node,
1157        &stage_session_id,
1158        !tool_names.is_empty(),
1159    )
1160    .await?;
1161    let tool_format = stage_agent_options.tool_format.clone();
1162    let result = if node.kind == "verify" {
1163        if let Some(command) = node
1164            .verify
1165            .as_ref()
1166            .and_then(|verify| verify.as_object())
1167            .and_then(|verify| verify.get("command"))
1168            .and_then(|value| value.as_str())
1169            .map(str::trim)
1170            .filter(|value| !value.is_empty())
1171        {
1172            let (program, args) = if cfg!(target_os = "windows") {
1173                ("cmd", vec!["/C".to_string(), command.to_string()])
1174            } else {
1175                // Do not use a login shell here. On macOS, `/bin/sh -l`
1176                // reads user dotfiles such as `~/.profile`, which makes
1177                // sandboxed verification depend on out-of-worktree state.
1178                ("/bin/sh", vec!["-c".to_string(), command.to_string()])
1179            };
1180            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1181                stdin_null: true,
1182                ..Default::default()
1183            };
1184            if let Some(context) = crate::stdlib::process::current_execution_context() {
1185                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1186                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1187                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
1188                }
1189                if !context.env.is_empty() {
1190                    process_config.env.extend(context.env);
1191                }
1192            }
1193            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1194            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1195            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1196            let combined = if stderr.is_empty() {
1197                stdout.clone()
1198            } else if stdout.is_empty() {
1199                stderr.clone()
1200            } else {
1201                format!("{stdout}\n{stderr}")
1202            };
1203            serde_json::json!({
1204                "status": "completed",
1205                "text": combined,
1206                "visible_text": combined,
1207                "command": command,
1208                "stdout": stdout,
1209                "stderr": stderr,
1210                "exit_status": output.status.code().unwrap_or(-1),
1211                "success": output.status.success(),
1212            })
1213        } else {
1214            serde_json::json!({
1215                "status": "completed",
1216                "text": "",
1217                "visible_text": "",
1218            })
1219        }
1220    } else {
1221        let tools_value = stage_tools_value(node);
1222        let llm_options = workflow_stage_llm_options(
1223            node,
1224            &stage_session_id,
1225            &tools_value,
1226            &tool_names,
1227            &stage_agent_options,
1228        );
1229        let agent_loop_options = if stage_agent_options.run_agent_loop {
1230            workflow_stage_agent_loop_options(
1231                node,
1232                &stage_session_id,
1233                &tools_value,
1234                &tool_names,
1235                &stage_agent_options,
1236            )?
1237        } else {
1238            BTreeMap::new()
1239        };
1240        return Ok(PreparedWorkflowStageNode {
1241            prompt,
1242            system: node.system.clone(),
1243            run_agent_loop: stage_agent_options.run_agent_loop,
1244            llm_options,
1245            agent_loop_options,
1246            result: None,
1247            selected,
1248            rendered_context,
1249            rendered_verification,
1250            verification_contracts,
1251            tool_format,
1252            stage_session_id,
1253        });
1254    };
1255
1256    Ok(PreparedWorkflowStageNode {
1257        prompt,
1258        system: node.system.clone(),
1259        run_agent_loop: false,
1260        llm_options: BTreeMap::new(),
1261        agent_loop_options: BTreeMap::new(),
1262        result: Some(result),
1263        selected,
1264        rendered_context,
1265        rendered_verification,
1266        verification_contracts,
1267        tool_format,
1268        stage_session_id,
1269    })
1270}
1271
1272pub fn complete_prepared_stage_node(
1273    node_id: &str,
1274    node: &WorkflowNode,
1275    prepared: &PreparedWorkflowStageNode,
1276    mut llm_result: serde_json::Value,
1277) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1278    if let Some(payload) = llm_result.as_object_mut() {
1279        payload.insert(
1280            "prompt".to_string(),
1281            serde_json::json!(prepared.prompt.clone()),
1282        );
1283        payload.insert(
1284            "system_prompt".to_string(),
1285            serde_json::json!(node.system.clone().unwrap_or_default()),
1286        );
1287        payload.insert(
1288            "rendered_context".to_string(),
1289            serde_json::json!(prepared.rendered_context.clone()),
1290        );
1291        if !prepared.verification_contracts.is_empty() {
1292            payload.insert(
1293                "verification_contracts".to_string(),
1294                serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1295            );
1296            payload.insert(
1297                "rendered_verification_context".to_string(),
1298                serde_json::json!(prepared.rendered_verification.clone()),
1299            );
1300        }
1301        payload.insert(
1302            "selected_artifact_ids".to_string(),
1303            serde_json::json!(prepared
1304                .selected
1305                .iter()
1306                .map(|artifact| artifact.id.clone())
1307                .collect::<Vec<_>>()),
1308        );
1309        payload.insert(
1310            "selected_artifact_titles".to_string(),
1311            serde_json::json!(prepared
1312                .selected
1313                .iter()
1314                .map(|artifact| artifact.title.clone())
1315                .collect::<Vec<_>>()),
1316        );
1317        match payload
1318            .entry("tools".to_string())
1319            .or_insert_with(|| serde_json::json!({}))
1320        {
1321            serde_json::Value::Object(tools) => {
1322                tools.insert(
1323                    "mode".to_string(),
1324                    serde_json::json!(prepared.tool_format.clone()),
1325                );
1326            }
1327            slot => {
1328                *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1329            }
1330        }
1331    }
1332
1333    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1334    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1335    // a "transcript" field; fall back to the input so cross-stage conversation
1336    // state survives transitions.
1337    let result_transcript = llm_result
1338        .get("transcript")
1339        .cloned()
1340        .map(|value| crate::stdlib::json_to_vm_value(&value));
1341    let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1342    let transcript = result_transcript
1343        .or(session_transcript)
1344        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1345    let output_kind = node
1346        .output_contract
1347        .output_kinds
1348        .first()
1349        .cloned()
1350        .unwrap_or_else(|| {
1351            if node.kind == "verify" {
1352                "verification_result".to_string()
1353            } else {
1354                "artifact".to_string()
1355            }
1356        });
1357    let mut metadata = BTreeMap::new();
1358    metadata.insert(
1359        "input_artifact_ids".to_string(),
1360        serde_json::json!(prepared
1361            .selected
1362            .iter()
1363            .map(|artifact| artifact.id.clone())
1364            .collect::<Vec<_>>()),
1365    );
1366    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1367    if !node.approval_policy.write_path_allowlist.is_empty() {
1368        metadata.insert(
1369            "changed_paths".to_string(),
1370            serde_json::json!(node.approval_policy.write_path_allowlist),
1371        );
1372    }
1373    let artifact = ArtifactRecord {
1374        type_name: "artifact".to_string(),
1375        id: new_id("artifact"),
1376        kind: output_kind,
1377        title: Some(format!("stage {node_id} output")),
1378        text: Some(visible_text),
1379        data: Some(llm_result.clone()),
1380        source: Some(node_id.to_string()),
1381        created_at: now_rfc3339(),
1382        freshness: Some("fresh".to_string()),
1383        priority: None,
1384        lineage: prepared
1385            .selected
1386            .iter()
1387            .map(|artifact| artifact.id.clone())
1388            .collect(),
1389        relevance: Some(1.0),
1390        estimated_tokens: None,
1391        stage: Some(node_id.to_string()),
1392        metadata,
1393    }
1394    .normalize();
1395
1396    Ok((llm_result, vec![artifact], transcript))
1397}
1398
1399pub async fn execute_stage_node(
1400    ctx: &crate::vm::AsyncBuiltinCtx,
1401    node_id: &str,
1402    node: &WorkflowNode,
1403    task: &str,
1404    artifacts: &[ArtifactRecord],
1405) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1406    let prepared = prepare_stage_node(ctx, node_id, node, task, artifacts).await?;
1407    let llm_result = if let Some(result) = prepared.result.clone() {
1408        result
1409    } else if prepared.run_agent_loop {
1410        let result = crate::stdlib::harn_entry::call_agent_loop(
1411            ctx,
1412            prepared.prompt.clone(),
1413            prepared.system.clone(),
1414            prepared.agent_loop_options.clone(),
1415        )
1416        .await?;
1417        crate::llm::vm_value_to_json(&result)
1418    } else {
1419        let args = vec![
1420            VmValue::String(std::sync::Arc::from(prepared.prompt.clone())),
1421            prepared
1422                .system
1423                .clone()
1424                .map(|s| VmValue::String(std::sync::Arc::from(s)))
1425                .unwrap_or(VmValue::Nil),
1426            VmValue::Dict(std::sync::Arc::new(prepared.llm_options.clone())),
1427        ];
1428        let opts = extract_llm_options(&args)?;
1429        let result = vm_call_llm_full(&opts).await?;
1430        crate::llm::agent_loop_result_from_llm(&result, opts)
1431    };
1432    complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1433}
1434
1435pub fn append_audit_entry(
1436    graph: &mut WorkflowGraph,
1437    op: &str,
1438    node_id: Option<String>,
1439    reason: Option<String>,
1440    metadata: BTreeMap<String, serde_json::Value>,
1441) {
1442    graph.audit_log.push(WorkflowAuditEntry {
1443        id: new_id("audit"),
1444        op: op.to_string(),
1445        node_id,
1446        timestamp: now_rfc3339(),
1447        reason,
1448        metadata,
1449    });
1450}