Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use crate::value::VmDictExt;
4use std::collections::{BTreeMap, BTreeSet};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    new_id, now_rfc3339, redact_transcript_visibility, ArtifactRecord, AutoCompactPolicy,
10    BranchSemantics, CapabilityPolicy, ContextPolicy, EqIgnored, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract,
12};
13use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
14use crate::tool_surface::{tool_capability_policy_from_spec, tool_names_from_spec};
15use crate::value::{VmError, VmValue};
16
/// Key in `WorkflowNode::metadata` under which the serialized list of
/// verification contracts is stored. Written by
/// `inject_workflow_verification_contracts` and read back by
/// `stage_verification_contracts`.
pub const WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY: &str = "workflow_verification_contracts";
/// Key in `WorkflowNode::metadata` that selects the verification scope. The
/// string value `"local_only"` makes `stage_verification_contracts` return
/// only the node's own contract.
pub const WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY: &str = "workflow_verification_scope";
19
/// A single node of a [`WorkflowGraph`].
///
/// The typed fields round-trip through serde (every one is optional on input
/// because of the struct-level `#[serde(default)]`). The `raw_*` fields are
/// `#[serde(skip)]` and hold the original VM values so that closures, which
/// serde cannot represent, are not lost.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node identifier. `normalize_workflow_value` fills this from the
    /// graph's node-map key when it is absent.
    pub id: Option<String>,
    /// Node kind. Normalization replaces an empty kind with `"stage"`;
    /// `validate_workflow` applies extra rules to `"condition"`, `"fork"`,
    /// `"join"`, `"map"` and `"reduce"` nodes.
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// Tool specification as plain JSON; the original VM value is kept in
    /// `raw_tools`.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    /// Per-stage auto-compaction settings for the agent loop's context
    /// window. Lifecycle operations (reset, fork, trim, compact) are NOT
    /// expressible here — call the `agent_session_*` builtins before the
    /// stage or in a prior stage.
    pub auto_compact: AutoCompactPolicy,
    /// Output visibility filter applied to the transcript after the
    /// stage's agent loop exits. `"public"` / `"public_only"` drops
    /// `tool_result` messages and non-public events. `None` or any
    /// unknown string is a no-op.
    #[serde(default)]
    pub output_visibility: Option<String>,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification settings as plain JSON. When this is a JSON object,
    /// `verification_contract_from_verify` derives a contract from it.
    pub verify: Option<serde_json::Value>,
    /// When true, the stage's agent loop gates the done sentinel on the most
    /// recent `run()` tool call exiting cleanly (`exit_code == 0`). Use for
    /// persistent execute stages that fold verification into the loop via a
    /// shell-exec tool the model invokes explicitly.
    #[serde(default)]
    pub exit_when_verified: bool,
    /// Free-form metadata. Also carries the verification entries keyed by
    /// `WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY` and
    /// `WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY`.
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Raw `tools` VmValue taken from the node's source dict.
    #[serde(skip)]
    pub raw_tools: Option<VmValue>,
    /// Raw auto_compact VmValue dict — preserved for extracting closure
    /// fields (compress_callback, mask_callback, custom_compactor) that
    /// can't go through serde.
    #[serde(skip)]
    pub raw_auto_compact: Option<VmValue>,
    /// Raw model_policy VmValue dict — preserved for extracting closure
    /// fields (post_turn_callback) that can't go through serde.
    #[serde(skip)]
    pub raw_model_policy: Option<VmValue>,
    /// Raw context_assembler VmValue dict — when set, the stage's
    /// artifact context is packed through `assemble_context` before
    /// rendering the system prompt. Closure fields (`ranker_callback`)
    /// are preserved here because they can't round-trip through serde.
    #[serde(skip)]
    pub raw_context_assembler: Option<VmValue>,
    /// Raw `verify` VmValue — preserved so a *callable* verifier (fn-verify
    /// mode) survives the builtin seam. The typed `verify` field above is a
    /// `serde_json::Value`, which drops closures; when `verify` is a Harn
    /// function the live closure is lifted here and re-attached by
    /// `node_to_vm_with_raw` (stage.rs) so the embedded stage loop can invoke
    /// it against each attempt's result (`workflow_evaluate_verification`).
    #[serde(skip)]
    pub raw_verify: Option<VmValue>,
}
88
89impl PartialEq for WorkflowNode {
90    fn eq(&self, other: &Self) -> bool {
91        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
92    }
93}
94
/// One normalized check inside a [`VerificationContract`].
///
/// Entries built by `push_unique_requirement` always have a trimmed,
/// non-empty `kind` and `value`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationRequirement {
    /// Check category. This file emits `"visible_text_contains"`,
    /// `"combined_output_contains"`, `"expect_status"`, `"identifier"`,
    /// `"path"` and `"text"`; user-supplied `checks` may carry other kinds.
    pub kind: String,
    /// The value the check is about (text, path, identifier, status, ...).
    pub value: String,
    /// Optional human-readable explanation; blank notes are stored as `None`.
    pub note: Option<String>,
}
102
/// Verification expectations collected from a node's `verify` settings.
///
/// Built by `verification_contract_from_verify`, which merges an optional
/// contract file, an optional inline `contract` object, and the `verify`
/// object itself.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct VerificationContract {
    /// Id of the node the contract was derived from. Not considered by
    /// `is_empty`.
    pub source_node: Option<String>,
    pub summary: Option<String>,
    /// Verify command, stored trimmed.
    pub command: Option<String>,
    /// Expected exit status of the verify command.
    pub expect_status: Option<i64>,
    /// Text the stage's visible output must contain.
    pub assert_text: Option<String>,
    /// Text the verify command's combined stdout/stderr must contain.
    pub expect_text: Option<String>,
    /// De-duplicated identifiers (input keys `required_identifiers` or
    /// `identifiers`).
    pub required_identifiers: Vec<String>,
    /// De-duplicated paths (input keys `required_paths` or `paths`).
    pub required_paths: Vec<String>,
    /// De-duplicated text snippets (input keys `required_text`, `exact_text`
    /// or `required_strings`).
    pub required_text: Vec<String>,
    pub notes: Vec<String>,
    /// Explicit `checks` entries plus the checks derived from the fields
    /// above.
    pub checks: Vec<VerificationRequirement>,
}
118
119impl VerificationContract {
120    fn is_empty(&self) -> bool {
121        self.summary.is_none()
122            && self.command.is_none()
123            && self.expect_status.is_none()
124            && self.assert_text.is_none()
125            && self.expect_text.is_none()
126            && self.required_identifiers.is_empty()
127            && self.required_paths.is_empty()
128            && self.required_text.is_empty()
129            && self.notes.is_empty()
130            && self.checks.is_empty()
131    }
132}
133
/// Appends the trimmed form of `value` to `values` unless it is blank or an
/// identical entry is already present. Insertion order is preserved.
fn push_unique_string(values: &mut Vec<String>, value: &str) {
    let candidate = value.trim();
    let already_present = values.iter().any(|entry| entry.as_str() == candidate);
    if candidate.is_empty() || already_present {
        return;
    }
    values.push(candidate.to_owned());
}
143
144fn push_unique_requirement(
145    values: &mut Vec<VerificationRequirement>,
146    kind: &str,
147    value: &str,
148    note: Option<&str>,
149) {
150    let trimmed_kind = kind.trim();
151    let trimmed_value = value.trim();
152    let trimmed_note = note
153        .map(str::trim)
154        .filter(|candidate| !candidate.is_empty())
155        .map(|candidate| candidate.to_string());
156    if trimmed_kind.is_empty() || trimmed_value.is_empty() {
157        return;
158    }
159    let candidate = VerificationRequirement {
160        kind: trimmed_kind.to_string(),
161        value: trimmed_value.to_string(),
162        note: trimmed_note,
163    };
164    if !values.iter().any(|existing| existing == &candidate) {
165        values.push(candidate);
166    }
167}
168
169fn json_string_list(value: Option<&serde_json::Value>) -> Vec<String> {
170    match value {
171        Some(serde_json::Value::String(text)) => {
172            let mut values = Vec::new();
173            push_unique_string(&mut values, text);
174            values
175        }
176        Some(serde_json::Value::Array(items)) => {
177            let mut values = Vec::new();
178            for item in items {
179                if let Some(text) = item.as_str() {
180                    push_unique_string(&mut values, text);
181                }
182            }
183            values
184        }
185        _ => Vec::new(),
186    }
187}
188
189fn merge_verification_requirement_list(
190    target: &mut Vec<VerificationRequirement>,
191    value: Option<&serde_json::Value>,
192) {
193    let Some(items) = value.and_then(|raw| raw.as_array()) else {
194        return;
195    };
196    for item in items {
197        let Some(object) = item.as_object() else {
198            continue;
199        };
200        let kind = object
201            .get("kind")
202            .and_then(|value| value.as_str())
203            .unwrap_or_default();
204        let value = object
205            .get("value")
206            .and_then(|value| value.as_str())
207            .unwrap_or_default();
208        let note = object
209            .get("note")
210            .or_else(|| object.get("description"))
211            .or_else(|| object.get("reason"))
212            .and_then(|value| value.as_str());
213        push_unique_requirement(target, kind, value, note);
214    }
215}
216
217fn merge_verification_contract_fields(
218    target: &mut VerificationContract,
219    object: &serde_json::Map<String, serde_json::Value>,
220) {
221    if target.summary.is_none() {
222        target.summary = object
223            .get("summary")
224            .and_then(|value| value.as_str())
225            .map(str::trim)
226            .filter(|value| !value.is_empty())
227            .map(|value| value.to_string());
228    }
229    if target.command.is_none() {
230        target.command = object
231            .get("command")
232            .and_then(|value| value.as_str())
233            .map(str::trim)
234            .filter(|value| !value.is_empty())
235            .map(|value| value.to_string());
236    }
237    if target.expect_status.is_none() {
238        target.expect_status = object.get("expect_status").and_then(|value| value.as_i64());
239    }
240    if target.assert_text.is_none() {
241        target.assert_text = object
242            .get("assert_text")
243            .and_then(|value| value.as_str())
244            .map(str::trim)
245            .filter(|value| !value.is_empty())
246            .map(|value| value.to_string());
247    }
248    if target.expect_text.is_none() {
249        target.expect_text = object
250            .get("expect_text")
251            .and_then(|value| value.as_str())
252            .map(str::trim)
253            .filter(|value| !value.is_empty())
254            .map(|value| value.to_string());
255    }
256
257    for value in json_string_list(
258        object
259            .get("required_identifiers")
260            .or_else(|| object.get("identifiers")),
261    ) {
262        push_unique_string(&mut target.required_identifiers, &value);
263    }
264    for value in json_string_list(object.get("required_paths").or_else(|| object.get("paths"))) {
265        push_unique_string(&mut target.required_paths, &value);
266    }
267    for value in json_string_list(
268        object
269            .get("required_text")
270            .or_else(|| object.get("exact_text"))
271            .or_else(|| object.get("required_strings")),
272    ) {
273        push_unique_string(&mut target.required_text, &value);
274    }
275    for value in json_string_list(object.get("notes")) {
276        push_unique_string(&mut target.notes, &value);
277    }
278    merge_verification_requirement_list(&mut target.checks, object.get("checks"));
279}
280
281fn load_verification_contract_file(path: &str) -> Result<serde_json::Value, VmError> {
282    let resolved = crate::stdlib::process::resolve_source_asset_path(path);
283    let contents = std::fs::read_to_string(&resolved).map_err(|error| {
284        VmError::Runtime(format!(
285            "workflow verification contract read failed for {}: {error}",
286            resolved.display()
287        ))
288    })?;
289    serde_json::from_str(&contents).map_err(|error| {
290        VmError::Runtime(format!(
291            "workflow verification contract parse failed for {}: {error}",
292            resolved.display()
293        ))
294    })
295}
296
297fn resolve_verification_contract_path(
298    verify: &serde_json::Map<String, serde_json::Value>,
299) -> Result<Option<serde_json::Value>, VmError> {
300    let Some(path) = verify
301        .get("contract_path")
302        .or_else(|| verify.get("verification_contract_path"))
303        .and_then(|value| value.as_str())
304        .map(str::trim)
305        .filter(|value| !value.is_empty())
306    else {
307        return Ok(None);
308    };
309    Ok(Some(load_verification_contract_file(path)?))
310}
311
312pub fn verification_contract_from_verify(
313    node_id: &str,
314    verify: Option<&serde_json::Value>,
315) -> Result<Option<VerificationContract>, VmError> {
316    let Some(verify_object) = verify.and_then(|value| value.as_object()) else {
317        return Ok(None);
318    };
319
320    let mut contract = VerificationContract {
321        source_node: Some(node_id.to_string()),
322        ..Default::default()
323    };
324
325    if let Some(file_contract) = resolve_verification_contract_path(verify_object)? {
326        let Some(object) = file_contract.as_object() else {
327            return Err(VmError::Runtime(
328                "workflow verification contract file must parse to a JSON object".to_string(),
329            ));
330        };
331        merge_verification_contract_fields(&mut contract, object);
332    }
333
334    if let Some(inline_contract) = verify_object.get("contract") {
335        let Some(object) = inline_contract.as_object() else {
336            return Err(VmError::Runtime(
337                "workflow verify.contract must be an object".to_string(),
338            ));
339        };
340        merge_verification_contract_fields(&mut contract, object);
341    }
342
343    merge_verification_contract_fields(&mut contract, verify_object);
344
345    if let Some(assert_text) = contract.assert_text.clone() {
346        push_unique_requirement(
347            &mut contract.checks,
348            "visible_text_contains",
349            &assert_text,
350            Some("verify stage requires visible output to contain this text"),
351        );
352    }
353    if let Some(expect_text) = contract.expect_text.clone() {
354        push_unique_requirement(
355            &mut contract.checks,
356            "combined_output_contains",
357            &expect_text,
358            Some("verify command requires combined stdout/stderr to contain this text"),
359        );
360    }
361    if let Some(expect_status) = contract.expect_status {
362        push_unique_requirement(
363            &mut contract.checks,
364            "expect_status",
365            &expect_status.to_string(),
366            Some("verify command exit status must match exactly"),
367        );
368    }
369    for identifier in contract.required_identifiers.clone() {
370        push_unique_requirement(
371            &mut contract.checks,
372            "identifier",
373            &identifier,
374            Some("use this exact identifier spelling"),
375        );
376    }
377    for path in contract.required_paths.clone() {
378        push_unique_requirement(
379            &mut contract.checks,
380            "path",
381            &path,
382            Some("preserve this exact path"),
383        );
384    }
385    for text in contract.required_text.clone() {
386        push_unique_requirement(
387            &mut contract.checks,
388            "text",
389            &text,
390            Some("required exact text or wiring snippet"),
391        );
392    }
393
394    if contract.is_empty() {
395        return Ok(None);
396    }
397    Ok(Some(contract))
398}
399
400fn push_unique_contract(values: &mut Vec<VerificationContract>, candidate: VerificationContract) {
401    if !values.iter().any(|existing| existing == &candidate) {
402        values.push(candidate);
403    }
404}
405
406pub fn workflow_verification_contracts(
407    graph: &WorkflowGraph,
408) -> Result<Vec<VerificationContract>, VmError> {
409    let mut contracts = Vec::new();
410    for (node_id, node) in &graph.nodes {
411        if let Some(contract) = verification_contract_from_verify(node_id, node.verify.as_ref())? {
412            push_unique_contract(&mut contracts, contract);
413        }
414    }
415    Ok(contracts)
416}
417
418pub fn inject_workflow_verification_contracts(
419    node: &mut WorkflowNode,
420    contracts: &[VerificationContract],
421) {
422    if contracts.is_empty() {
423        return;
424    }
425    node.metadata.insert(
426        WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY.to_string(),
427        serde_json::to_value(contracts).unwrap_or_default(),
428    );
429}
430
431pub fn stage_verification_contracts(
432    node_id: &str,
433    node: &WorkflowNode,
434) -> Result<Vec<VerificationContract>, VmError> {
435    let local_contract = verification_contract_from_verify(node_id, node.verify.as_ref())?;
436    let local_only = matches!(
437        node.metadata
438            .get(WORKFLOW_VERIFICATION_SCOPE_METADATA_KEY)
439            .and_then(|value| value.as_str()),
440        Some("local_only")
441    );
442    if local_only {
443        return Ok(local_contract.into_iter().collect());
444    }
445
446    let mut contracts = node
447        .metadata
448        .get(WORKFLOW_VERIFICATION_CONTRACTS_METADATA_KEY)
449        .cloned()
450        .map(|value| {
451            serde_json::from_value::<Vec<VerificationContract>>(value).map_err(|error| {
452                VmError::Runtime(format!(
453                    "workflow stage {node_id} verification contract metadata parse failed: {error}"
454                ))
455            })
456        })
457        .transpose()?
458        .unwrap_or_default();
459
460    if let Some(local_contract) = local_contract {
461        push_unique_contract(&mut contracts, local_contract);
462    }
463    Ok(contracts)
464}
465
/// A directed edge between two nodes of a [`WorkflowGraph`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Id of the source node; validation reports an error when it is not a
    /// key of `WorkflowGraph::nodes`.
    pub from: String,
    /// Id of the target node; validated the same way as `from`.
    pub to: String,
    /// Optional branch label. Validation requires `"true"` and `"false"`
    /// branches on `condition` nodes; the synthesized act/verify/repair graph
    /// uses `"failed"` and `"retry"`.
    pub branch: Option<String>,
    pub label: Option<String>,
}
474
/// A workflow: a keyed set of nodes plus the edges connecting them.
///
/// `normalize_workflow_value` fills in the defaults mentioned on the fields
/// below.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Type tag, serialized as `_type`; defaults to `"workflow_graph"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Graph id; generated with `new_id("workflow")` when empty.
    pub id: String,
    pub name: Option<String>,
    /// Version number; `0` is normalized to `1`.
    pub version: usize,
    /// Id of the entry node; validation reports an error when it is not a key
    /// of `nodes`.
    pub entry: String,
    /// Nodes keyed by id.
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-level capability policy; intersected with the ceiling during
    /// validation when a ceiling is supplied.
    pub capability_policy: CapabilityPolicy,
    pub approval_policy: super::ToolApprovalPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
