Skip to main content

harn_vm/orchestration/
workflow.rs

1//! Workflow graph types, normalization, validation, and execution.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::rc::Rc;
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    apply_input_transcript_policy, apply_output_transcript_policy, new_id, now_rfc3339,
10    ArtifactRecord, BranchSemantics, CapabilityPolicy, ContextPolicy, EscalationPolicy, JoinPolicy,
11    MapPolicy, ModelPolicy, ReducePolicy, RetryPolicy, StageContract, ToolRuntimePolicyMetadata,
12    TranscriptPolicy,
13};
14use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
15use crate::value::{VmError, VmValue};
16
17#[derive(Clone, Debug, Default, Serialize, Deserialize)]
18#[serde(default)]
19pub struct WorkflowNode {
20    pub id: Option<String>,
21    pub kind: String,
22    pub mode: Option<String>,
23    pub prompt: Option<String>,
24    pub system: Option<String>,
25    pub task_label: Option<String>,
26    pub done_sentinel: Option<String>,
27    pub tools: serde_json::Value,
28    pub model_policy: ModelPolicy,
29    pub transcript_policy: TranscriptPolicy,
30    pub context_policy: ContextPolicy,
31    pub retry_policy: RetryPolicy,
32    pub capability_policy: CapabilityPolicy,
33    pub input_contract: StageContract,
34    pub output_contract: StageContract,
35    pub branch_semantics: BranchSemantics,
36    pub map_policy: MapPolicy,
37    pub join_policy: JoinPolicy,
38    pub reduce_policy: ReducePolicy,
39    pub escalation_policy: EscalationPolicy,
40    pub verify: Option<serde_json::Value>,
41    /// Optional per-stage timeout in milliseconds.  When set, stage execution
42    /// is wrapped in `tokio::time::timeout` and will fail with a timeout error
43    /// if it exceeds the given duration.
44    #[serde(default)]
45    pub timeout_ms: Option<u64>,
46    pub metadata: BTreeMap<String, serde_json::Value>,
47    #[serde(skip)]
48    pub raw_tools: Option<VmValue>,
49    /// Raw transcript_policy VmValue dict — preserved for extracting closure
50    /// fields (compress_callback, mask_callback) that can't go through serde.
51    #[serde(skip)]
52    pub raw_transcript_policy: Option<VmValue>,
53    /// Raw model_policy VmValue dict — preserved for extracting closure
54    /// fields (post_turn_callback) that can't go through serde.
55    #[serde(skip)]
56    pub raw_model_policy: Option<VmValue>,
57}
58
59impl PartialEq for WorkflowNode {
60    fn eq(&self, other: &Self) -> bool {
61        serde_json::to_value(self).ok() == serde_json::to_value(other).ok()
62    }
63}
64
65pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
66    match value {
67        serde_json::Value::Null => Vec::new(),
68        serde_json::Value::Array(items) => items
69            .iter()
70            .filter_map(|item| match item {
71                serde_json::Value::Object(map) => map
72                    .get("name")
73                    .and_then(|value| value.as_str())
74                    .filter(|name| !name.is_empty())
75                    .map(|name| name.to_string()),
76                _ => None,
77            })
78            .collect(),
79        serde_json::Value::Object(map) => {
80            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
81                return map
82                    .get("tools")
83                    .map(workflow_tool_names)
84                    .unwrap_or_default();
85            }
86            map.get("name")
87                .and_then(|value| value.as_str())
88                .filter(|name| !name.is_empty())
89                .map(|name| vec![name.to_string()])
90                .unwrap_or_default()
91        }
92        _ => Vec::new(),
93    }
94}
95
/// Picks the most severe side-effect level from `levels`.
///
/// Known levels are ordered `none` < `read_only` < `workspace_write` <
/// `process_exec` < `network`; any unrecognized level ranks above all of
/// them. When several levels share the top rank the last one wins, and an
/// empty iterator yields `None`.
fn max_side_effect_level(levels: impl Iterator<Item = String>) -> Option<String> {
    const ORDERED_LEVELS: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    // Position in the table is the severity; unknown levels sort past the end.
    let severity = |level: &str| -> usize {
        ORDERED_LEVELS
            .iter()
            .position(|known| *known == level)
            .unwrap_or(ORDERED_LEVELS.len())
    };
    levels.max_by(|left, right| severity(left.as_str()).cmp(&severity(right.as_str())))
}
109
110fn parse_tool_runtime_policy(
111    map: &serde_json::Map<String, serde_json::Value>,
112) -> ToolRuntimePolicyMetadata {
113    let Some(policy) = map.get("policy").and_then(|value| value.as_object()) else {
114        return ToolRuntimePolicyMetadata::default();
115    };
116
117    let capabilities = policy
118        .get("capabilities")
119        .and_then(|value| value.as_object())
120        .map(|caps| {
121            caps.iter()
122                .map(|(capability, ops)| {
123                    let values = ops
124                        .as_array()
125                        .map(|items| {
126                            items
127                                .iter()
128                                .filter_map(|item| item.as_str().map(|s| s.to_string()))
129                                .collect::<Vec<_>>()
130                        })
131                        .unwrap_or_default();
132                    (capability.clone(), values)
133                })
134                .collect::<BTreeMap<_, _>>()
135        })
136        .unwrap_or_default();
137
138    let path_params = policy
139        .get("path_params")
140        .and_then(|value| value.as_array())
141        .map(|items| {
142            items
143                .iter()
144                .filter_map(|item| item.as_str().map(|s| s.to_string()))
145                .collect::<Vec<_>>()
146        })
147        .unwrap_or_default();
148
149    ToolRuntimePolicyMetadata {
150        capabilities,
151        side_effect_level: policy
152            .get("side_effect_level")
153            .and_then(|value| value.as_str())
154            .map(|s| s.to_string()),
155        path_params,
156        mutation_classification: policy
157            .get("mutation_classification")
158            .and_then(|value| value.as_str())
159            .map(|s| s.to_string()),
160    }
161}
162
163pub fn workflow_tool_metadata(
164    value: &serde_json::Value,
165) -> BTreeMap<String, ToolRuntimePolicyMetadata> {
166    match value {
167        serde_json::Value::Null => BTreeMap::new(),
168        serde_json::Value::Array(items) => items
169            .iter()
170            .filter_map(|item| match item {
171                serde_json::Value::Object(map) => map
172                    .get("name")
173                    .and_then(|value| value.as_str())
174                    .filter(|name| !name.is_empty())
175                    .map(|name| (name.to_string(), parse_tool_runtime_policy(map))),
176                _ => None,
177            })
178            .collect(),
179        serde_json::Value::Object(map) => {
180            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
181                return map
182                    .get("tools")
183                    .map(workflow_tool_metadata)
184                    .unwrap_or_default();
185            }
186            map.get("name")
187                .and_then(|value| value.as_str())
188                .filter(|name| !name.is_empty())
189                .map(|name| {
190                    let mut metadata = BTreeMap::new();
191                    metadata.insert(name.to_string(), parse_tool_runtime_policy(map));
192                    metadata
193                })
194                .unwrap_or_default()
195        }
196        _ => BTreeMap::new(),
197    }
198}
199
200pub fn workflow_tool_policy_from_tools(value: &serde_json::Value) -> CapabilityPolicy {
201    let tools = workflow_tool_names(value);
202    let tool_metadata = workflow_tool_metadata(value);
203    let mut capabilities: BTreeMap<String, Vec<String>> = BTreeMap::new();
204    for metadata in tool_metadata.values() {
205        for (capability, ops) in &metadata.capabilities {
206            let entry = capabilities.entry(capability.clone()).or_default();
207            for op in ops {
208                if !entry.contains(op) {
209                    entry.push(op.clone());
210                }
211            }
212            entry.sort();
213        }
214    }
215    let side_effect_level = max_side_effect_level(
216        tool_metadata
217            .values()
218            .filter_map(|metadata| metadata.side_effect_level.clone()),
219    );
220    CapabilityPolicy {
221        tools,
222        capabilities,
223        workspace_roots: Vec::new(),
224        side_effect_level,
225        recursion_limit: None,
226        tool_arg_constraints: Vec::new(),
227        tool_metadata,
228    }
229}
230
231#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
232#[serde(default)]
233pub struct WorkflowEdge {
234    pub from: String,
235    pub to: String,
236    pub branch: Option<String>,
237    pub label: Option<String>,
238}
239
240#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
241#[serde(default)]
242pub struct WorkflowGraph {
243    #[serde(rename = "_type")]
244    pub type_name: String,
245    pub id: String,
246    pub name: Option<String>,
247    pub version: usize,
248    pub entry: String,
249    pub nodes: BTreeMap<String, WorkflowNode>,
250    pub edges: Vec<WorkflowEdge>,
251    pub capability_policy: CapabilityPolicy,
252    pub metadata: BTreeMap<String, serde_json::Value>,
253    pub audit_log: Vec<WorkflowAuditEntry>,
254}
255
256#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
257#[serde(default)]
258pub struct WorkflowAuditEntry {
259    pub id: String,
260    pub op: String,
261    pub node_id: Option<String>,
262    pub timestamp: String,
263    pub reason: Option<String>,
264    pub metadata: BTreeMap<String, serde_json::Value>,
265}
266
267#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
268#[serde(default)]
269pub struct WorkflowValidationReport {
270    pub valid: bool,
271    pub errors: Vec<String>,
272    pub warnings: Vec<String>,
273    pub reachable_nodes: Vec<String>,
274}
275
276pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
277    let mut node: WorkflowNode = super::parse_json_payload(vm_value_to_json(value), label)?;
278    let dict = value.as_dict();
279    node.raw_tools = dict.and_then(|d| d.get("tools")).cloned();
280    node.raw_transcript_policy = dict.and_then(|d| d.get("transcript_policy")).cloned();
281    node.raw_model_policy = dict.and_then(|d| d.get("model_policy")).cloned();
282    Ok(node)
283}
284
285pub fn parse_workflow_node_json(
286    json: serde_json::Value,
287    label: &str,
288) -> Result<WorkflowNode, VmError> {
289    super::parse_json_payload(json, label)
290}
291
292pub fn parse_workflow_edge_json(
293    json: serde_json::Value,
294    label: &str,
295) -> Result<WorkflowEdge, VmError> {
296    super::parse_json_payload(json, label)
297}
298
299pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
300    let mut graph: WorkflowGraph = super::parse_json_value(value)?;
301    let as_dict = value.as_dict().cloned().unwrap_or_default();
302
303    if graph.nodes.is_empty() {
304        for key in ["act", "verify", "repair"] {
305            if let Some(node_value) = as_dict.get(key) {
306                let mut node = parse_workflow_node_value(node_value, "orchestration")?;
307                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
308                node.id = Some(key.to_string());
309                if node.kind.is_empty() {
310                    node.kind = if key == "verify" {
311                        "verify".to_string()
312                    } else {
313                        "stage".to_string()
314                    };
315                }
316                if node.model_policy.provider.is_none() {
317                    node.model_policy.provider = as_dict
318                        .get("provider")
319                        .map(|value| value.display())
320                        .filter(|value| !value.is_empty());
321                }
322                if node.model_policy.model.is_none() {
323                    node.model_policy.model = as_dict
324                        .get("model")
325                        .map(|value| value.display())
326                        .filter(|value| !value.is_empty());
327                }
328                if node.model_policy.model_tier.is_none() {
329                    node.model_policy.model_tier = as_dict
330                        .get("model_tier")
331                        .or_else(|| as_dict.get("tier"))
332                        .map(|value| value.display())
333                        .filter(|value| !value.is_empty());
334                }
335                if node.model_policy.temperature.is_none() {
336                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
337                        if let VmValue::Float(number) = value {
338                            Some(*number)
339                        } else {
340                            value.as_int().map(|number| number as f64)
341                        }
342                    });
343                }
344                if node.model_policy.max_tokens.is_none() {
345                    node.model_policy.max_tokens =
346                        as_dict.get("max_tokens").and_then(|value| value.as_int());
347                }
348                if node.mode.is_none() {
349                    node.mode = as_dict
350                        .get("mode")
351                        .map(|value| value.display())
352                        .filter(|value| !value.is_empty());
353                }
354                if node.done_sentinel.is_none() {
355                    node.done_sentinel = as_dict
356                        .get("done_sentinel")
357                        .map(|value| value.display())
358                        .filter(|value| !value.is_empty());
359                }
360                if key == "verify"
361                    && node.verify.is_none()
362                    && (raw_node.contains_key("assert_text")
363                        || raw_node.contains_key("command")
364                        || raw_node.contains_key("expect_status")
365                        || raw_node.contains_key("expect_text"))
366                {
367                    node.verify = Some(serde_json::json!({
368                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
369                        "command": raw_node.get("command").map(vm_value_to_json),
370                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
371                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
372                    }));
373                }
374                graph.nodes.insert(key.to_string(), node);
375            }
376        }
377        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
378            graph.entry = "act".to_string();
379        }
380        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
381            if graph.nodes.contains_key("verify") {
382                graph.edges.push(WorkflowEdge {
383                    from: "act".to_string(),
384                    to: "verify".to_string(),
385                    branch: None,
386                    label: None,
387                });
388            }
389            if graph.nodes.contains_key("repair") {
390                graph.edges.push(WorkflowEdge {
391                    from: "verify".to_string(),
392                    to: "repair".to_string(),
393                    branch: Some("failed".to_string()),
394                    label: None,
395                });
396                graph.edges.push(WorkflowEdge {
397                    from: "repair".to_string(),
398                    to: "verify".to_string(),
399                    branch: Some("retry".to_string()),
400                    label: None,
401                });
402            }
403        }
404    }
405
406    if graph.type_name.is_empty() {
407        graph.type_name = "workflow_graph".to_string();
408    }
409    if graph.id.is_empty() {
410        graph.id = new_id("workflow");
411    }
412    if graph.version == 0 {
413        graph.version = 1;
414    }
415    if graph.entry.is_empty() {
416        graph.entry = graph
417            .nodes
418            .keys()
419            .next()
420            .cloned()
421            .unwrap_or_else(|| "act".to_string());
422    }
423    for (node_id, node) in &mut graph.nodes {
424        if node.raw_tools.is_none() {
425            node.raw_tools = as_dict
426                .get("nodes")
427                .and_then(|nodes| nodes.as_dict())
428                .and_then(|nodes| nodes.get(node_id))
429                .and_then(|node_value| node_value.as_dict())
430                .and_then(|raw_node| raw_node.get("tools"))
431                .cloned();
432        }
433        if node.id.is_none() {
434            node.id = Some(node_id.clone());
435        }
436        if node.kind.is_empty() {
437            node.kind = "stage".to_string();
438        }
439        if node.join_policy.strategy.is_empty() {
440            node.join_policy.strategy = "all".to_string();
441        }
442        if node.reduce_policy.strategy.is_empty() {
443            node.reduce_policy.strategy = "concat".to_string();
444        }
445        if node.output_contract.output_kinds.is_empty() {
446            node.output_contract.output_kinds = vec![match node.kind.as_str() {
447                "verify" => "verification_result".to_string(),
448                "reduce" => node
449                    .reduce_policy
450                    .output_kind
451                    .clone()
452                    .unwrap_or_else(|| "summary".to_string()),
453                "map" => node
454                    .map_policy
455                    .output_kind
456                    .clone()
457                    .unwrap_or_else(|| "artifact".to_string()),
458                "escalation" => "plan".to_string(),
459                _ => "artifact".to_string(),
460            }];
461        }
462        if node.retry_policy.max_attempts == 0 {
463            node.retry_policy.max_attempts = 1;
464        }
465    }
466    Ok(graph)
467}
468
469pub fn validate_workflow(
470    graph: &WorkflowGraph,
471    ceiling: Option<&CapabilityPolicy>,
472) -> WorkflowValidationReport {
473    let mut errors = Vec::new();
474    let mut warnings = Vec::new();
475
476    if !graph.nodes.contains_key(&graph.entry) {
477        errors.push(format!("entry node does not exist: {}", graph.entry));
478    }
479
480    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
481    for edge in &graph.edges {
482        if !node_ids.contains(&edge.from) {
483            errors.push(format!("edge.from references unknown node: {}", edge.from));
484        }
485        if !node_ids.contains(&edge.to) {
486            errors.push(format!("edge.to references unknown node: {}", edge.to));
487        }
488    }
489
490    let reachable_nodes = reachable_nodes(graph);
491    for node_id in &node_ids {
492        if !reachable_nodes.contains(node_id) {
493            warnings.push(format!("node is unreachable: {node_id}"));
494        }
495    }
496
497    for (node_id, node) in &graph.nodes {
498        let incoming = graph
499            .edges
500            .iter()
501            .filter(|edge| edge.to == *node_id)
502            .count();
503        let outgoing: Vec<&WorkflowEdge> = graph
504            .edges
505            .iter()
506            .filter(|edge| edge.from == *node_id)
507            .collect();
508        if let Some(min_inputs) = node.input_contract.min_inputs {
509            if let Some(max_inputs) = node.input_contract.max_inputs {
510                if min_inputs > max_inputs {
511                    errors.push(format!(
512                        "node {node_id}: input contract min_inputs exceeds max_inputs"
513                    ));
514                }
515            }
516        }
517        match node.kind.as_str() {
518            "condition" => {
519                let has_true = outgoing
520                    .iter()
521                    .any(|edge| edge.branch.as_deref() == Some("true"));
522                let has_false = outgoing
523                    .iter()
524                    .any(|edge| edge.branch.as_deref() == Some("false"));
525                if !has_true || !has_false {
526                    errors.push(format!(
527                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
528                    ));
529                }
530            }
531            "fork" => {
532                if outgoing.len() < 2 {
533                    errors.push(format!(
534                        "node {node_id}: fork nodes require at least two outgoing edges"
535                    ));
536                }
537            }
538            "join" => {
539                if incoming < 2 {
540                    warnings.push(format!(
541                        "node {node_id}: join node has fewer than two incoming edges"
542                    ));
543                }
544            }
545            "map" => {
546                if node.map_policy.items.is_empty()
547                    && node.map_policy.item_artifact_kind.is_none()
548                    && node.input_contract.input_kinds.is_empty()
549                {
550                    errors.push(format!(
551                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
552                    ));
553                }
554            }
555            "reduce" => {
556                if node.input_contract.input_kinds.is_empty() {
557                    warnings.push(format!(
558                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
559                    ));
560                }
561            }
562            _ => {}
563        }
564    }
565
566    if let Some(ceiling) = ceiling {
567        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
568            errors.push(error);
569        }
570        for (node_id, node) in &graph.nodes {
571            if let Err(error) = ceiling.intersect(&node.capability_policy) {
572                errors.push(format!("node {node_id}: {error}"));
573            }
574        }
575    }
576
577    WorkflowValidationReport {
578        valid: errors.is_empty(),
579        errors,
580        warnings,
581        reachable_nodes: reachable_nodes.into_iter().collect(),
582    }
583}
584
585fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
586    let mut seen = BTreeSet::new();
587    let mut stack = vec![graph.entry.clone()];
588    while let Some(node_id) = stack.pop() {
589        if !seen.insert(node_id.clone()) {
590            continue;
591        }
592        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
593            stack.push(edge.to.clone());
594        }
595    }
596    seen
597}
598
599pub async fn execute_stage_node(
600    node_id: &str,
601    node: &WorkflowNode,
602    task: &str,
603    artifacts: &[ArtifactRecord],
604    transcript: Option<VmValue>,
605) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
606    let mut selection_policy = node.context_policy.clone();
607    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
608        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
609    }
610    let selected = super::select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
611    let rendered_context = super::render_artifacts_context(&selected, &node.context_policy);
612    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
613    if node.input_contract.require_transcript && transcript.is_none() {
614        return Err(VmError::Runtime(format!(
615            "workflow stage {node_id} requires transcript input"
616        )));
617    }
618    if let Some(min_inputs) = node.input_contract.min_inputs {
619        if selected.len() < min_inputs {
620            return Err(VmError::Runtime(format!(
621                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
622            )));
623        }
624    }
625    if let Some(max_inputs) = node.input_contract.max_inputs {
626        if selected.len() > max_inputs {
627            return Err(VmError::Runtime(format!(
628                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
629            )));
630        }
631    }
632    let prompt = if rendered_context.is_empty() {
633        task.to_string()
634    } else {
635        format!(
636            "{rendered_context}\n\n{}:\n{task}",
637            node.task_label
638                .clone()
639                .unwrap_or_else(|| "Task".to_string())
640        )
641    };
642
643    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
644        .ok()
645        .filter(|value| !value.trim().is_empty())
646        .unwrap_or_else(|| {
647            // Auto-detect from model/provider alias config.
648            let model = std::env::var("HARN_LLM_MODEL").unwrap_or_default();
649            let provider = std::env::var("HARN_LLM_PROVIDER").unwrap_or_default();
650            crate::llm_config::default_tool_format(&model, &provider)
651        });
652    let mut llm_result = if node.kind == "verify" {
653        if let Some(command) = node
654            .verify
655            .as_ref()
656            .and_then(|verify| verify.as_object())
657            .and_then(|verify| verify.get("command"))
658            .and_then(|value| value.as_str())
659            .map(str::trim)
660            .filter(|value| !value.is_empty())
661        {
662            let mut process = if cfg!(target_os = "windows") {
663                let mut cmd = tokio::process::Command::new("cmd");
664                cmd.arg("/C").arg(command);
665                cmd
666            } else {
667                let mut cmd = tokio::process::Command::new("/bin/sh");
668                cmd.arg("-lc").arg(command);
669                cmd
670            };
671            process.stdin(std::process::Stdio::null());
672            if let Some(context) = crate::stdlib::process::current_execution_context() {
673                if let Some(cwd) = context.cwd.filter(|cwd| !cwd.is_empty()) {
674                    process.current_dir(cwd);
675                }
676                if !context.env.is_empty() {
677                    process.envs(context.env);
678                }
679            }
680            let output = process
681                .output()
682                .await
683                .map_err(|e| VmError::Runtime(format!("workflow verify exec failed: {e}")))?;
684            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
685            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
686            let combined = if stderr.is_empty() {
687                stdout.clone()
688            } else if stdout.is_empty() {
689                stderr.clone()
690            } else {
691                format!("{stdout}\n{stderr}")
692            };
693            serde_json::json!({
694                "status": "completed",
695                "text": combined,
696                "visible_text": combined,
697                "command": command,
698                "stdout": stdout,
699                "stderr": stderr,
700                "exit_status": output.status.code().unwrap_or(-1),
701                "success": output.status.success(),
702            })
703        } else {
704            serde_json::json!({
705                "status": "completed",
706                "text": "",
707                "visible_text": "",
708            })
709        }
710    } else {
711        let mut options = BTreeMap::new();
712        if let Some(provider) = &node.model_policy.provider {
713            options.insert(
714                "provider".to_string(),
715                VmValue::String(Rc::from(provider.clone())),
716            );
717        }
718        if let Some(model) = &node.model_policy.model {
719            options.insert(
720                "model".to_string(),
721                VmValue::String(Rc::from(model.clone())),
722            );
723        }
724        if let Some(model_tier) = &node.model_policy.model_tier {
725            options.insert(
726                "model_tier".to_string(),
727                VmValue::String(Rc::from(model_tier.clone())),
728            );
729        }
730        if let Some(temperature) = node.model_policy.temperature {
731            options.insert("temperature".to_string(), VmValue::Float(temperature));
732        }
733        if let Some(max_tokens) = node.model_policy.max_tokens {
734            options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
735        }
736        let tool_names = workflow_tool_names(&node.tools);
737        let tools_value = node.raw_tools.clone().or_else(|| {
738            if matches!(node.tools, serde_json::Value::Null) {
739                None
740            } else {
741                Some(crate::stdlib::json_to_vm_value(&node.tools))
742            }
743        });
744        if tools_value.is_some() && !tool_names.is_empty() {
745            options.insert("tools".to_string(), tools_value.unwrap_or(VmValue::Nil));
746        }
747        if let Some(transcript) = transcript.clone() {
748            options.insert("transcript".to_string(), transcript);
749        }
750
751        let args = vec![
752            VmValue::String(Rc::from(prompt.clone())),
753            node.system
754                .clone()
755                .map(|s| VmValue::String(Rc::from(s)))
756                .unwrap_or(VmValue::Nil),
757            VmValue::Dict(Rc::new(options)),
758        ];
759        let mut opts = extract_llm_options(&args)?;
760
761        if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
762            // Build auto-compact config from transcript_policy fields
763            let auto_compact = if node.transcript_policy.auto_compact {
764                let mut ac = crate::orchestration::AutoCompactConfig::default();
765                if let Some(v) = node.transcript_policy.compact_threshold {
766                    ac.token_threshold = v;
767                }
768                if let Some(v) = node.transcript_policy.tool_output_max_chars {
769                    ac.tool_output_max_chars = v;
770                }
771                if let Some(ref strategy) = node.transcript_policy.compact_strategy {
772                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
773                        ac.compact_strategy = s;
774                    }
775                }
776                if let Some(v) = node.transcript_policy.hard_limit_tokens {
777                    ac.hard_limit_tokens = Some(v);
778                }
779                if let Some(ref strategy) = node.transcript_policy.hard_limit_strategy {
780                    if let Ok(s) = crate::orchestration::parse_compact_strategy(strategy) {
781                        ac.hard_limit_strategy = s;
782                    }
783                }
784                // Extract closure fields from raw transcript_policy dict
785                if let Some(ref raw_tp) = node.raw_transcript_policy {
786                    if let Some(dict) = raw_tp.as_dict() {
787                        if let Some(cb) = dict.get("compress_callback") {
788                            ac.compress_callback = Some(cb.clone());
789                        }
790                        if let Some(cb) = dict.get("mask_callback") {
791                            ac.mask_callback = Some(cb.clone());
792                        }
793                    }
794                }
795                // Adapt thresholds to provider context window
796                {
797                    let user_specified_threshold =
798                        node.transcript_policy.compact_threshold.is_some();
799                    let user_specified_hard_limit =
800                        node.transcript_policy.hard_limit_tokens.is_some();
801                    crate::llm::api::adapt_auto_compact_to_provider(
802                        &mut ac,
803                        user_specified_threshold,
804                        user_specified_hard_limit,
805                        &opts.provider,
806                        &opts.model,
807                        &opts.api_key,
808                    )
809                    .await;
810                }
811                Some(ac)
812            } else {
813                None
814            };
815            crate::llm::run_agent_loop_internal(
816                &mut opts,
817                crate::llm::AgentLoopConfig {
818                    persistent: true,
819                    max_iterations: node.model_policy.max_iterations.unwrap_or(16),
820                    max_nudges: node.model_policy.max_nudges.unwrap_or(3),
821                    nudge: node.model_policy.nudge.clone(),
822                    done_sentinel: node.done_sentinel.clone(),
823                    break_unless_phase: None,
824                    tool_retries: 0,
825                    tool_backoff_ms: 1000,
826                    tool_format: tool_format.clone(),
827                    auto_compact,
828                    context_callback: None,
829                    policy: None,
830                    daemon: false,
831                    llm_retries: 2,
832                    llm_backoff_ms: 2000,
833                    exit_when_verified: false,
834                    loop_detect_warn: 2,
835                    loop_detect_block: 3,
836                    loop_detect_skip: 4,
837                    tool_examples: node.model_policy.tool_examples.clone(),
838                    // Extract post_turn_callback from raw dict since serde(skip) drops it
839                    post_turn_callback: node
840                        .raw_model_policy
841                        .as_ref()
842                        .and_then(|v| v.as_dict())
843                        .and_then(|d| d.get("post_turn_callback"))
844                        .cloned(),
845                },
846            )
847            .await?
848        } else {
849            let result = vm_call_llm_full(&opts).await?;
850            crate::llm::agent_loop_result_from_llm(&result, opts)
851        }
852    };
853    if let Some(payload) = llm_result.as_object_mut() {
854        payload.insert("prompt".to_string(), serde_json::json!(prompt));
855        payload.insert(
856            "system_prompt".to_string(),
857            serde_json::json!(node.system.clone().unwrap_or_default()),
858        );
859        payload.insert(
860            "rendered_context".to_string(),
861            serde_json::json!(rendered_context),
862        );
863        payload.insert(
864            "selected_artifact_ids".to_string(),
865            serde_json::json!(selected
866                .iter()
867                .map(|artifact| artifact.id.clone())
868                .collect::<Vec<_>>()),
869        );
870        payload.insert(
871            "selected_artifact_titles".to_string(),
872            serde_json::json!(selected
873                .iter()
874                .map(|artifact| artifact.title.clone())
875                .collect::<Vec<_>>()),
876        );
877        payload.insert(
878            "tool_calling_mode".to_string(),
879            serde_json::json!(tool_format.clone()),
880        );
881    }
882
883    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
884    let transcript = llm_result
885        .get("transcript")
886        .cloned()
887        .map(|value| crate::stdlib::json_to_vm_value(&value));
888    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
889    let output_kind = node
890        .output_contract
891        .output_kinds
892        .first()
893        .cloned()
894        .unwrap_or_else(|| {
895            if node.kind == "verify" {
896                "verification_result".to_string()
897            } else {
898                "artifact".to_string()
899            }
900        });
901    let mut metadata = BTreeMap::new();
902    metadata.insert(
903        "input_artifact_ids".to_string(),
904        serde_json::json!(selected
905            .iter()
906            .map(|artifact| artifact.id.clone())
907            .collect::<Vec<_>>()),
908    );
909    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
910    let artifact = ArtifactRecord {
911        type_name: "artifact".to_string(),
912        id: new_id("artifact"),
913        kind: output_kind,
914        title: Some(format!("stage {node_id} output")),
915        text: Some(visible_text),
916        data: Some(llm_result.clone()),
917        source: Some(node_id.to_string()),
918        created_at: now_rfc3339(),
919        freshness: Some("fresh".to_string()),
920        priority: None,
921        lineage: selected
922            .iter()
923            .map(|artifact| artifact.id.clone())
924            .collect(),
925        relevance: Some(1.0),
926        estimated_tokens: None,
927        stage: Some(node_id.to_string()),
928        metadata,
929    }
930    .normalize();
931
932    Ok((llm_result, vec![artifact], transcript))
933}
934
935pub fn next_nodes_for(
936    graph: &WorkflowGraph,
937    current: &str,
938    branch: Option<&str>,
939) -> Vec<WorkflowEdge> {
940    let mut matching: Vec<WorkflowEdge> = graph
941        .edges
942        .iter()
943        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
944        .cloned()
945        .collect();
946    if matching.is_empty() {
947        matching = graph
948            .edges
949            .iter()
950            .filter(|edge| edge.from == current && edge.branch.is_none())
951            .cloned()
952            .collect();
953    }
954    matching
955}
956
957pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
958    next_nodes_for(graph, current, Some(branch))
959        .into_iter()
960        .next()
961        .map(|edge| edge.to)
962}
963
964pub fn append_audit_entry(
965    graph: &mut WorkflowGraph,
966    op: &str,
967    node_id: Option<String>,
968    reason: Option<String>,
969    metadata: BTreeMap<String, serde_json::Value>,
970) {
971    graph.audit_log.push(WorkflowAuditEntry {
972        id: new_id("audit"),
973        op: op.to_string(),
974        node_id,
975        timestamp: now_rfc3339(),
976        reason,
977        metadata,
978    });
979}