Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy, MapPolicy,
11    ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
15use crate::value::{VmError, VmValue};
16
17pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
18pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
20#[derive(Clone, Debug, Default, Serialize, Deserialize)]
21#[serde(default)]
22pub struct WorkflowNode {
23    pub id: Option<String>,
24    pub kind: String,
25    pub mode: Option<String>,
26    pub prompt: Option<String>,
27    pub system: Option<String>,
28    pub task_label: Option<String>,
29    pub done_sentinel: Option<String>,
30    pub tools: serde_json::Value,
31    pub model_policy: ModelPolicy,
32    /// Per-stage auto-compaction settings for the agent loop's context
33    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
34    /// expressible here — call the `agent_session_*` builtins before the
35    /// stage or in a prior stage.
36    pub auto_compact: AutoCompactPolicy,
37    /// Output visibility filter applied to the transcript after the
38    /// stage's agent loop exits. `"public"` / `"public_only"` drops
39    /// `tool_result` messages and non-public events. `None` or any
40    /// unknown string is a no-op.
41    #[serde(default)]
42    pub output_visibility: Option<String>,
43    pub context_policy: ContextPolicy,
44    pub retry_policy: RetryPolicy,
45    pub capability_policy: CapabilityPolicy,
46    pub approval_policy: super::ToolApprovalPolicy,
47    pub input_contract: StageContract,
48    pub output_contract: StageContract,
49    pub branch_semantics: BranchSemantics,
50    pub map_policy: MapPolicy,
51    pub join_policy: JoinPolicy,
52    pub reduce_policy: ReducePolicy,
53    pub escalation_policy: EscalationPolicy,
54    pub verify: Option<serde_json::Value>,
55    /// When true, the stage's agent loop gates the done sentinel on the most
56    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
57    /// persistent execute stages that fold verification into the loop via a
58    /// shell-exec tool the model invokes explicitly.
59    #[serde(default)]
60    pub exit_when_verified: bool,
61    pub metadata: BTreeMap<String, serde_json::Value>,
62    #[serde(skip)]
63    pub raw_tools: Option<VmValue>,
64    /// Raw auto_compact VmValue dict — preserved for extracting closure
65    /// fields (compress_callback, mask_callback, custom_compactor) that
66    /// can't go through serde.
67    #[serde(skip)]
68    pub raw_auto_compact: Option<VmValue>,
69    /// Raw model_policy VmValue dict — preserved for extracting closure
70    /// fields (post_turn_callback) that can't go through serde.
71    #[serde(skip)]
72    pub raw_model_policy: Option<VmValue>,
73    /// Raw context_assembler VmValue dict — when set, the stage's
74    /// artifact context is packed through `assemble_context` before
75    /// rendering the system prompt. Closure fields (`ranker_callback`)
76    /// are preserved here because they can't round-trip through serde.
77    #[serde(skip)]
78    pub raw_context_assembler: Option<VmValue>,
79}
80
81impl PartialEq for WorkflowNode {
82    fn eq(&self, other: &Self) -> bool {
83        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
84    }
85}
86
87#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
88#[serde(default)]
89pub struct VerificationRequirement {
90    pub kind: String,
91    pub value: String,
92    pub note: Option<String>,
93}
94
95#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
96#[serde(default)]
97pub struct VerificationContract {
98    pub source_node: Option<String>,
99    pub summary: Option<String>,
100    pub command: Option<String>,
101    pub expect_status: Option<i64>,
102    pub assert_text: Option<String>,
103    pub expect_text: Option<String>,
104    pub required_identifiers: Vec<String>,
105    pub required_paths: Vec<String>,
106    pub required_text: Vec<String>,
107    pub notes: Vec<String>,
108    pub checks: Vec<VerificationRequirement>,
109}
110
111impl VerificationContract {
112    fn is_empty(&self) -> bool {
113        self.summary.is_none()
114            && self.command.is_none()
115            && self.expect_status.is_none()
116            && self.assert_text.is_none()
117            && self.expect_text.is_none()
118            && self.required_identifiers.is_empty()
119            && self.required_paths.is_empty()
120            && self.required_text.is_empty()
121            && self.notes.is_empty()
122            && self.checks.is_empty()
123    }
124}
125
126fn push_unique_string(values: &mut Vec<String>, value: &str) {
127    let trimmed = value.trim();
128    if trimmed.is_empty() {
129        return;
130    }
131    if !values.iter().any(|existing| existing == trimmed) {
132        values.push(trimmed.to_string());
133    }
134}
135
136fn push_unique_requirement(
137    values: &mut Vec<VerificationRequirement>,
138    kind: &str,
139    value: &str,
140    note: Option<&str>,
141) {
142    let trimmed_kind = kind.trim();
143    let trimmed_value = value.trim();
144    let trimmed_note = note
145        .map(str::trim)
146        .filter(|candidate| !candidate.is_empty())
147        .map(|candidate| candidate.to_string());
148    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
149        return;
150    }
151    let candidate = VerificationRequirement {
152        kind: trimmed_kind.to_string(),
153        value: trimmed_value.to_string(),
154        note: trimmed_note,
155    };
156    if !values.iter().any(|existing| existing == &candidate) {
157        values.push(candidate);
158    }
159}
160
161fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
162    match value {
163        Some(serde_json::Value::String(text)) => {
164            let mut values = Vec::new();
165            push_unique_string(&mut values, text);
166            values
167        }
168        Some(serde_json::Value::Array(items)) => {
169            let mut values = Vec::new();
170            for item in items {
171                if let Some(text) = item.as_str() {
172                    push_unique_string(&mut values, text);
173                }
174            }
175            values
176        }
177        _ => Vec::new(),
178    }
179}
180
181fn merge_verification_requirement_list(
182    target: &mut Vec<VerificationRequirement>,
183    value: Option<&serde_json::Value>,
184) {
185    let Some(items) = value.and_then(|raw| raw.as_array()) else {
186        return;
187    };
188    for item in items {
189        let Some(object) = item.as_object() else {
190            continue;
191        };
192        let kind = object
193            .get("kind")
194            .and_then(|value| value.as_str())
195            .unwrap_or_default();
196        let value = object
197            .get("value")
198            .and_then(|value| value.as_str())
199            .unwrap_or_default();
200        let note = object
201            .get("note")
202            .or_else(|| object.get("description"))
203            .or_else(|| object.get("reason"))
204            .and_then(|value| value.as_str());
205        push_unique_requirement(target, kind, value, note);
206    }
207}
208
209fn merge_verification_contract_fields(
210    target: &mut VerificationContract,
211    object: &serde_json::Map<String, serde_json::Value>,
212) {
213    if target.summary.is_none() {
214        target.summary = object
215            .get("summary")
216            .and_then(|value| value.as_str())
217            .map(str::trim)
218            .filter(|value| !value.is_empty())
219            .map(|value| value.to_string());
220    }
221    if target.command.is_none() {
222        target.command = object
223            .get("command")
224            .and_then(|value| value.as_str())
225            .map(str::trim)
226            .filter(|value| !value.is_empty())
227            .map(|value| value.to_string());
228    }
229    if target.expect_status.is_none() {
230        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
231    }
232    if target.assert_text.is_none() {
233        target.assert_text = object
234            .get("assert_text")
235            .and_then(|value| value.as_str())
236            .map(str::trim)
237            .filter(|value| !value.is_empty())
238            .map(|value| value.to_string());
239    }
240    if target.expect_text.is_none() {
241        target.expect_text = object
242            .get("expect_text")
243            .and_then(|value| value.as_str())
244            .map(str::trim)
245            .filter(|value| !value.is_empty())
246            .map(|value| value.to_string());
247    }
248
249    for value in json_string_list(
250        object
251            .get("required_identifiers")
252            .or_else(|| object.get("identifiers")),
253    ) {
254        push_unique_string(&mut target.required_identifiers, &value);
255    }
256    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
257        push_unique_string(&mut target.required_paths, &value);
258    }
259    for value in json_string_list(
260        object
261            .get("required_text")
262            .or_else(|| object.get("exact_text"))
263            .or_else(|| object.get("required_strings")),
264    ) {
265        push_unique_string(&mut target.required_text, &value);
266    }
267    for value in json_string_list(object.get("notes")) {
268        push_unique_string(&mut target.notes, &value);
269    }
270    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
271}
272
273fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
274    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
275    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
276        VmError::Runtime(format!(
277            "workflow verification contract read failed for {}: {error}",
278            resolved.display()
279        ))
280    })?;
281    serde_json::from_str(&contents).map_err(|error| {
282        VmError::Runtime(format!(
283            "workflow verification contract parse failed for {}: {error}",
284            resolved.display()
285        ))
286    })
287}
288
289fn resolve_verification_contract_path(
290    verify: &serde_json::Map<String, serde_json::Value>,
291) -> Result<Option<serde_json::Value>, VmError> {
292    let Some(path) = verify
293        .get("contract_path")
294        .or_else(|| verify.get("verification_contract_path"))
295        .and_then(|value| value.as_str())
296        .map(str::trim)
297        .filter(|value| !value.is_empty())
298    else {
299        return Ok(None);
300    };
301    Ok(Some(load_verification_contract_file(path)?))
302}
303
304pub fn verification_contract_from_verify(
305    node_id: &str,
306    verify: Option<&serde_json::Value>,
307) -> Result<Option<VerificationContract>, VmError> {
308    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
309        return Ok(None);
310    };
311
312    let mut contract = VerificationContract {
313        source_node: Some(node_id.to_string()),
314        ..Default::default()
315    };
316
317    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
318        let Some(object) = file_contract.as_object() else {
319            return Err(VmError::Runtime(
320                "workflow verification contract file must parse to a JSON object".to_string(),
321            ));
322        };
323        merge_verification_contract_fields(&mut contract, object);
324    }
325
326    if let Some(inline_contract) = verify_object.get("contract") {
327        let Some(object) = inline_contract.as_object() else {
328            return Err(VmError::Runtime(
329                "workflow verify.contract must be an object".to_string(),
330            ));
331        };
332        merge_verification_contract_fields(&mut contract, object);
333    }
334
335    merge_verification_contract_fields(&mut contract, verify_object);
336
337    if let Some(assert_text) = contract.assert_text.clone() {
338        push_unique_requirement(
339            &mut contract.checks,
340            "visible_text_contains",
341            &assert_text,
342            Some("verify stage requires visible output to contain this text"),
343        );
344    }
345    if let Some(expect_text) = contract.expect_text.clone() {
346        push_unique_requirement(
347            &mut contract.checks,
348            "combined_output_contains",
349            &expect_text,
350            Some("verify command requires combined stdout/stderr to contain this text"),
351        );
352    }
353    if let Some(expect_status) = contract.expect_status {
354        push_unique_requirement(
355            &mut contract.checks,
356            "expect_status",
357            &expect_status.to_string(),
358            Some("verify command exit status must match exactly"),
359        );
360    }
361    for identifier in contract.required_identifiers.clone() {
362        push_unique_requirement(
363            &mut contract.checks,
364            "identifier",
365            &identifier,
366            Some("use this exact identifier spelling"),
367        );
368    }
369    for path in contract.required_paths.clone() {
370        push_unique_requirement(
371            &mut contract.checks,
372            "path",
373            &path,
374            Some("preserve this exact path"),
375        );
376    }
377    for text in contract.required_text.clone() {
378        push_unique_requirement(
379            &mut contract.checks,
380            "text",
381            &text,
382            Some("required exact text or wiring snippet"),
383        );
384    }
385
386    if contract.is_empty() {
387        return Ok(None);
388    }
389    Ok(Some(contract))
390}
391
392fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
393    if !values.iter().any(|existing| existing == &candidate) {
394        values.push(candidate);
395    }
396}
397
398pub fn workflow_verification_contracts(
399    graph: &WorkflowGraph,
400) -> Result<Vec<VerificationContract>, VmError> {
401    let mut contracts = Vec::new();
402    for (node_id, node) in &graph.nodes {
403        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
404            push_unique_contract(&mut contracts, contract);
405        }
406    }
407    Ok(contracts)
408}
409
410pub fn inject_workflow_verification_contracts(
411    node: &mut WorkflowNode,
412    contracts: &[VerificationContract],
413) {
414    if contracts.is_empty() {
415        return;
416    }
417    node.metadata.insert(
418        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
419        serde_json::to_value(contracts).unwrap_or_default(),
420    );
421}
422
423pub fn stage_verification_contracts(
424    node_id: &str,
425    node: &WorkflowNode,
426) -> Result<Vec<VerificationContract>, VmError> {
427    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
428    let local_only = matches!(
429        node.metadata
430            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
431            .and_then(|value| value.as_str()),
432        Some("local_only")
433    );
434    if local_only {
435        return Ok(local_contract.into_iter().collect());
436    }
437
438    let mut contracts = node
439        .metadata
440        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
441        .cloned()
442        .map(|value| {
443            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
444                VmError::Runtime(format!(
445                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
446                ))
447            })
448        })
449        .transpose()?
450        .unwrap_or_default();
451
452    if let Some(local_contract) = local_contract {
453        push_unique_contract(&mut contracts, local_contract);
454    }
455    Ok(contracts)
456}
457
458#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
459#[serde(default)]
460pub struct WorkflowEdge {
461    pub from: String,
462    pub to: String,
463    pub branch: Option<String>,
464    pub label: Option<String>,
465}
466
467#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
468#[serde(default)]
469pub struct WorkflowGraph {
470    #[serde(rename = "_type")]
471    pub type_name: String,
472    pub id: String,
473    pub name: Option<String>,
474    pub version: usize,
475    pub entry: String,
476    pub nodes: BTreeMap<String, WorkflowNode>,
477    pub edges: Vec<WorkflowEdge>,
478    pub capability_policy: CapabilityPolicy,
479    pub approval_policy: super::ToolApprovalPolicy,
480    pub metadata: BTreeMap<String, serde_json::Value>,
481    pub audit_log: Vec<WorkflowAuditEntry>,
482}
483
484#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
485#[serde(default)]
486pub struct WorkflowAuditEntry {
487    pub id: String,
488    pub op: String,
489    pub node_id: Option<String>,
490    pub timestamp: String,
491    pub reason: Option<String>,
492    pub metadata: BTreeMap<String, serde_json::Value>,
493}
494
495#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
496#[serde(default)]
497pub struct WorkflowValidationReport {
498    pub valid: bool,
499    pub errors: Vec<String>,
500    pub warnings: Vec<String>,
501    pub reachable_nodes: Vec<String>,
502}
503
504pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
505    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
506    let dict = value.as_dict();
507    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
508    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
509    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
510    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
511    Ok(node)
512}
513
514pub fn parse_workflow_node_json(
515    json: serde_json::Value,
516    label: &str,
517) -> Result<WorkflowNode, VmError> {
518    super::parse_json_payload(json, label)
519}
520
521pub fn parse_workflow_edge_json(
522    json: serde_json::Value,
523    label: &str,
524) -> Result<WorkflowEdge, VmError> {
525    super::parse_json_payload(json, label)
526}
527
528pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
529    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
530    let as_dict = value.as_dict().cloned().unwrap_or_default();
531
532    if graph.nodes.is_empty() {
533        for key in ["act", "verify", "repair"] {
534            if let Some(node_value) = as_dict.get(key) {
535                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
536                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
537                node.id = Some(key.to_string());
538                if node.kind.is_empty() {
539                    node.kind = if key == "verify" {
540                        "verify".to_string()
541                    } else {
542                        "stage".to_string()
543                    };
544                }
545                if node.model_policy.provider.is_none() {
546                    node.model_policy.provider = as_dict
547                        .get("provider")
548                        .map(|value| value.display())
549                        .filter(|value| !value.is_empty());
550                }
551                if node.model_policy.model.is_none() {
552                    node.model_policy.model = as_dict
553                        .get("model")
554                        .map(|value| value.display())
555                        .filter(|value| !value.is_empty());
556                }
557                if node.model_policy.model_tier.is_none() {
558                    node.model_policy.model_tier = as_dict
559                        .get("model_tier")
560                        .or_else(|| as_dict.get("tier"))
561                        .map(|value| value.display())
562                        .filter(|value| !value.is_empty());
563                }
564                if node.model_policy.temperature.is_none() {
565                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
566                        if let VmValue::Float(number) = value {
567                            Some(*number)
568                        } else {
569                            value.as_int().map(|number| number as f64)
570                        }
571                    });
572                }
573                if node.model_policy.max_tokens.is_none() {
574                    node.model_policy.max_tokens =
575                        as_dict.get("max_tokens").and_then(|value| value.as_int());
576                }
577                if node.mode.is_none() {
578                    node.mode = as_dict
579                        .get("mode")
580                        .map(|value| value.display())
581                        .filter(|value| !value.is_empty());
582                }
583                if node.done_sentinel.is_none() {
584                    node.done_sentinel = as_dict
585                        .get("done_sentinel")
586                        .map(|value| value.display())
587                        .filter(|value| !value.is_empty());
588                }
589                if key == "verify"
590                    && node.verify.is_none()
591                    && (raw_node.contains_key("assert_text")
592                        || raw_node.contains_key("command")
593                        || raw_node.contains_key("expect_status")
594                        || raw_node.contains_key("expect_text"))
595                {
596                    node.verify = Some(serde_json::json!({
597                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
598                        "command": raw_node.get("command").map(vm_value_to_json),
599                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
600                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
601                    }));
602                }
603                graph.nodes.insert(key.to_string(), node);
604            }
605        }
606        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
607            graph.entry = "act".to_string();
608        }
609        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
610            if graph.nodes.contains_key("verify") {
611                graph.edges.push(WorkflowEdge {
612                    from: "act".to_string(),
613                    to: "verify".to_string(),
614                    branch: None,
615                    label: None,
616                });
617            }
618            if graph.nodes.contains_key("repair") {
619                graph.edges.push(WorkflowEdge {
620                    from: "verify".to_string(),
621                    to: "repair".to_string(),
622                    branch: Some("failed".to_string()),
623                    label: None,
624                });
625                graph.edges.push(WorkflowEdge {
626                    from: "repair".to_string(),
627                    to: "verify".to_string(),
628                    branch: Some("retry".to_string()),
629                    label: None,
630                });
631            }
632        }
633    }
634
635    if graph.type_name.is_empty() {
636        graph.type_name = "workflow_graph".to_string();
637    }
638    if graph.id.is_empty() {
639        graph.id = new_id("workflow");
640    }
641    if graph.version == 0 {
642        graph.version = 1;
643    }
644    if graph.entry.is_empty() {
645        graph.entry = graph
646            .nodes
647            .keys()
648            .next()
649            .cloned()
650            .unwrap_or_else(|| "act".to_string());
651    }
652    for (node_id, node) in &mut graph.nodes {
653        if node.raw_tools.is_none() {
654            node.raw_tools = as_dict
655                .get("nodes")
656                .and_then(|nodes| nodes.as_dict())
657                .and_then(|nodes| nodes.get(node_id))
658                .and_then(|node_value| node_value.as_dict())
659                .and_then(|raw_node| raw_node.get("tools"))
660                .cloned();
661        }
662        if node.id.is_none() {
663            node.id = Some(node_id.clone());
664        }
665        if node.kind.is_empty() {
666            node.kind = "stage".to_string();
667        }
668        if node.join_policy.strategy.is_empty() {
669            node.join_policy.strategy = "all".to_string();
670        }
671        if node.reduce_policy.strategy.is_empty() {
672            node.reduce_policy.strategy = "concat".to_string();
673        }
674        if node.output_contract.output_kinds.is_empty() {
675            node.output_contract.output_kinds = vec![match node.kind.as_str() {
676                "verify" => "verification_result".to_string(),
677                "reduce" => node
678                    .reduce_policy
679                    .output_kind
680                    .clone()
681                    .unwrap_or_else(|| "summary".to_string()),
682                "map" => node
683                    .map_policy
684                    .output_kind
685                    .clone()
686                    .unwrap_or_else(|| "artifact".to_string()),
687                "escalation" => "plan".to_string(),
688                _ => "artifact".to_string(),
689            }];
690        }
691        if node.retry_policy.max_attempts == 0 {
692            node.retry_policy.max_attempts = 1;
693        }
694    }
695    Ok(graph)
696}
697
698pub fn validate_workflow(
699    graph: &WorkflowGraph,
700    ceiling: Option<&CapabilityPolicy>,
701) -> WorkflowValidationReport {
702    let mut errors = Vec::new();
703    let mut warnings = Vec::new();
704
705    if !graph.nodes.contains_key(&graph.entry) {
706        errors.push(format!("entry node does not exist: {}", graph.entry));
707    }
708
709    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
710    for edge in &graph.edges {
711        if !node_ids.contains(&edge.from) {
712            errors.push(format!("edge.from references unknown node: {}", edge.from));
713        }
714        if !node_ids.contains(&edge.to) {
715            errors.push(format!("edge.to references unknown node: {}", edge.to));
716        }
717    }
718
719    let reachable_nodes = reachable_nodes(graph);
720    for node_id in &node_ids {
721        if !reachable_nodes.contains(node_id) {
722            warnings.push(format!("node is unreachable: {node_id}"));
723        }
724    }
725
726    for (node_id, node) in &graph.nodes {
727        let incoming = graph
728            .edges
729            .iter()
730            .filter(|edge| edge.to == *node_id)
731            .count();
732        let outgoing: Vec<&WorkflowEdge> = graph
733            .edges
734            .iter()
735            .filter(|edge| edge.from == *node_id)
736            .collect();
737        if let Some(min_inputs) = node.input_contract.min_inputs {
738            if let Some(max_inputs) = node.input_contract.max_inputs {
739                if min_inputs > max_inputs {
740                    errors.push(format!(
741                        "node {node_id}: input contract min_inputs exceeds max_inputs"
742                    ));
743                }
744            }
745        }
746        match node.kind.as_str() {
747            "condition" => {
748                let has_true = outgoing
749                    .iter()
750                    .any(|edge| edge.branch.as_deref() == Some("true"));
751                let has_false = outgoing
752                    .iter()
753                    .any(|edge| edge.branch.as_deref() == Some("false"));
754                if !has_true || !has_false {
755                    errors.push(format!(
756                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
757                    ));
758                }
759            }
760            "fork" if outgoing.len() < 2 => {
761                errors.push(format!(
762                    "node {node_id}: fork nodes require at least two outgoing edges"
763                ));
764            }
765            "join" if incoming < 2 => {
766                warnings.push(format!(
767                    "node {node_id}: join node has fewer than two incoming edges"
768                ));
769            }
770            "map"
771                if node.map_policy.items.is_empty()
772                    && node.map_policy.item_artifact_kind.is_none()
773                    && node.input_contract.input_kinds.is_empty() =>
774            {
775                errors.push(format!(
776                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
777                ));
778            }
779            "reduce" if node.input_contract.input_kinds.is_empty() => {
780                warnings.push(format!(
781                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
782                ));
783            }
784            _ => {}
785        }
786    }
787
788    if let Some(ceiling) = ceiling {
789        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
790            errors.push(error);
791        }
792        for (node_id, node) in &graph.nodes {
793            if let Err(error) = ceiling.intersect(&node.capability_policy) {
794                errors.push(format!("node {node_id}: {error}"));
795            }
796        }
797    }
798
799    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
800        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
801        match diagnostic.severity {
802            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
803            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
804        }
805    }
806
807    WorkflowValidationReport {
808        valid: errors.is_empty(),
809        errors,
810        warnings,
811        reachable_nodes: reachable_nodes.into_iter().collect(),
812    }
813}
814
815fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
816    let mut seen = BTreeSet::new();
817    let mut stack = vec![graph.entry.clone()];
818    while let Some(node_id) = stack.pop() {
819        if !seen.insert(node_id.clone()) {
820            continue;
821        }
822        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
823            stack.push(edge.to.clone());
824        }
825    }
826    seen
827}
828
829/// Pick the session id a stage should run under. Prefers an explicit
830/// `session_id` on the node's `model_policy` dict (so pipelines with
831/// `agent_session_open` / `agent_session_fork` flowing through a graph
832/// line up); falls back to a stable, node-derived id so multi-stage
833/// graphs with no explicit session share a conversation across stages.
834fn resolve_node_session_id(node: &WorkflowNode) -> String {
835    if let Some(explicit) = node
836        .raw_model_policy
837        .as_ref()
838        .and_then(|v| v.as_dict())
839        .and_then(|d| d.get("session_id"))
840        .and_then(|v| match v {
841            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
842            _ => None,
843        })
844    {
845        return explicit;
846    }
847    if let Some(persisted) = node
848        .metadata
849        .get("worker_session_id")
850        .and_then(|value| value.as_str())
851        .filter(|value| !value.trim().is_empty())
852    {
853        return persisted.to_string();
854    }
855    format!("workflow_stage_{}", uuid::Uuid::now_v7())
856}
857
858fn raw_auto_compact_dict(
859    node: &WorkflowNode,
860) -> Option<&std::collections::BTreeMap<String, VmValue>> {
861    node.raw_auto_compact
862        .as_ref()
863        .and_then(|value| value.as_dict())
864}
865
866fn raw_auto_compact_int(node: &WorkflowNode, key: &str) -> Option<usize> {
867    raw_auto_compact_dict(node)
868        .and_then(|dict| dict.get(key))
869        .and_then(|value| value.as_int())
870        .filter(|value| *value >= 0)
871        .map(|value| value as usize)
872}
873
874fn raw_auto_compact_string(node: &WorkflowNode, key: &str) -> Option<String> {
875    raw_auto_compact_dict(node)
876        .and_then(|dict| dict.get(key))
877        .and_then(|value| match value {
878            VmValue::String(text) if !text.trim().is_empty() => Some(text.to_string()),
879            _ => None,
880        })
881}
882
883fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&BTreeMap<String, VmValue>> {
884    node.raw_model_policy
885        .as_ref()
886        .and_then(|value| value.as_dict())
887}
888
889fn insert_json_vm_option<T: Serialize>(
890    options: &mut BTreeMap<String, VmValue>,
891    key: &str,
892    value: &T,
893) -> Result<(), VmError> {
894    let json = serde_json::to_value(value).map_err(|error| {
895        VmError::Runtime(format!("workflow stage option encode error: {error}"))
896    })?;
897    options.insert(key.to_string(), crate::stdlib::json_to_vm_value(&json));
898    Ok(())
899}
900
901fn merge_raw_model_policy_options(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
902    if let Some(raw) = raw_model_policy_dict(node) {
903        for (key, value) in raw {
904            if !matches!(value, VmValue::Nil) {
905                options.insert(key.clone(), value.clone());
906            }
907        }
908    }
909}
910
911fn preserve_nested_command_policy(options: &mut BTreeMap<String, VmValue>, node: &WorkflowNode) {
912    if options.contains_key("command_policy") {
913        return;
914    }
915    let Some(command_policy) = raw_model_policy_dict(node)
916        .and_then(|dict| dict.get("policy"))
917        .and_then(|value| value.as_dict())
918        .and_then(|policy| policy.get("command_policy"))
919    else {
920        return;
921    };
922    options.insert("command_policy".to_string(), command_policy.clone());
923}
924
925fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
926    node.raw_tools.clone().or_else(|| {
927        if matches!(node.tools, serde_json::Value::Null) {
928            None
929        } else {
930            Some(crate::stdlib::json_to_vm_value(&node.tools))
931        }
932    })
933}
934
935fn add_stage_tools_option(
936    options: &mut BTreeMap<String, VmValue>,
937    tools_value: &Option<VmValue>,
938    tool_names: &[String],
939) {
940    if !tool_names.is_empty() {
941        if let Some(value) = tools_value.clone() {
942            options.insert("tools".to_string(), value);
943        }
944    }
945}
946
947fn workflow_stage_llm_options(
948    node: &WorkflowNode,
949    stage_session_id: &str,
950    tools_value: &Option<VmValue>,
951    tool_names: &[String],
952    stage_agent_options: &super::WorkflowStageAgentOptions,
953) -> BTreeMap<String, VmValue> {
954    let mut options = stage_agent_options.llm_options_vm_dict();
955    merge_raw_model_policy_options(&mut options, node);
956    options.insert(
957        "session_id".to_string(),
958        VmValue::String(Rc::from(stage_session_id.to_string())),
959    );
960    options.insert(
961        "tool_format".to_string(),
962        VmValue::String(Rc::from(stage_agent_options.tool_format.clone())),
963    );
964    add_stage_tools_option(&mut options, tools_value, tool_names);
965    options
966}
967
968fn add_workflow_agent_compaction_options(
969    options: &mut BTreeMap<String, VmValue>,
970    node: &WorkflowNode,
971) {
972    if !node.auto_compact.enabled {
973        options.insert("auto_compact".to_string(), VmValue::Bool(false));
974        return;
975    }
976    options.insert("auto_compact".to_string(), VmValue::Bool(true));
977    if let Some(value) = node.auto_compact.token_threshold {
978        options.insert("compact_threshold".to_string(), VmValue::Int(value as i64));
979    }
980    if let Some(value) = node.auto_compact.tool_output_max_chars {
981        options.insert(
982            "tool_output_max_chars".to_string(),
983            VmValue::Int(value as i64),
984        );
985    }
986    if let Some(value) = node.auto_compact.hard_limit_tokens {
987        options.insert("hard_limit_tokens".to_string(), VmValue::Int(value as i64));
988    }
989    if let Some(strategy) = node.auto_compact.compact_strategy.as_ref() {
990        options.insert(
991            "compact_strategy".to_string(),
992            VmValue::String(Rc::from(strategy.clone())),
993        );
994    }
995    if let Some(strategy) = node.auto_compact.hard_limit_strategy.as_ref() {
996        options.insert(
997            "hard_limit_strategy".to_string(),
998            VmValue::String(Rc::from(strategy.clone())),
999        );
1000    }
1001    if let Some(value) = raw_auto_compact_int(node, "compact_keep_last")
1002        .or_else(|| raw_auto_compact_int(node, "keep_last"))
1003    {
1004        options.insert("compact_keep_last".to_string(), VmValue::Int(value as i64));
1005    }
1006    if let Some(prompt) = raw_auto_compact_string(node, "summarize_prompt") {
1007        options.insert(
1008            "summarize_prompt".to_string(),
1009            VmValue::String(Rc::from(prompt)),
1010        );
1011    }
1012    if let Some(dict) = raw_auto_compact_dict(node) {
1013        for key in ["compress_callback", "mask_callback"] {
1014            if let Some(callback) = dict.get(key) {
1015                options.insert(key.to_string(), callback.clone());
1016            }
1017        }
1018        if let Some(callback) = dict.get("custom_compactor") {
1019            options.insert("compact_callback".to_string(), callback.clone());
1020        }
1021    }
1022}
1023
1024fn workflow_stage_agent_loop_options(
1025    node: &WorkflowNode,
1026    stage_session_id: &str,
1027    tools_value: &Option<VmValue>,
1028    tool_names: &[String],
1029    stage_agent_options: &super::WorkflowStageAgentOptions,
1030) -> Result<BTreeMap<String, VmValue>, VmError> {
1031    let mut options = stage_agent_options.agent_loop_options_vm_dict();
1032    merge_raw_model_policy_options(&mut options, node);
1033    if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1034        if !options.contains_key("skills") {
1035            if let Some(registry) = context.registry {
1036                options.insert("skills".to_string(), registry);
1037            }
1038        }
1039        if !options.contains_key("skill_match") {
1040            if let Some(match_config) = context.match_config {
1041                options.insert("skill_match".to_string(), match_config);
1042            }
1043        }
1044    }
1045    preserve_nested_command_policy(&mut options, node);
1046    add_workflow_agent_compaction_options(&mut options, node);
1047    add_stage_tools_option(&mut options, tools_value, tool_names);
1048    let tool_policy = tool_capability_policy_from_spec(&node.tools);
1049    let effective_policy = tool_policy
1050        .intersect(&node.capability_policy)
1051        .map_err(VmError::Runtime)?;
1052    insert_json_vm_option(&mut options, "policy", &effective_policy)?;
1053    insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy)?;
1054    options.insert(
1055        "session_id".to_string(),
1056        VmValue::String(Rc::from(stage_session_id.to_string())),
1057    );
1058    options.insert(
1059        "tool_format".to_string(),
1060        VmValue::String(Rc::from(stage_agent_options.tool_format.clone())),
1061    );
1062    Ok(options)
1063}
1064
1065#[derive(Clone, Debug)]
1066pub struct PreparedWorkflowStageNode {
1067    pub prompt: String,
1068    pub system: Option<String>,
1069    pub run_agent_loop: bool,
1070    pub llm_options: BTreeMap<String, VmValue>,
1071    pub agent_loop_options: BTreeMap<String, VmValue>,
1072    pub result: Option<serde_json::Value>,
1073    pub selected: Vec<ArtifactRecord>,
1074    pub rendered_context: String,
1075    pub rendered_verification: String,
1076    pub verification_contracts: Vec<VerificationContract>,
1077    pub tool_format: String,
1078    pub stage_session_id: String,
1079}
1080
1081pub async fn prepare_stage_node(
1082    node_id: &str,
1083    node: &WorkflowNode,
1084    task: &str,
1085    artifacts: &[ArtifactRecord],
1086) -> Result<PreparedWorkflowStageNode, VmError> {
1087    let selected_stage = super::select_workflow_stage_artifacts(
1088        artifacts,
1089        &node.context_policy,
1090        &node.input_contract,
1091    )
1092    .await?;
1093    let selected = selected_stage.artifacts;
1094    let context_policy = selected_stage.context_policy;
1095    let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1096        let assembled =
1097            crate::stdlib::assemble::assemble_from_options(&selected, assembler).await?;
1098        Some(super::render_assembled_chunks(&assembled))
1099    } else {
1100        None
1101    };
1102    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1103    let stage_session_id = resolve_node_session_id(node);
1104    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1105        return Err(VmError::Runtime(format!(
1106            "workflow stage {node_id} requires an existing session \
1107             (call agent_session_open and feed session_id through model_policy \
1108             before entering this stage)"
1109        )));
1110    }
1111    if let Some(min_inputs) = node.input_contract.min_inputs {
1112        if selected.len() < min_inputs {
1113            return Err(VmError::Runtime(format!(
1114                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1115            )));
1116        }
1117    }
1118    if let Some(max_inputs) = node.input_contract.max_inputs {
1119        if selected.len() > max_inputs {
1120            return Err(VmError::Runtime(format!(
1121                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1122            )));
1123        }
1124    }
1125    let prepared_prompt = super::prepare_workflow_stage_prompt(
1126        task,
1127        node.task_label.as_deref(),
1128        &selected,
1129        &context_policy,
1130        rendered_context_override.as_deref(),
1131        &verification_contracts,
1132    )
1133    .await?;
1134    let prompt = prepared_prompt.prompt;
1135    let rendered_context = prepared_prompt.rendered_context;
1136    let rendered_verification = prepared_prompt.rendered_verification;
1137
1138    let tool_names = tool_names_from_spec(&node.tools);
1139    let stage_agent_options = super::prepare_workflow_stage_agent_options(
1140        node,
1141        &stage_session_id,
1142        !tool_names.is_empty(),
1143    )
1144    .await?;
1145    let tool_format = stage_agent_options.tool_format.clone();
1146    let result = if node.kind == "verify" {
1147        if let Some(command) = node
1148            .verify
1149            .as_ref()
1150            .and_then(|verify| verify.as_object())
1151            .and_then(|verify| verify.get("command"))
1152            .and_then(|value| value.as_str())
1153            .map(str::trim)
1154            .filter(|value| !value.is_empty())
1155        {
1156            let (program, args) = if cfg!(target_os = "windows") {
1157                ("cmd", vec!["/C".to_string(), command.to_string()])
1158            } else {
1159                ("/bin/sh", vec!["-lc".to_string(), command.to_string()])
1160            };
1161            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1162                stdin_null: true,
1163                ..Default::default()
1164            };
1165            if let Some(context) = crate::stdlib::process::current_execution_context() {
1166                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1167                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1168                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
1169                }
1170                if !context.env.is_empty() {
1171                    process_config.env.extend(context.env);
1172                }
1173            }
1174            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1175            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1176            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1177            let combined = if stderr.is_empty() {
1178                stdout.clone()
1179            } else if stdout.is_empty() {
1180                stderr.clone()
1181            } else {
1182                format!("{stdout}\n{stderr}")
1183            };
1184            serde_json::json!({
1185                "status": "completed",
1186                "text": combined,
1187                "visible_text": combined,
1188                "command": command,
1189                "stdout": stdout,
1190                "stderr": stderr,
1191                "exit_status": output.status.code().unwrap_or(-1),
1192                "success": output.status.success(),
1193            })
1194        } else {
1195            serde_json::json!({
1196                "status": "completed",
1197                "text": "",
1198                "visible_text": "",
1199            })
1200        }
1201    } else {
1202        let tools_value = stage_tools_value(node);
1203        let llm_options = workflow_stage_llm_options(
1204            node,
1205            &stage_session_id,
1206            &tools_value,
1207            &tool_names,
1208            &stage_agent_options,
1209        );
1210        let agent_loop_options = if stage_agent_options.run_agent_loop {
1211            workflow_stage_agent_loop_options(
1212                node,
1213                &stage_session_id,
1214                &tools_value,
1215                &tool_names,
1216                &stage_agent_options,
1217            )?
1218        } else {
1219            BTreeMap::new()
1220        };
1221        return Ok(PreparedWorkflowStageNode {
1222            prompt,
1223            system: node.system.clone(),
1224            run_agent_loop: stage_agent_options.run_agent_loop,
1225            llm_options,
1226            agent_loop_options,
1227            result: None,
1228            selected,
1229            rendered_context,
1230            rendered_verification,
1231            verification_contracts,
1232            tool_format,
1233            stage_session_id,
1234        });
1235    };
1236
1237    Ok(PreparedWorkflowStageNode {
1238        prompt,
1239        system: node.system.clone(),
1240        run_agent_loop: false,
1241        llm_options: BTreeMap::new(),
1242        agent_loop_options: BTreeMap::new(),
1243        result: Some(result),
1244        selected,
1245        rendered_context,
1246        rendered_verification,
1247        verification_contracts,
1248        tool_format,
1249        stage_session_id,
1250    })
1251}
1252
1253pub fn complete_prepared_stage_node(
1254    node_id: &str,
1255    node: &WorkflowNode,
1256    prepared: &PreparedWorkflowStageNode,
1257    mut llm_result: serde_json::Value,
1258) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1259    if let Some(payload) = llm_result.as_object_mut() {
1260        payload.insert(
1261            "prompt".to_string(),
1262            serde_json::json!(prepared.prompt.clone()),
1263        );
1264        payload.insert(
1265            "system_prompt".to_string(),
1266            serde_json::json!(node.system.clone().unwrap_or_default()),
1267        );
1268        payload.insert(
1269            "rendered_context".to_string(),
1270            serde_json::json!(prepared.rendered_context.clone()),
1271        );
1272        if !prepared.verification_contracts.is_empty() {
1273            payload.insert(
1274                "verification_contracts".to_string(),
1275                serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1276            );
1277            payload.insert(
1278                "rendered_verification_context".to_string(),
1279                serde_json::json!(prepared.rendered_verification.clone()),
1280            );
1281        }
1282        payload.insert(
1283            "selected_artifact_ids".to_string(),
1284            serde_json::json!(prepared
1285                .selected
1286                .iter()
1287                .map(|artifact| artifact.id.clone())
1288                .collect::<Vec<_>>()),
1289        );
1290        payload.insert(
1291            "selected_artifact_titles".to_string(),
1292            serde_json::json!(prepared
1293                .selected
1294                .iter()
1295                .map(|artifact| artifact.title.clone())
1296                .collect::<Vec<_>>()),
1297        );
1298        match payload
1299            .entry("tools".to_string())
1300            .or_insert_with(|| serde_json::json!({}))
1301        {
1302            serde_json::Value::Object(tools) => {
1303                tools.insert(
1304                    "mode".to_string(),
1305                    serde_json::json!(prepared.tool_format.clone()),
1306                );
1307            }
1308            slot => {
1309                *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1310            }
1311        }
1312    }
1313
1314    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1315    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1316    // a "transcript" field; fall back to the input so cross-stage conversation
1317    // state survives transitions.
1318    let result_transcript = llm_result
1319        .get("transcript")
1320        .cloned()
1321        .map(|value| crate::stdlib::json_to_vm_value(&value));
1322    let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1323    let transcript = result_transcript
1324        .or(session_transcript)
1325        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1326    let output_kind = node
1327        .output_contract
1328        .output_kinds
1329        .first()
1330        .cloned()
1331        .unwrap_or_else(|| {
1332            if node.kind == "verify" {
1333                "verification_result".to_string()
1334            } else {
1335                "artifact".to_string()
1336            }
1337        });
1338    let mut metadata = BTreeMap::new();
1339    metadata.insert(
1340        "input_artifact_ids".to_string(),
1341        serde_json::json!(prepared
1342            .selected
1343            .iter()
1344            .map(|artifact| artifact.id.clone())
1345            .collect::<Vec<_>>()),
1346    );
1347    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1348    if !node.approval_policy.write_path_allowlist.is_empty() {
1349        metadata.insert(
1350            "changed_paths".to_string(),
1351            serde_json::json!(node.approval_policy.write_path_allowlist),
1352        );
1353    }
1354    let artifact = ArtifactRecord {
1355        type_name: "artifact".to_string(),
1356        id: new_id("artifact"),
1357        kind: output_kind,
1358        title: Some(format!("stage {node_id} output")),
1359        text: Some(visible_text),
1360        data: Some(llm_result.clone()),
1361        source: Some(node_id.to_string()),
1362        created_at: now_rfc3339(),
1363        freshness: Some("fresh".to_string()),
1364        priority: None,
1365        lineage: prepared
1366            .selected
1367            .iter()
1368            .map(|artifact| artifact.id.clone())
1369            .collect(),
1370        relevance: Some(1.0),
1371        estimated_tokens: None,
1372        stage: Some(node_id.to_string()),
1373        metadata,
1374    }
1375    .normalize();
1376
1377    Ok((llm_result, vec![artifact], transcript))
1378}
1379
1380pub async fn execute_stage_node(
1381    node_id: &str,
1382    node: &WorkflowNode,
1383    task: &str,
1384    artifacts: &[ArtifactRecord],
1385) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1386    let prepared = prepare_stage_node(node_id, node, task, artifacts).await?;
1387    let llm_result = if let Some(result) = prepared.result.clone() {
1388        result
1389    } else if prepared.run_agent_loop {
1390        let result = crate::stdlib::harn_entry::call_agent_loop(
1391            prepared.prompt.clone(),
1392            prepared.system.clone(),
1393            prepared.agent_loop_options.clone(),
1394        )
1395        .await?;
1396        crate::llm::vm_value_to_json(&result)
1397    } else {
1398        let args = vec![
1399            VmValue::String(Rc::from(prepared.prompt.clone())),
1400            prepared
1401                .system
1402                .clone()
1403                .map(|s| VmValue::String(Rc::from(s)))
1404                .unwrap_or(VmValue::Nil),
1405            VmValue::Dict(Rc::new(prepared.llm_options.clone())),
1406        ];
1407        let opts = extract_llm_options(&args)?;
1408        let result = vm_call_llm_full(&opts).await?;
1409        crate::llm::agent_loop_result_from_llm(&result, opts)
1410    };
1411    complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1412}
1413
1414pub fn append_audit_entry(
1415    graph: &mut WorkflowGraph,
1416    op: &str,
1417    node_id: Option<String>,
1418    reason: Option<String>,
1419    metadata: BTreeMap<String, serde_json::Value>,
1420) {
1421    graph.audit_log.push(WorkflowAuditEntry {
1422        id: new_id("audit"),
1423        op: op.to_string(),
1424        node_id,
1425        timestamp: now_rfc3339(),
1426        reason,
1427        metadata,
1428    });
1429}