491
/// One entry of `WorkflowGraph::audit_log`.
///
/// NOTE(review): nothing in the visible part of this file constructs these
/// entries, so the field notes below are assumptions to confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Presumably the name of the operation that was applied — confirm.
    pub op: String,
    /// Presumably the node the operation targeted, if any — confirm.
    pub node_id: Option<String>,
    /// Presumably an RFC 3339 timestamp (the module imports `now_rfc3339`)
    /// — confirm.
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
502
/// Result of `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// `true` exactly when `errors` is empty; warnings do not affect it.
    pub valid: bool,
    /// Problems that make the graph invalid (unknown entry or edge endpoints,
    /// contract and kind-specific violations, capability and tool-surface
    /// errors).
    pub errors: Vec<String>,
    /// Non-fatal findings such as unreachable nodes.
    pub warnings: Vec<String>,
    /// Ids of the nodes returned by `reachable_nodes(graph)`.
    pub reachable_nodes: Vec<String>,
}
511
512/// Extract the raw `retry_policy.repair_prompt_builder` closure from a node's
513/// source dict. The builder carries a Harn closure that cannot round-trip
514/// through serde, so — like `raw_model_policy`'s `post_turn_callback` — it is
515/// lifted onto the typed `RetryPolicy` here and travels to the embedded stage
516/// loop (`std/workflow/stage.harn`) as a raw value.
517fn retry_repair_prompt_builder_from_dict(
518    dict: Option<&crate::value::DictMap>,
519) -> Option<EqIgnored<VmValue>> {
520    dict.and_then(|d| d.get("retry_policy"))
521        .and_then(|policy| policy.as_dict())
522        .and_then(|policy| policy.get("repair_prompt_builder"))
523        .filter(|value| !matches!(value, VmValue::Nil))
524        .cloned()
525        .map(EqIgnored)
526}
527
528pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
529    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
530    let dict = value.as_dict();
531    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
532    node.raw_auto_compact = dict.and_then(|d| d.get("auto_compact")).cloned();
533    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
534    node.raw_context_assembler = dict.and_then(|d| d.get("context_assembler")).cloned();
535    // Lift a callable verifier (fn-verify mode) so the closure survives; a
536    // plain dict/command verify is left to the typed `verify` field.
537    node.raw_verify = dict
538        .and_then(|d| d.get("verify"))
539        .filter(|value| {
540            matches!(
541                value,
542                VmValue::Closure(_) | VmValue::BuiltinRef(_) | VmValue::BuiltinRefId(_)
543            )
544        })
545        .cloned();
546    node.retry_policy.repair_prompt_builder = retry_repair_prompt_builder_from_dict(dict);
547    Ok(node)
548}
549
/// Parses a JSON value into a [`WorkflowNode`] by delegating to
/// `super::parse_json_payload`, passing `label` through.
///
/// The `#[serde(skip)]` `raw_*` fields are left at their defaults (`None`);
/// use `parse_workflow_node_value` when the source is a VM value whose
/// closures must be kept.
pub fn parse_workflow_node_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowNode, VmError> {
    super::parse_json_payload(json, label)
}
556
/// Parses a JSON value into a [`WorkflowEdge`] by delegating to
/// `super::parse_json_payload`, passing `label` through.
pub fn parse_workflow_edge_json(
    json: serde_json::Value,
    label: &str,
) -> Result<WorkflowEdge, VmError> {
    super::parse_json_payload(json, label)
}
563
/// Parses a VM value into a [`WorkflowGraph`] and fills in defaults.
///
/// Steps, in order:
/// 1. Deserialize the typed graph via `super::parse_json_value`.
/// 2. If the graph has no `nodes`, synthesize them from the top-level `act`,
///    `verify` and `repair` entries, inheriting unset model/mode settings
///    from the top-level dict and wiring default edges.
/// 3. Default `_type`, `id`, `version` and `entry`.
/// 4. For every node: re-lift raw values lost to serde, then default `id`,
///    `kind`, join/reduce strategies, output kinds and `max_attempts`.
///
/// # Errors
///
/// Propagates errors from `super::parse_json_value` and
/// `parse_workflow_node_value`.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
    // An input that is not a dict is treated as an empty dict.
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: no explicit `nodes`, only top-level stage entries.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Settings the node leaves unset are inherited from the
                // top-level dict.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept either a float or an integer temperature.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // A `verify` entry may put its verification keys directly on
                // the node instead of under a nested `verify` object; gather
                // them into one (absent keys become JSON null).
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify, verify -(failed)-> repair,
        // repair -(retry)-> verify.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            // NOTE(review): the repair edges are added even when there is no
            // `verify` node, in which case they reference an unknown node and
            // `validate_workflow` reports errors — confirm this is intended.
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node key in map order, or "act" when the
        // graph has no nodes at all.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    for (node_id, node) in &mut graph.nodes {
        // Source dict of this node under the top-level `nodes` entry; `None`
        // for nodes synthesized from the shorthand form above.
        let raw_node = as_dict
            .get("nodes")
            .and_then(|nodes| nodes.as_dict())
            .and_then(|nodes| nodes.get(node_id.as_str()))
            .and_then(|node_value| node_value.as_dict());
        // NOTE(review): unlike `parse_workflow_node_value`, this loop does not
        // re-lift `raw_auto_compact`, `raw_model_policy` or
        // `raw_context_assembler` for nodes declared under `nodes` — confirm
        // whether closures in those dicts are expected to survive here.
        if node.raw_tools.is_none() {
            node.raw_tools = raw_node.and_then(|raw_node| raw_node.get("tools")).cloned();
        }
        if node.raw_verify.is_none() {
            // fn-verify: lift a callable verifier off the source dict so the
            // closure survives the graph round-trip (serde drops it).
            node.raw_verify = raw_node
                .and_then(|raw_node| raw_node.get("verify"))
                .filter(|value| {
                    matches!(
                        value,
                        VmValue::Closure(_) | VmValue::BuiltinRef(_) | VmValue::BuiltinRefId(_)
                    )
                })
                .cloned();
        }
        if node.retry_policy.repair_prompt_builder.is_none() {
            node.retry_policy.repair_prompt_builder =
                retry_repair_prompt_builder_from_dict(raw_node);
        }
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        // Default output kind depends on the node kind.
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        // Every node gets at least one attempt.
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
749
750pub fn validate_workflow(
751    graph: &WorkflowGraph,
752    ceiling: Option<&CapabilityPolicy>,
753) -> WorkflowValidationReport {
754    let mut errors = Vec::new();
755    let mut warnings = Vec::new();
756
757    if !graph.nodes.contains_key(&graph.entry) {
758        errors.push(format!("entry node does not exist: {}", graph.entry));
759    }
760
761    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
762    for edge in &graph.edges {
763        if !node_ids.contains(&edge.from) {
764            errors.push(format!("edge.from references unknown node: {}", edge.from));
765        }
766        if !node_ids.contains(&edge.to) {
767            errors.push(format!("edge.to references unknown node: {}", edge.to));
768        }
769    }
770
771    let reachable_nodes = reachable_nodes(graph);
772    for node_id in &node_ids {
773        if !reachable_nodes.contains(node_id) {
774            warnings.push(format!("node is unreachable: {node_id}"));
775        }
776    }
777
778    for (node_id, node) in &graph.nodes {
779        let incoming = graph
780            .edges
781            .iter()
782            .filter(|edge| edge.to == *node_id)
783            .count();
784        let outgoing: Vec<&WorkflowEdge> = graph
785            .edges
786            .iter()
787            .filter(|edge| edge.from == *node_id)
788            .collect();
789        if let Some(min_inputs) = node.input_contract.min_inputs {
790            if let Some(max_inputs) = node.input_contract.max_inputs {
791                if min_inputs > max_inputs {
792                    errors.push(format!(
793                        "node {node_id}: input contract min_inputs exceeds max_inputs"
794                    ));
795                }
796            }
797        }
798        match node.kind.as_str() {
799            "condition" => {
800                let has_true = outgoing
801                    .iter()
802                    .any(|edge| edge.branch.as_deref() == Some("true"));
803                let has_false = outgoing
804                    .iter()
805                    .any(|edge| edge.branch.as_deref() == Some("false"));
806                if !has_true || !has_false {
807                    errors.push(format!(
808                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
809                    ));
810                }
811            }
812            "fork" if outgoing.len() < 2 => {
813                errors.push(format!(
814                    "node {node_id}: fork nodes require at least two outgoing edges"
815                ));
816            }
817            "join" if incoming < 2 => {
818                warnings.push(format!(
819                    "node {node_id}: join node has fewer than two incoming edges"
820                ));
821            }
822            "map"
823                if node.map_policy.items.is_empty()
824                    && node.map_policy.item_artifact_kind.is_none()
825                    && node.input_contract.input_kinds.is_empty() =>
826            {
827                errors.push(format!(
828                    "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
829                ));
830            }
831            "reduce" if node.input_contract.input_kinds.is_empty() => {
832                warnings.push(format!(
833                    "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
834                ));
835            }
836            _ => {}
837        }
838    }
839
840    if let Some(ceiling) = ceiling {
841        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
842            errors.push(error);
843        }
844        for (node_id, node) in &graph.nodes {
845            if let Err(error) = ceiling.intersect(&node.capability_policy) {
846                errors.push(format!("node {node_id}: {error}"));
847            }
848        }
849    }
850
851    for diagnostic in crate::tool_surface::validate_workflow_graph(graph) {
852        let message = format!("{}: {}", diagnostic.code, diagnostic.message);
853        match diagnostic.severity {
854            crate::tool_surface::ToolSurfaceSeverity::Error => errors.push(message),
855            crate::tool_surface::ToolSurfaceSeverity::Warning => warnings.push(message),
856        }
857    }
858
859    WorkflowValidationReport {
860        valid: errors.is_empty(),
861        errors,
862        warnings,
863        reachable_nodes: reachable_nodes.into_iter().collect(),
864    }
865}
866
867fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
868    let mut seen = BTreeSet::new();
869    let mut stack = vec![graph.entry.clone()];
870    while let Some(node_id) = stack.pop() {
871        if !seen.insert(node_id.clone()) {
872            continue;
873        }
874        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
875            stack.push(edge.to.clone());
876        }
877    }
878    seen
879}
880
881/// Pick the session id a stage should run under. Prefers an explicit
882/// `session_id` on the node's `model_policy` dict (so pipelines with
883/// `agent_session_open` / `agent_session_fork` flowing through a graph
884/// line up); falls back to a stable, node-derived id so multi-stage
885/// graphs with no explicit session share a conversation across stages.
886fn resolve_node_session_id(node: &WorkflowNode) -> String {
887    if let Some(explicit) = node
888        .raw_model_policy
889        .as_ref()
890        .and_then(|v| v.as_dict())
891        .and_then(|d| d.get("session_id"))
892        .and_then(|v| match v {
893            VmValue::String(s) if !s.trim().is_empty() => Some(s.to_string()),
894            _ => None,
895        })
896    {
897        return explicit;
898    }
899    if let Some(persisted) = node
900        .metadata
901        .get("worker_session_id")
902        .and_then(|value| value.as_str())
903        .filter(|value| !value.trim().is_empty())
904    {
905        return persisted.to_string();
906    }
907    format!("workflow_stage_{}", uuid::Uuid::now_v7())
908}
909
910fn raw_model_policy_dict(node: &WorkflowNode) -> Option<&crate::value::DictMap> {
911    node.raw_model_policy
912        .as_ref()
913        .and_then(|value| value.as_dict())
914}
915
916fn insert_json_vm_option<T: Serialize>(
917    options: &mut crate::value::DictMap,
918    key: &str,
919    value: &T,
920) -> Result<(), VmError> {
921    let json = serde_json::to_value(value).map_err(|error| {
922        VmError::Runtime(format!("workflow stage option encode error: {error}"))
923    })?;
924    options.insert(
925        crate::value::intern_key(key),
926        crate::stdlib::json_to_vm_value(&json),
927    );
928    Ok(())
929}
930
931fn merge_raw_model_policy_options(options: &mut crate::value::DictMap, node: &WorkflowNode) {
932    if let Some(raw) = raw_model_policy_dict(node) {
933        for (key, value) in raw {
934            if !matches!(value, VmValue::Nil) {
935                options.insert(key.clone(), value.clone());
936            }
937        }
938    }
939}
940
941fn stage_tools_value(node: &WorkflowNode) -> Option<VmValue> {
942    node.raw_tools.clone().or_else(|| {
943        if matches!(node.tools, serde_json::Value::Null) {
944            None
945        } else {
946            Some(crate::stdlib::json_to_vm_value(&node.tools))
947        }
948    })
949}
950
951fn add_stage_tools_option(
952    options: &mut crate::value::DictMap,
953    tools_value: &Option<VmValue>,
954    tool_names: &[String],
955) {
956    if !tool_names.is_empty() {
957        if let Some(value) = tools_value.clone() {
958            options.insert(crate::value::intern_key("tools"), value);
959        }
960    }
961}
962
963fn workflow_stage_llm_options(
964    node: &WorkflowNode,
965    stage_session_id: &str,
966    tools_value: &Option<VmValue>,
967    tool_names: &[String],
968    stage_agent_options: &super::WorkflowStageAgentOptions,
969) -> crate::value::DictMap {
970    let mut options = stage_agent_options.llm_options_vm_dict();
971    merge_raw_model_policy_options(&mut options, node);
972    options.put_str("session_id", stage_session_id);
973    options.put_str("tool_format", stage_agent_options.tool_format.clone());
974    add_stage_tools_option(&mut options, tools_value, tool_names);
975    options
976}
977
978/// Assemble the agent_loop options for one stage.
979///
980/// The policy *flattening* — collapsing the ~15 per-stage policy structs into
981/// the options dict the loop consumes — lives in Harn
982/// (`workflow_flatten_agent_loop_options` in `std/workflow/stage.harn`, design
983/// D5). Rust keeps only the enforcement leaves: it re-derives the capability
984/// ceiling (`tool spec ∩ stage capability_policy`) and, when the flattened
985/// dict re-enters the host, rejects any result whose `policy` *widens* that
986/// ceiling ([`enforce_flattened_ceiling`]). Raw model-policy / tool / compaction
987/// values cross as `VmValue`s so their closures survive the round trip.
988async fn workflow_stage_agent_loop_options(
989    ctx: &crate::vm::AsyncBuiltinCtx,
990    node: &WorkflowNode,
991    stage_session_id: &str,
992    tools_value: &Option<VmValue>,
993    tool_names: &[String],
994    stage_agent_options: &super::WorkflowStageAgentOptions,
995) -> Result<crate::value::DictMap, VmError> {
996    // Ceiling derivation stays in Rust (enforcement, not flattening): the
997    // Harn flattener may narrow it but never widen it.
998    let tool_policy = tool_capability_policy_from_spec(&node.tools);
999    let effective_policy = tool_policy
1000        .intersect(&node.capability_policy)
1001        .map_err(VmError::Runtime)?;
1002
1003    let stage_label = node
1004        .id
1005        .clone()
1006        .unwrap_or_else(|| stage_session_id.to_string());
1007
1008    let mut config = crate::value::DictMap::new();
1009    config.insert(
1010        crate::value::intern_key("base"),
1011        VmValue::dict(stage_agent_options.agent_loop_options_vm_dict()),
1012    );
1013    config.insert(
1014        crate::value::intern_key("raw_model_policy"),
1015        node.raw_model_policy.clone().unwrap_or(VmValue::Nil),
1016    );
1017    insert_json_vm_option(&mut config, "auto_compact", &node.auto_compact)?;
1018    config.insert(
1019        crate::value::intern_key("raw_auto_compact"),
1020        node.raw_auto_compact.clone().unwrap_or(VmValue::Nil),
1021    );
1022    // The host only forwards a tool spec when the stage actually exposes tools;
1023    // matching the former `add_stage_tools_option` gate keeps the dict identical.
1024    config.insert(
1025        crate::value::intern_key("tools"),
1026        if tool_names.is_empty() {
1027            VmValue::Nil
1028        } else {
1029            tools_value.clone().unwrap_or(VmValue::Nil)
1030        },
1031    );
1032    if let Some(context) = crate::orchestration::current_workflow_skill_context() {
1033        if let Some(registry) = context.registry {
1034            config.insert(crate::value::intern_key("skills"), registry);
1035        }
1036        if let Some(match_config) = context.match_config {
1037            config.insert(crate::value::intern_key("skill_match"), match_config);
1038        }
1039    }
1040    insert_json_vm_option(&mut config, "policy", &effective_policy)?;
1041    insert_json_vm_option(&mut config, "approval_policy", &node.approval_policy)?;
1042    config.put_str("session_id", stage_session_id);
1043    config.put_str("tool_format", stage_agent_options.tool_format.clone());
1044    config.put_str(
1045        "nested_kind",
1046        crate::orchestration::NestedExecutionKind::WorkflowStage.as_str(),
1047    );
1048    config.put_str("nested_label", stage_label);
1049
1050    let flattened = crate::stdlib::harn_entry::call_harn_export_by_name(
1051        ctx,
1052        "std/workflow/stage",
1053        "workflow_flatten_agent_loop_options",
1054        "workflow_flatten_agent_loop_options",
1055        &[VmValue::dict(config)],
1056    )
1057    .await?;
1058    let VmValue::Dict(options) = flattened else {
1059        return Err(VmError::Runtime(
1060            "workflow_flatten_agent_loop_options must return a dict".to_string(),
1061        ));
1062    };
1063    let options = (*options).clone();
1064    enforce_flattened_ceiling(&options, &effective_policy)?;
1065    Ok(options)
1066}
1067
1068/// Enforce the ceiling invariant on a Harn-flattened stage options dict: its
1069/// `policy` (the capability policy the loop will run under) must never widen
1070/// `ceiling`, the workflow-level grant Rust derived. This is the trust
1071/// boundary — the Harn flattener is untrusted for *authority*, only for
1072/// *shape*, so the host re-checks the returned policy rather than assuming the
1073/// flattener narrowed correctly.
1074fn enforce_flattened_ceiling(
1075    options: &crate::value::DictMap,
1076    ceiling: &CapabilityPolicy,
1077) -> Result<(), VmError> {
1078    let Some(policy_value) = options.get("policy") else {
1079        return Err(VmError::Runtime(
1080            "flattened stage options are missing the capability policy".to_string(),
1081        ));
1082    };
1083    let requested: CapabilityPolicy = serde_json::from_value(vm_value_to_json(policy_value))
1084        .map_err(|error| {
1085            VmError::Runtime(format!(
1086                "flattened stage capability policy is malformed: {error}"
1087            ))
1088        })?;
1089    ceiling
1090        .assert_within_ceiling(&requested)
1091        .map_err(|message| VmError::CategorizedError {
1092            message,
1093            category: crate::value::ErrorCategory::ToolRejected,
1094        })
1095}
1096
1097#[derive(Clone, Debug)]
1098pub struct PreparedWorkflowStageNode {
1099    pub prompt: String,
1100    pub system: Option<String>,
1101    pub run_agent_loop: bool,
1102    pub llm_options: crate::value::DictMap,
1103    pub agent_loop_options: crate::value::DictMap,
1104    pub result: Option<serde_json::Value>,
1105    pub selected: Vec<ArtifactRecord>,
1106    pub rendered_context: String,
1107    pub rendered_verification: String,
1108    pub verification_contracts: Vec<VerificationContract>,
1109    pub tool_format: String,
1110    pub stage_session_id: String,
1111}
1112
1113pub async fn prepare_stage_node(
1114    ctx: &crate::vm::AsyncBuiltinCtx,
1115    node_id: &str,
1116    node: &WorkflowNode,
1117    task: &str,
1118    artifacts: &[ArtifactRecord],
1119) -> Result<PreparedWorkflowStageNode, VmError> {
1120    let selected_stage = super::select_workflow_stage_artifacts(
1121        ctx,
1122        artifacts,
1123        &node.context_policy,
1124        &node.input_contract,
1125    )
1126    .await?;
1127    let selected = selected_stage.artifacts;
1128    let context_policy = selected_stage.context_policy;
1129    let rendered_context_override = if let Some(assembler) = node.raw_context_assembler.as_ref() {
1130        let assembled =
1131            crate::stdlib::assemble::assemble_from_options(ctx, &selected, assembler).await?;
1132        Some(super::render_assembled_chunks(&assembled))
1133    } else {
1134        None
1135    };
1136    let verification_contracts = super::stage_verification_contracts(node_id, node)?;
1137    let stage_session_id = resolve_node_session_id(node);
1138    if node.input_contract.require_transcript && !crate::agent_sessions::exists(&stage_session_id) {
1139        return Err(VmError::Runtime(format!(
1140            "workflow stage {node_id} requires an existing session \
1141             (call agent_session_open and feed session_id through model_policy \
1142             before entering this stage)"
1143        )));
1144    }
1145    if let Some(min_inputs) = node.input_contract.min_inputs {
1146        if selected.len() < min_inputs {
1147            return Err(VmError::Runtime(format!(
1148                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
1149            )));
1150        }
1151    }
1152    if let Some(max_inputs) = node.input_contract.max_inputs {
1153        if selected.len() > max_inputs {
1154            return Err(VmError::Runtime(format!(
1155                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
1156            )));
1157        }
1158    }
1159    let prepared_prompt = super::prepare_workflow_stage_prompt(
1160        ctx,
1161        task,
1162        node.task_label.as_deref(),
1163        &selected,
1164        &context_policy,
1165        rendered_context_override.as_deref(),
1166        &verification_contracts,
1167    )
1168    .await?;
1169    let prompt = prepared_prompt.prompt;
1170    let rendered_context = prepared_prompt.rendered_context;
1171    let rendered_verification = prepared_prompt.rendered_verification;
1172
1173    let tool_names = tool_names_from_spec(&node.tools);
1174    let stage_agent_options = super::prepare_workflow_stage_agent_options(
1175        ctx,
1176        node,
1177        &stage_session_id,
1178        !tool_names.is_empty(),
1179    )
1180    .await?;
1181    let tool_format = stage_agent_options.tool_format.clone();
1182    let result = if node.kind == "verify" {
1183        if let Some(command) = node
1184            .verify
1185            .as_ref()
1186            .and_then(|verify| verify.as_object())
1187            .and_then(|verify| verify.get("command"))
1188            .and_then(|value| value.as_str())
1189            .map(str::trim)
1190            .filter(|value| !value.is_empty())
1191        {
1192            let (program, args) = if cfg!(target_os = "windows") {
1193                ("cmd", vec!["/C".to_string(), command.to_string()])
1194            } else {
1195                // Do not use a login shell here. On macOS, `/bin/sh -l`
1196                // reads user dotfiles such as `~/.profile`, which makes
1197                // sandboxed verification depend on out-of-worktree state.
1198                ("/bin/sh", vec!["-c".to_string(), command.to_string()])
1199            };
1200            let mut process_config = crate::stdlib::sandbox::ProcessCommandConfig {
1201                stdin_null: true,
1202                ..Default::default()
1203            };
1204            if let Some(context) = crate::stdlib::process::current_execution_context() {
1205                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
1206                    crate::stdlib::sandbox::enforce_process_cwd(std::path::Path::new(&cwd))?;
1207                    process_config.cwd = Some(std::path::PathBuf::from(cwd));
1208                }
1209                if !context.env.is_empty() {
1210                    process_config.env.extend(context.env);
1211                }
1212            }
1213            let output = crate::stdlib::sandbox::command_output(program, &args, &process_config)?;
1214            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
1215            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
1216            let combined = if stderr.is_empty() {
1217                stdout.clone()
1218            } else if stdout.is_empty() {
1219                stderr.clone()
1220            } else {
1221                format!("{stdout}\n{stderr}")
1222            };
1223            serde_json::json!({
1224                "status": "completed",
1225                "text": combined,
1226                "visible_text": combined,
1227                "command": command,
1228                "stdout": stdout,
1229                "stderr": stderr,
1230                "exit_status": output.status.code().unwrap_or(-1),
1231                "success": output.status.success(),
1232            })
1233        } else {
1234            serde_json::json!({
1235                "status": "completed",
1236                "text": "",
1237                "visible_text": "",
1238            })
1239        }
1240    } else {
1241        let tools_value = stage_tools_value(node);
1242        let llm_options = workflow_stage_llm_options(
1243            node,
1244            &stage_session_id,
1245            &tools_value,
1246            &tool_names,
1247            &stage_agent_options,
1248        );
1249        let agent_loop_options = if stage_agent_options.run_agent_loop {
1250            workflow_stage_agent_loop_options(
1251                ctx,
1252                node,
1253                &stage_session_id,
1254                &tools_value,
1255                &tool_names,
1256                &stage_agent_options,
1257            )
1258            .await?
1259        } else {
1260            crate::value::DictMap::new()
1261        };
1262        return Ok(PreparedWorkflowStageNode {
1263            prompt,
1264            system: node.system.clone(),
1265            run_agent_loop: stage_agent_options.run_agent_loop,
1266            llm_options,
1267            agent_loop_options,
1268            result: None,
1269            selected,
1270            rendered_context,
1271            rendered_verification,
1272            verification_contracts,
1273            tool_format,
1274            stage_session_id,
1275        });
1276    };
1277
1278    Ok(PreparedWorkflowStageNode {
1279        prompt,
1280        system: node.system.clone(),
1281        run_agent_loop: false,
1282        llm_options: crate::value::DictMap::new(),
1283        agent_loop_options: crate::value::DictMap::new(),
1284        result: Some(result),
1285        selected,
1286        rendered_context,
1287        rendered_verification,
1288        verification_contracts,
1289        tool_format,
1290        stage_session_id,
1291    })
1292}
1293
1294pub fn complete_prepared_stage_node(
1295    node_id: &str,
1296    node: &WorkflowNode,
1297    prepared: &PreparedWorkflowStageNode,
1298    mut llm_result: serde_json::Value,
1299) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1300    if let Some(payload) = llm_result.as_object_mut() {
1301        payload.insert(
1302            "prompt".to_string(),
1303            serde_json::json!(prepared.prompt.clone()),
1304        );
1305        payload.insert(
1306            "system_prompt".to_string(),
1307            serde_json::json!(node.system.clone().unwrap_or_default()),
1308        );
1309        payload.insert(
1310            "rendered_context".to_string(),
1311            serde_json::json!(prepared.rendered_context.clone()),
1312        );
1313        if !prepared.verification_contracts.is_empty() {
1314            payload.insert(
1315                "verification_contracts".to_string(),
1316                serde_json::to_value(&prepared.verification_contracts).unwrap_or_default(),
1317            );
1318            payload.insert(
1319                "rendered_verification_context".to_string(),
1320                serde_json::json!(prepared.rendered_verification.clone()),
1321            );
1322        }
1323        payload.insert(
1324            "selected_artifact_ids".to_string(),
1325            serde_json::json!(prepared
1326                .selected
1327                .iter()
1328                .map(|artifact| artifact.id.clone())
1329                .collect::<Vec<_>>()),
1330        );
1331        payload.insert(
1332            "selected_artifact_titles".to_string(),
1333            serde_json::json!(prepared
1334                .selected
1335                .iter()
1336                .map(|artifact| artifact.title.clone())
1337                .collect::<Vec<_>>()),
1338        );
1339        match payload
1340            .entry("tools".to_string())
1341            .or_insert_with(|| serde_json::json!({}))
1342        {
1343            serde_json::Value::Object(tools) => {
1344                tools.insert(
1345                    "mode".to_string(),
1346                    serde_json::json!(prepared.tool_format.clone()),
1347                );
1348            }
1349            slot => {
1350                *slot = serde_json::json!({ "mode": prepared.tool_format.clone() });
1351            }
1352        }
1353    }
1354
1355    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
1356    // Non-LLM stages (verify command, condition, fork, join, ...) don't produce
1357    // a "transcript" field; fall back to the input so cross-stage conversation
1358    // state survives transitions.
1359    let result_transcript = llm_result
1360        .get("transcript")
1361        .cloned()
1362        .map(|value| crate::stdlib::json_to_vm_value(&value));
1363    let session_transcript = crate::agent_sessions::snapshot(&prepared.stage_session_id);
1364    let transcript = result_transcript
1365        .or(session_transcript)
1366        .and_then(|value| redact_transcript_visibility(&value, node.output_visibility.as_deref()));
1367    let output_kind = node
1368        .output_contract
1369        .output_kinds
1370        .first()
1371        .cloned()
1372        .unwrap_or_else(|| {
1373            if node.kind == "verify" {
1374                "verification_result".to_string()
1375            } else {
1376                "artifact".to_string()
1377            }
1378        });
1379    let mut metadata = BTreeMap::new();
1380    metadata.insert(
1381        "input_artifact_ids".to_string(),
1382        serde_json::json!(prepared
1383            .selected
1384            .iter()
1385            .map(|artifact| artifact.id.clone())
1386            .collect::<Vec<_>>()),
1387    );
1388    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
1389    if !node.approval_policy.write_path_allowlist.is_empty() {
1390        metadata.insert(
1391            "changed_paths".to_string(),
1392            serde_json::json!(node.approval_policy.write_path_allowlist),
1393        );
1394    }
1395    let artifact = ArtifactRecord {
1396        type_name: "artifact".to_string(),
1397        id: new_id("artifact"),
1398        kind: output_kind,
1399        title: Some(format!("stage {node_id} output")),
1400        text: Some(visible_text),
1401        data: Some(llm_result.clone()),
1402        source: Some(node_id.to_string()),
1403        created_at: now_rfc3339(),
1404        freshness: Some("fresh".to_string()),
1405        priority: None,
1406        lineage: prepared
1407            .selected
1408            .iter()
1409            .map(|artifact| artifact.id.clone())
1410            .collect(),
1411        relevance: Some(1.0),
1412        estimated_tokens: None,
1413        stage: Some(node_id.to_string()),
1414        metadata,
1415    }
1416    .normalize();
1417
1418    Ok((llm_result, vec![artifact], transcript))
1419}
1420
1421pub async fn execute_stage_node(
1422    ctx: &crate::vm::AsyncBuiltinCtx,
1423    node_id: &str,
1424    node: &WorkflowNode,
1425    task: &str,
1426    artifacts: &[ArtifactRecord],
1427) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
1428    let prepared = prepare_stage_node(ctx, node_id, node, task, artifacts).await?;
1429    let llm_result = if let Some(result) = prepared.result.clone() {
1430        result
1431    } else if prepared.run_agent_loop {
1432        let result = crate::stdlib::harn_entry::call_agent_loop(
1433            ctx,
1434            prepared.prompt.clone(),
1435            prepared.system.clone(),
1436            prepared.agent_loop_options.clone(),
1437        )
1438        .await?;
1439        crate::llm::vm_value_to_json(&result)
1440    } else {
1441        let args = vec![
1442            VmValue::String(arcstr::ArcStr::from(prepared.prompt.clone())),
1443            prepared
1444                .system
1445                .clone()
1446                .map(|s| VmValue::String(arcstr::ArcStr::from(s)))
1447                .unwrap_or(VmValue::Nil),
1448            VmValue::dict(prepared.llm_options.clone()),
1449        ];
1450        let opts = extract_llm_options(&args)?;
1451        let result = vm_call_llm_full(&opts).await?;
1452        crate::llm::agent_loop_result_from_llm(&result, opts)
1453    };
1454    complete_prepared_stage_node(node_id, node, &prepared, llm_result)
1455}
1456
1457pub fn append_audit_entry(
1458    graph: &mut WorkflowGraph,
1459    op: &str,
1460    node_id: Option<String>,
1461    reason: Option<String>,
1462    metadata: BTreeMap<String, serde_json::Value>,
1463) {
1464    graph.audit_log.push(WorkflowAuditEntry {
1465        id: new_id("audit"),
1466        op: op.to_string(),
1467        node_id,
1468        timestamp: now_rfc3339(),
1469        reason,
1470        metadata,
1471    });
1472}
1473
1474#[cfg(test)]
1475mod flatten_tests {
1476    use super::*;
1477    use crate::orchestration::{CapabilityPolicy, WorkflowNode};
1478    use std::collections::BTreeMap;
1479
1480    fn ceiling_with_tools(tools: &[&str]) -> CapabilityPolicy {
1481        CapabilityPolicy {
1482            tools: tools.iter().map(|t| t.to_string()).collect(),
1483            ..Default::default()
1484        }
1485    }
1486
1487    fn options_with_policy(policy: &CapabilityPolicy) -> crate::value::DictMap {
1488        let mut options = crate::value::DictMap::new();
1489        insert_json_vm_option(&mut options, "policy", policy).unwrap();
1490        options
1491    }
1492
1493    #[test]
1494    fn ceiling_pass_through_is_within() {
1495        let ceiling = ceiling_with_tools(&["read", "edit"]);
1496        // The parity path: flattener passes the ceiling through unchanged.
1497        assert!(ceiling.assert_within_ceiling(&ceiling).is_ok());
1498        let options = options_with_policy(&ceiling);
1499        assert!(enforce_flattened_ceiling(&options, &ceiling).is_ok());
1500    }
1501
1502    #[test]
1503    fn narrowing_is_allowed() {
1504        let ceiling = ceiling_with_tools(&["read", "edit", "run_command"]);
1505        let narrowed = ceiling_with_tools(&["read"]);
1506        assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1507    }
1508
1509    #[test]
1510    fn widening_tools_is_rejected() {
1511        let ceiling = ceiling_with_tools(&["read"]);
1512        let widened = ceiling_with_tools(&["read", "run_command"]);
1513        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1514        assert!(
1515            err.contains("run_command"),
1516            "error names the widened tool: {err}"
1517        );
1518
1519        // ... and surfaces as a ToolRejected VmError at the flatten seam.
1520        let options = options_with_policy(&widened);
1521        match enforce_flattened_ceiling(&options, &ceiling) {
1522            Err(VmError::CategorizedError { message, category }) => {
1523                assert_eq!(category, crate::value::ErrorCategory::ToolRejected);
1524                assert!(message.contains("run_command"), "message: {message}");
1525            }
1526            other => panic!("expected a ToolRejected error, got {other:?}"),
1527        }
1528    }
1529
1530    #[test]
1531    fn widening_capability_op_is_rejected() {
1532        let mut ceiling = CapabilityPolicy::default();
1533        ceiling
1534            .capabilities
1535            .insert("fs".to_string(), vec!["read".to_string()]);
1536        let mut widened = CapabilityPolicy::default();
1537        widened.capabilities.insert(
1538            "fs".to_string(),
1539            vec!["read".to_string(), "write".to_string()],
1540        );
1541        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1542        assert!(err.contains("fs") && err.contains("write"), "error: {err}");
1543    }
1544
1545    #[test]
1546    fn adding_new_capability_is_rejected() {
1547        let mut ceiling = CapabilityPolicy::default();
1548        ceiling
1549            .capabilities
1550            .insert("fs".to_string(), vec!["read".to_string()]);
1551        let mut widened = ceiling.clone();
1552        widened
1553            .capabilities
1554            .insert("net".to_string(), vec!["connect".to_string()]);
1555        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1556        assert!(
1557            err.contains("net"),
1558            "error names the added capability: {err}"
1559        );
1560    }
1561
1562    #[test]
1563    fn widening_recursion_budget_is_rejected() {
1564        let ceiling = CapabilityPolicy {
1565            recursion_limit: Some(2),
1566            ..Default::default()
1567        };
1568        let widened = CapabilityPolicy {
1569            recursion_limit: Some(9),
1570            ..Default::default()
1571        };
1572        assert!(ceiling.assert_within_ceiling(&widened).is_err());
1573        // Dropping the budget entirely is also a widening.
1574        let dropped = CapabilityPolicy::default();
1575        assert!(ceiling.assert_within_ceiling(&dropped).is_err());
1576        // Narrowing the budget is allowed.
1577        let narrowed = CapabilityPolicy {
1578            recursion_limit: Some(1),
1579            ..Default::default()
1580        };
1581        assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1582    }
1583
1584    #[test]
1585    fn widening_roots_is_rejected() {
1586        let ceiling = CapabilityPolicy {
1587            workspace_roots: vec!["/repo".to_string()],
1588            ..Default::default()
1589        };
1590        let widened = CapabilityPolicy {
1591            workspace_roots: vec!["/repo".to_string(), "/etc".to_string()],
1592            ..Default::default()
1593        };
1594        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1595        assert!(err.contains("/etc"), "error: {err}");
1596    }
1597
1598    #[test]
1599    fn widening_side_effect_level_is_rejected() {
1600        let ceiling = CapabilityPolicy {
1601            side_effect_level: Some("read_only".to_string()),
1602            ..Default::default()
1603        };
1604        let widened = CapabilityPolicy {
1605            side_effect_level: Some("network".to_string()),
1606            ..Default::default()
1607        };
1608        assert!(ceiling.assert_within_ceiling(&widened).is_err());
1609    }
1610
1611    #[test]
1612    fn unknown_side_effect_level_ranks_fail_closed() {
1613        // Canonical `rank_str` ranks an unrecognized level as `none` (0), so a
1614        // typo/injected level can never outrank a real ceiling. A ceiling of
1615        // `none` still rejects a widening to a known-higher level.
1616        let ceiling = CapabilityPolicy {
1617            side_effect_level: Some("none".to_string()),
1618            ..Default::default()
1619        };
1620        let widened = CapabilityPolicy {
1621            side_effect_level: Some("desktop_control".to_string()),
1622            ..Default::default()
1623        };
1624        assert!(ceiling.assert_within_ceiling(&widened).is_err());
1625        // An unknown requested level ranks 0 (== none), so it is within a
1626        // `none` ceiling rather than fail-open above it.
1627        let unknown = CapabilityPolicy {
1628            side_effect_level: Some("teleport".to_string()),
1629            ..Default::default()
1630        };
1631        assert!(ceiling.assert_within_ceiling(&unknown).is_ok());
1632    }
1633
1634    #[test]
1635    fn widening_process_sandbox_roots_is_rejected() {
1636        use crate::orchestration::ProcessSandboxPolicy;
1637        let ceiling = CapabilityPolicy {
1638            process_sandbox: ProcessSandboxPolicy {
1639                write_roots: vec!["/repo/.cache".to_string()],
1640                ..Default::default()
1641            },
1642            ..Default::default()
1643        };
1644        let widened = CapabilityPolicy {
1645            process_sandbox: ProcessSandboxPolicy {
1646                write_roots: vec!["/repo/.cache".to_string(), "/etc".to_string()],
1647                ..Default::default()
1648            },
1649            ..Default::default()
1650        };
1651        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1652        assert!(
1653            err.contains("process_sandbox.write_roots") && err.contains("/etc"),
1654            "error: {err}"
1655        );
1656        // Narrowing (fewer roots) is allowed.
1657        assert!(ceiling
1658            .assert_within_ceiling(&CapabilityPolicy::default())
1659            .is_ok());
1660    }
1661
1662    #[test]
1663    fn injecting_process_sandbox_roots_into_empty_ceiling_is_rejected() {
1664        use crate::orchestration::ProcessSandboxPolicy;
1665        // The common default: a stage that never set process_sandbox has EMPTY
1666        // read/write roots — ZERO extra subprocess FS access (additive grants,
1667        // no fallback), i.e. MOST restrictive. A flattener injecting any root
1668        // must be rejected, not waved through as "unbounded".
1669        let ceiling = CapabilityPolicy::default();
1670        for (field, requested) in [
1671            (
1672                "process_sandbox.read_roots",
1673                CapabilityPolicy {
1674                    process_sandbox: ProcessSandboxPolicy {
1675                        read_roots: vec!["/etc".to_string()],
1676                        ..Default::default()
1677                    },
1678                    ..Default::default()
1679                },
1680            ),
1681            (
1682                "process_sandbox.write_roots",
1683                CapabilityPolicy {
1684                    process_sandbox: ProcessSandboxPolicy {
1685                        write_roots: vec!["/etc".to_string()],
1686                        ..Default::default()
1687                    },
1688                    ..Default::default()
1689                },
1690            ),
1691        ] {
1692            let err = ceiling.assert_within_ceiling(&requested).unwrap_err();
1693            assert!(
1694                err.contains(field) && err.contains("/etc"),
1695                "empty ceiling must reject injected {field}: {err}"
1696            );
1697        }
1698        // Empty requested against empty ceiling stays allowed (∅ ⊆ ∅).
1699        assert!(ceiling
1700            .assert_within_ceiling(&CapabilityPolicy::default())
1701            .is_ok());
1702    }
1703
1704    #[test]
1705    fn widening_process_sandbox_presets_is_rejected() {
1706        use crate::orchestration::{ProcessSandboxPolicy, ProcessSandboxPreset};
1707        let ceiling = CapabilityPolicy {
1708            process_sandbox: ProcessSandboxPolicy {
1709                presets: Some(vec![ProcessSandboxPreset::SystemRuntime]),
1710                ..Default::default()
1711            },
1712            ..Default::default()
1713        };
1714        let widened = CapabilityPolicy {
1715            process_sandbox: ProcessSandboxPolicy {
1716                presets: Some(vec![
1717                    ProcessSandboxPreset::SystemRuntime,
1718                    ProcessSandboxPreset::DeveloperToolchains,
1719                ]),
1720                ..Default::default()
1721            },
1722            ..Default::default()
1723        };
1724        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1725        assert!(err.contains("process_sandbox presets"), "error: {err}");
1726    }
1727
1728    #[test]
1729    fn dropping_tool_arg_constraint_is_rejected() {
1730        use crate::orchestration::ToolArgConstraint;
1731        let constraint = ToolArgConstraint {
1732            tool: "edit".to_string(),
1733            arg_patterns: vec!["src/**".to_string()],
1734            arg_key: Some("path".to_string()),
1735        };
1736        let ceiling = CapabilityPolicy {
1737            tool_arg_constraints: vec![constraint],
1738            ..Default::default()
1739        };
1740        // A flattener that drops the scope constraint widens edit to anywhere.
1741        let widened = CapabilityPolicy::default();
1742        let err = ceiling.assert_within_ceiling(&widened).unwrap_err();
1743        assert!(
1744            err.contains("tool_arg_constraints") && err.contains("edit"),
1745            "error: {err}"
1746        );
1747        // Keeping it (and adding more) is allowed.
1748        let mut narrowed = ceiling.clone();
1749        narrowed.tool_arg_constraints.push(ToolArgConstraint {
1750            tool: "run_command".to_string(),
1751            arg_patterns: vec!["cargo *".to_string()],
1752            arg_key: None,
1753        });
1754        assert!(ceiling.assert_within_ceiling(&narrowed).is_ok());
1755    }
1756
1757    #[test]
1758    fn weakening_tool_annotation_is_rejected() {
1759        use crate::tool_annotations::{SideEffectLevel, ToolAnnotations, ToolArgSchema};
1760        let strong = ToolAnnotations {
1761            side_effect_level: SideEffectLevel::ReadOnly,
1762            arg_schema: ToolArgSchema {
1763                path_params: vec!["path".to_string()],
1764                ..Default::default()
1765            },
1766            ..Default::default()
1767        };
1768        let mut ceiling = CapabilityPolicy {
1769            tools: vec!["edit".to_string(), "read".to_string()],
1770            ..Default::default()
1771        };
1772        ceiling
1773            .tool_annotations
1774            .insert("edit".to_string(), strong.clone());
1775
1776        // Dropping the annotation for a still-granted tool (loses path_params →
1777        // path constraint becomes unresolvable/permissive) is a widening.
1778        let mut dropped = ceiling.clone();
1779        dropped.tool_annotations.clear();
1780        let err = ceiling.assert_within_ceiling(&dropped).unwrap_err();
1781        assert!(
1782            err.contains("tool_annotations") && err.contains("edit"),
1783            "error: {err}"
1784        );
1785
1786        // Rewriting it (e.g. lowering the side-effect level) is also rejected.
1787        let mut rewritten = ceiling.clone();
1788        rewritten.tool_annotations.insert(
1789            "edit".to_string(),
1790            ToolAnnotations {
1791                side_effect_level: SideEffectLevel::None,
1792                ..strong
1793            },
1794        );
1795        assert!(ceiling.assert_within_ceiling(&rewritten).is_err());
1796
1797        // But if the flattener narrows the tool set so `edit` is no longer
1798        // granted, dropping its annotation is fine.
1799        let narrowed_tools = CapabilityPolicy {
1800            tools: vec!["read".to_string()],
1801            ..Default::default()
1802        };
1803        assert!(ceiling.assert_within_ceiling(&narrowed_tools).is_ok());
1804    }
1805
    /// The pinned pre-move Rust flattening algorithm (the deleted
    /// `workflow_stage_agent_loop_options` body + helpers), preserved verbatim
    /// as the parity oracle. `flatten_matches_pre_move_rust` asserts the Harn
    /// flattener reproduces it dict-for-dict.
    ///
    /// Starting from the caller-supplied `options`, it layers on (in order) the
    /// node's raw model policy, a hoisted `command_policy`, the auto-compact
    /// settings, the tools value, the effective capability policy, the approval
    /// policy, `session_id`, `tool_format`, and the nested-execution
    /// annotation, then returns the resulting dict. Later inserts overwrite
    /// earlier keys, so statement order is part of the contract.
    ///
    /// Panics (via `unwrap`) if the policy intersection or either
    /// `insert_json_vm_option` call returns an error.
    fn legacy_flatten_reference(
        node: &WorkflowNode,
        session_id: &str,
        tool_format: &str,
        mut options: crate::value::DictMap,
        tools_value: &Option<VmValue>,
        tool_names: &[String],
    ) -> crate::value::DictMap {
        // Merge every non-nil raw model policy entry over the base options.
        if let Some(raw) = node.raw_model_policy.as_ref().and_then(|v| v.as_dict()) {
            for (key, value) in raw {
                if !matches!(value, VmValue::Nil) {
                    options.insert(key.clone(), value.clone());
                }
            }
        }
        // Hoist `policy.command_policy` to the top level, but only when no
        // top-level `command_policy` is already present.
        if !options.contains_key("command_policy") {
            if let Some(command_policy) = node
                .raw_model_policy
                .as_ref()
                .and_then(|v| v.as_dict())
                .and_then(|d| d.get("policy"))
                .and_then(|v| v.as_dict())
                .and_then(|p| p.get("command_policy"))
            {
                options.insert(
                    crate::value::intern_key("command_policy"),
                    command_policy.clone(),
                );
            }
        }
        // Disabled auto-compaction emits only the `false` flag; the enabled
        // branch emits the flag plus every configured tuning knob.
        if !node.auto_compact.enabled {
            options.insert(
                crate::value::intern_key("auto_compact"),
                VmValue::Bool(false),
            );
        } else {
            options.insert(
                crate::value::intern_key("auto_compact"),
                VmValue::Bool(true),
            );
            // `token_threshold` is emitted under the `compact_threshold` key.
            if let Some(v) = node.auto_compact.token_threshold {
                options.insert(
                    crate::value::intern_key("compact_threshold"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(v) = node.auto_compact.tool_output_max_chars {
                options.insert(
                    crate::value::intern_key("tool_output_max_chars"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(v) = node.auto_compact.hard_limit_tokens {
                options.insert(
                    crate::value::intern_key("hard_limit_tokens"),
                    VmValue::Int(v as i64),
                );
            }
            if let Some(s) = node.auto_compact.compact_strategy.as_ref() {
                options.put_str("compact_strategy", s.clone());
            }
            if let Some(s) = node.auto_compact.hard_limit_strategy.as_ref() {
                options.put_str("hard_limit_strategy", s.clone());
            }
            // The remaining settings are read from the raw auto-compact dict.
            let raw = node.raw_auto_compact.as_ref().and_then(|v| v.as_dict());
            // Prefer a non-negative integer `compact_keep_last`; otherwise
            // fall back to a non-negative integer `keep_last`.
            let keep = raw
                .and_then(|d| d.get("compact_keep_last"))
                .and_then(|v| v.as_int())
                .filter(|v| *v >= 0)
                .or_else(|| {
                    raw.and_then(|d| d.get("keep_last"))
                        .and_then(|v| v.as_int())
                        .filter(|v| *v >= 0)
                });
            if let Some(v) = keep {
                options.insert(
                    crate::value::intern_key("compact_keep_last"),
                    VmValue::Int(v),
                );
            }
            // Only a string that is non-empty after trimming counts as a
            // summarize prompt; the untrimmed text is what gets stored.
            if let Some(p) = raw
                .and_then(|d| d.get("summarize_prompt"))
                .and_then(|v| match v {
                    VmValue::String(t) if !t.trim().is_empty() => Some(t.to_string()),
                    _ => None,
                })
            {
                options.put_str("summarize_prompt", p);
            }
            if let Some(d) = raw {
                // These two callbacks keep their key names.
                for key in ["compress_callback", "mask_callback"] {
                    if let Some(cb) = d.get(key) {
                        options.insert(crate::value::intern_key(key), cb.clone());
                    }
                }
                // `custom_compactor` is emitted under the `compact_callback` key.
                if let Some(cb) = d.get("custom_compactor") {
                    options.insert(crate::value::intern_key("compact_callback"), cb.clone());
                }
            }
        }
        // Tools are forwarded only when at least one tool name is present and
        // a tools value was supplied.
        if !tool_names.is_empty() {
            if let Some(v) = tools_value.clone() {
                options.insert(crate::value::intern_key("tools"), v);
            }
        }
        // The emitted policy is the tool-spec-derived policy intersected with
        // the node's own capability policy.
        let tool_policy = tool_capability_policy_from_spec(&node.tools);
        let effective = tool_policy.intersect(&node.capability_policy).unwrap();
        insert_json_vm_option(&mut options, "policy", &effective).unwrap();
        insert_json_vm_option(&mut options, "approval_policy", &node.approval_policy).unwrap();
        options.put_str("session_id", session_id);
        options.put_str("tool_format", tool_format);
        // The nested-execution label is the node id, or the session id when
        // the node has none.
        let label = node.id.clone().unwrap_or_else(|| session_id.to_string());
        crate::orchestration::annotate_nested_execution_options(
            &mut options,
            crate::orchestration::NestedExecutionKind::WorkflowStage,
            &label,
        );
        options
    }
1929
1930    fn representative_node() -> WorkflowNode {
1931        let mut raw_model_policy = BTreeMap::new();
1932        raw_model_policy.insert(
1933            "provider".to_string(),
1934            VmValue::String(arcstr::ArcStr::from("anthropic")),
1935        );
1936        raw_model_policy.insert("temperature".to_string(), VmValue::Float(0.2));
1937        // Nested command policy hoisted to the top level by the flattener.
1938        let mut nested_policy = BTreeMap::new();
1939        nested_policy.insert(
1940            "command_policy".to_string(),
1941            VmValue::String(arcstr::ArcStr::from("worktree")),
1942        );
1943        raw_model_policy.insert("policy".to_string(), VmValue::dict(nested_policy));
1944        // A nil entry must be skipped by the merge.
1945        raw_model_policy.insert("nudge".to_string(), VmValue::Nil);
1946
1947        let mut raw_auto_compact = BTreeMap::new();
1948        raw_auto_compact.insert("keep_last".to_string(), VmValue::Int(4));
1949        raw_auto_compact.insert(
1950            "summarize_prompt".to_string(),
1951            VmValue::String(arcstr::ArcStr::from("summarize tersely")),
1952        );
1953
1954        WorkflowNode {
1955            id: Some("act".to_string()),
1956            kind: "stage".to_string(),
1957            mode: Some("agent".to_string()),
1958            tools: serde_json::json!(["read", "edit"]),
1959            auto_compact: crate::orchestration::AutoCompactPolicy {
1960                enabled: true,
1961                token_threshold: Some(8000),
1962                tool_output_max_chars: Some(2000),
1963                hard_limit_tokens: Some(20000),
1964                compact_strategy: Some("summary".to_string()),
1965                hard_limit_strategy: Some("truncate".to_string()),
1966            },
1967            capability_policy: CapabilityPolicy {
1968                tools: vec!["read".to_string(), "edit".to_string()],
1969                recursion_limit: Some(3),
1970                ..Default::default()
1971            },
1972            raw_model_policy: Some(VmValue::dict(raw_model_policy)),
1973            raw_auto_compact: Some(VmValue::dict(raw_auto_compact)),
1974            ..Default::default()
1975        }
1976    }
1977
1978    #[tokio::test(flavor = "current_thread", start_paused = true)]
1979    async fn flatten_matches_pre_move_rust() {
1980        crate::reset_thread_local_state();
1981        let node = representative_node();
1982        let session_id = "session-parity";
1983        let tool_format = "text";
1984        let tool_names = vec!["read".to_string(), "edit".to_string()];
1985        let tools_value = Some(crate::stdlib::json_to_vm_value(&node.tools));
1986
1987        // Base agent_loop options (as std/workflow/options would normalize).
1988        let mut base = crate::value::DictMap::new();
1989        base.insert(
1990            crate::value::intern_key("loop_until_done"),
1991            VmValue::Bool(true),
1992        );
1993        base.insert(crate::value::intern_key("max_iterations"), VmValue::Int(16));
1994
1995        let stage_agent_options = super::super::WorkflowStageAgentOptions {
1996            run_agent_loop: true,
1997            tool_format: tool_format.to_string(),
1998            llm_options: BTreeMap::new(),
1999            agent_loop_options: base
2000                .iter()
2001                .map(|(k, v)| (k.to_string(), vm_value_to_json(v)))
2002                .collect(),
2003        };
2004
2005        let mut vm = crate::Vm::new();
2006        crate::register_vm_stdlib(&mut vm);
2007        let ctx = crate::vm::AsyncBuiltinCtx::for_test(vm);
2008
2009        let flattened = workflow_stage_agent_loop_options(
2010            &ctx,
2011            &node,
2012            session_id,
2013            &tools_value,
2014            &tool_names,
2015            &stage_agent_options,
2016        )
2017        .await
2018        .expect("harn flatten succeeds");
2019
2020        let expected = legacy_flatten_reference(
2021            &node,
2022            session_id,
2023            tool_format,
2024            base,
2025            &tools_value,
2026            &tool_names,
2027        );
2028
2029        let flattened_json = vm_value_to_json(&VmValue::dict(flattened));
2030        let expected_json = vm_value_to_json(&VmValue::dict(expected));
2031        assert_eq!(
2032            flattened_json, expected_json,
2033            "Harn flatten must be dict-equal to the pre-move Rust flatten"
2034        );
2035    }
2036}