//! harn_vm/orchestration.rs — workflow orchestration types and helpers.
1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::{cell::RefCell, thread_local};
5
6use serde::{Deserialize, Serialize};
7
8use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
9use crate::value::{VmError, VmValue};
10
/// Current UTC time formatted as an RFC 3339 timestamp with second
/// resolution, e.g. `2024-05-01T12:34:56Z`.
///
/// The previous implementation returned bare Unix seconds despite the
/// name; callers store the result in `created_at`-style fields, so an
/// actual RFC 3339 string is produced here. Falls back to the epoch if
/// the system clock reports a time before 1970.
fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_rfc3339(secs)
}

/// Convert Unix seconds (UTC) to `YYYY-MM-DDThh:mm:ssZ` using Howard
/// Hinnant's civil-from-days algorithm. Valid for any time >= 1970-01-01.
fn format_rfc3339(secs: u64) -> String {
    let (days, rem) = (secs / 86_400, secs % 86_400);
    let (hour, minute, second) = (rem / 3_600, (rem % 3_600) / 60, rem % 60);
    // Shift the epoch to 0000-03-01 so leap days fall at the end of the
    // "year", then decompose into 400-year eras.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z - era * 146_097; // day-of-era, [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year-of-era
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day-of-year, March-based
    let mp = (5 * doy + 2) / 153; // month index with March = 0
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + u64::from(month <= 2);
    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
19
20fn new_id(prefix: &str) -> String {
21    format!("{prefix}_{}", uuid::Uuid::now_v7())
22}
23
/// Directory where run records are persisted: the `HARN_RUN_DIR`
/// environment variable when set, otherwise `.harn-runs`.
fn default_run_dir() -> PathBuf {
    match std::env::var("HARN_RUN_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(".harn-runs"),
    }
}
29
// Per-thread stack of capability ceilings.
// NOTE(review): presumably pushed/popped around nested (sub-)workflow
// execution so child runs inherit the narrowest ceiling — confirm against
// the callers elsewhere in this file.
thread_local! {
    static EXECUTION_POLICY_STACK: RefCell<Vec<CapabilityPolicy>> = const { RefCell::new(Vec::new()) };
}
33
/// Map a raw artifact-kind string onto the canonical vocabulary.
///
/// Known canonical kinds pass through unchanged, a handful of legacy
/// aliases are rewritten, blank/whitespace-only input falls back to
/// `"artifact"`, and any other string is preserved verbatim.
fn normalize_artifact_kind(kind: &str) -> String {
    const CANONICAL_KINDS: [&str; 23] = [
        "resource",
        "workspace_file",
        "editor_selection",
        "workspace_snapshot",
        "transcript_summary",
        "summary",
        "plan",
        "diff",
        "git_diff",
        "patch",
        "patch_set",
        "patch_proposal",
        "diff_review",
        "review_decision",
        "verification_bundle",
        "apply_intent",
        "verification_result",
        "test_result",
        "command_result",
        "provider_payload",
        "worker_result",
        "worker_notification",
        "artifact",
    ];
    if CANONICAL_KINDS.contains(&kind) {
        return kind.to_string();
    }
    // Legacy aliases and the blank fallback; anything else is kept as-is.
    let mapped = match kind {
        "file" => "workspace_file",
        "transcript" => "transcript_summary",
        "verification" => "verification_result",
        "test" => "test_result",
        blank if blank.trim().is_empty() => "artifact",
        other => other,
    };
    mapped.to_string()
}
67
/// Default ordering weight for an artifact kind; higher values are
/// considered more important when selecting artifacts for context.
/// Unknown kinds get the lowest weight (40).
fn default_artifact_priority(kind: &str) -> i64 {
    if matches!(kind, "verification_result" | "test_result") {
        100
    } else if kind == "verification_bundle" {
        95
    } else if matches!(
        kind,
        "diff" | "git_diff" | "patch" | "patch_set" | "patch_proposal" | "diff_review"
            | "review_decision" | "apply_intent"
    ) {
        90
    } else if kind == "plan" {
        80
    } else if matches!(
        kind,
        "workspace_file" | "workspace_snapshot" | "editor_selection" | "resource"
    ) {
        70
    } else if matches!(kind, "summary" | "transcript_summary") {
        60
    } else if kind == "command_result" {
        50
    } else {
        40
    }
}
81
/// Rank a freshness label for sorting: "fresh"/"live" first, then
/// "recent", then anything unknown or missing, with "stale" last.
fn freshness_rank(value: Option<&str>) -> i64 {
    match value {
        Some("fresh") | Some("live") => 3,
        Some("recent") => 2,
        Some("stale") => 0,
        _ => 1,
    }
}
90
/// Capability grant/ceiling attached to a workflow, node, or run.
///
/// Empty collections mean "no restriction stated" rather than "nothing
/// allowed" — see `intersect`, which treats an empty side as a wildcard.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CapabilityPolicy {
    /// Tool names permitted (empty = unrestricted).
    pub tools: Vec<String>,
    /// Capability name -> permitted operations (empty map = unrestricted).
    pub capabilities: BTreeMap<String, Vec<String>>,
    /// Workspace roots the run may touch (empty = unrestricted).
    pub workspace_roots: Vec<String>,
    /// Side-effect level such as "none"/"read_only"/"network"; ordered by
    /// `min_side_effect` when two policies are combined.
    pub side_effect_level: Option<String>,
    /// NOTE(review): presumably bounds nested sub-run depth — confirm
    /// against the executor.
    pub recursion_limit: Option<usize>,
}
100
impl CapabilityPolicy {
    /// Intersect a host ceiling (`self`) with a requested policy.
    ///
    /// Semantics: an empty list/map on either side acts as a wildcard
    /// ("no restriction"), so the non-empty side wins outright. When both
    /// sides are non-empty the result is their set intersection — but a
    /// request that names a tool or capability outside a non-empty ceiling
    /// is rejected with `Err` rather than silently narrowed.
    pub fn intersect(&self, requested: &CapabilityPolicy) -> Result<CapabilityPolicy, String> {
        // Side-effect level: least permissive of the two when both are
        // present (see `min_side_effect`), otherwise whichever exists.
        let side_effect_level = match (&self.side_effect_level, &requested.side_effect_level) {
            (Some(a), Some(b)) => Some(min_side_effect(a, b).to_string()),
            (Some(a), None) => Some(a.clone()),
            (None, Some(b)) => Some(b.clone()),
            (None, None) => None,
        };

        // Hard failure when the request names tools outside a non-empty ceiling.
        if !self.tools.is_empty() {
            let denied: Vec<String> = requested
                .tools
                .iter()
                .filter(|tool| !self.tools.contains(*tool))
                .cloned()
                .collect();
            if !denied.is_empty() {
                return Err(format!(
                    "requested tools exceed host ceiling: {}",
                    denied.join(", ")
                ));
            }
        }

        // Same check per capability: every requested op must be allowed,
        // and (when the ceiling map is non-empty) the capability itself
        // must exist in the ceiling.
        for (capability, requested_ops) in &requested.capabilities {
            if let Some(allowed_ops) = self.capabilities.get(capability) {
                let denied: Vec<String> = requested_ops
                    .iter()
                    .filter(|op| !allowed_ops.contains(*op))
                    .cloned()
                    .collect();
                if !denied.is_empty() {
                    return Err(format!(
                        "requested capability operations exceed host ceiling: {}.{}",
                        capability,
                        denied.join(",")
                    ));
                }
            } else if !self.capabilities.is_empty() {
                return Err(format!(
                    "requested capability exceeds host ceiling: {capability}"
                ));
            }
        }

        // Tools: wildcard on either side, otherwise the intersection.
        let tools = if self.tools.is_empty() {
            requested.tools.clone()
        } else if requested.tools.is_empty() {
            self.tools.clone()
        } else {
            requested
                .tools
                .iter()
                .filter(|tool| self.tools.contains(*tool))
                .cloned()
                .collect()
        };

        // Capabilities: wildcard on either side, otherwise intersect the
        // op lists of capabilities present in both maps.
        let capabilities = if self.capabilities.is_empty() {
            requested.capabilities.clone()
        } else if requested.capabilities.is_empty() {
            self.capabilities.clone()
        } else {
            requested
                .capabilities
                .iter()
                .filter_map(|(capability, requested_ops)| {
                    self.capabilities.get(capability).map(|allowed_ops| {
                        (
                            capability.clone(),
                            requested_ops
                                .iter()
                                .filter(|op| allowed_ops.contains(*op))
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .collect()
        };

        // Workspace roots: wildcard on either side, otherwise intersection.
        let workspace_roots = if self.workspace_roots.is_empty() {
            requested.workspace_roots.clone()
        } else if requested.workspace_roots.is_empty() {
            self.workspace_roots.clone()
        } else {
            requested
                .workspace_roots
                .iter()
                .filter(|root| self.workspace_roots.contains(*root))
                .cloned()
                .collect()
        };

        // Recursion limit: the stricter (smaller) of the two when both set.
        let recursion_limit = match (self.recursion_limit, requested.recursion_limit) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        Ok(CapabilityPolicy {
            tools,
            capabilities,
            workspace_roots,
            side_effect_level,
            recursion_limit,
        })
    }
}
211
/// Return whichever of two side-effect levels is less permissive.
/// Unrecognized levels are treated as most permissive; ties favor `a`.
fn min_side_effect<'a>(a: &'a str, b: &'a str) -> &'a str {
    let rank = |level: &str| -> usize {
        match level {
            "none" => 0,
            "read_only" => 1,
            "workspace_write" => 2,
            "process_exec" => 3,
            "network" => 4,
            _ => 5,
        }
    };
    if rank(b) < rank(a) { b } else { a }
}
229
/// Provider/model selection and sampling settings for a stage's LLM call.
/// All fields optional; `normalize_workflow_value` backfills them from
/// top-level keys in the shorthand workflow form.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ModelPolicy {
    pub provider: Option<String>,
    pub model: Option<String>,
    /// Abstract tier (accepted from `model_tier` or `tier` keys) used when
    /// a concrete model is not pinned.
    pub model_tier: Option<String>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<i64>,
}

/// How a stage's transcript is recorded and compacted.
/// NOTE(review): the accepted `mode`/`visibility` values are not visible
/// in this file — confirm against the executor.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptPolicy {
    pub mode: Option<String>,
    pub visibility: Option<String>,
    pub summarize: bool,
    pub compact: bool,
    /// Keep only the last N transcript entries when set.
    pub keep_last: Option<usize>,
}

/// Controls which artifacts are assembled into a stage's context window.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ContextPolicy {
    pub max_artifacts: Option<usize>,
    pub max_tokens: Option<usize>,
    /// Token budget held back (presumably for the reply) — TODO confirm.
    pub reserve_tokens: Option<usize>,
    pub include_kinds: Vec<String>,
    pub exclude_kinds: Vec<String>,
    pub prioritize_kinds: Vec<String>,
    /// Artifact ids that are always included regardless of budget.
    pub pinned_ids: Vec<String>,
    pub include_stages: Vec<String>,
    pub prefer_recent: bool,
    pub prefer_fresh: bool,
    pub render: Option<String>,
}

/// Retry behavior for a stage; `normalize_workflow_value` raises
/// `max_attempts` to at least 1.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub verify: bool,
    pub repair: bool,
}
273
/// Declares the artifact kinds a stage consumes/produces, with optional
/// input-count bounds (validated in `validate_workflow`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StageContract {
    pub input_kinds: Vec<String>,
    pub output_kinds: Vec<String>,
    pub min_inputs: Option<usize>,
    pub max_inputs: Option<usize>,
    pub require_transcript: bool,
    /// Optional JSON schema for stage output — TODO confirm where enforced.
    pub schema: Option<serde_json::Value>,
}

/// Maps stage outcomes to edge-branch labels used when routing the graph.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BranchSemantics {
    pub success: Option<String>,
    pub failure: Option<String>,
    pub verify_pass: Option<String>,
    pub verify_fail: Option<String>,
    pub condition_true: Option<String>,
    pub condition_false: Option<String>,
    pub loop_continue: Option<String>,
    pub loop_exit: Option<String>,
    pub escalation: Option<String>,
}

/// Fan-out configuration for `map` nodes: explicit items, or an artifact
/// kind to expand over (see the `map` check in `validate_workflow`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct MapPolicy {
    pub items: Vec<serde_json::Value>,
    pub item_artifact_kind: Option<String>,
    pub output_kind: Option<String>,
    pub max_items: Option<usize>,
}

/// Fan-in configuration for `join` nodes; strategy defaults to "all".
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct JoinPolicy {
    pub strategy: String,
    pub require_all_inputs: bool,
    pub min_completed: Option<usize>,
}

/// Aggregation configuration for `reduce` nodes; strategy defaults to
/// "concat".
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReducePolicy {
    pub strategy: String,
    pub separator: Option<String>,
    pub output_kind: Option<String>,
}

/// Routing metadata for escalation nodes (level/queue/reason are free-form
/// here; semantics live in the executor).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EscalationPolicy {
    pub level: Option<String>,
    pub queue: Option<String>,
    pub reason: Option<String>,
}
331
/// A unit of data flowing between stages (diffs, plans, test results, …).
/// See `normalize` for the defaulting rules and `normalize_artifact_kind`
/// for the kind vocabulary.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ArtifactRecord {
    /// Serialized type tag; defaults to "artifact".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub kind: String,
    pub title: Option<String>,
    pub text: Option<String>,
    pub data: Option<serde_json::Value>,
    pub source: Option<String>,
    pub created_at: String,
    /// Freshness label ("fresh"/"live"/"recent"/"stale"); ranked by
    /// `freshness_rank`.
    pub freshness: Option<String>,
    /// Context-selection weight; defaulted from `default_artifact_priority`.
    pub priority: Option<i64>,
    /// Ids of artifacts this one was derived from.
    pub lineage: Vec<String>,
    pub relevance: Option<f64>,
    /// Estimated size in tokens; defaulted to len(text)/4 in `normalize`.
    pub estimated_tokens: Option<usize>,
    pub stage: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
352
impl ArtifactRecord {
    /// Fill in defaults so downstream code can rely on every record having
    /// a type tag, id, creation time, canonical kind, token estimate, and
    /// priority. Order matters: the kind is canonicalized before the
    /// priority default, so the priority reflects the canonical kind.
    pub fn normalize(mut self) -> Self {
        if self.type_name.is_empty() {
            self.type_name = "artifact".to_string();
        }
        if self.id.is_empty() {
            self.id = new_id("artifact");
        }
        if self.created_at.is_empty() {
            self.created_at = now_rfc3339();
        }
        if self.kind.is_empty() {
            self.kind = "artifact".to_string();
        }
        self.kind = normalize_artifact_kind(&self.kind);
        if self.estimated_tokens.is_none() {
            // Rough heuristic: ~4 characters per token.
            self.estimated_tokens = self
                .text
                .as_ref()
                .map(|text| ((text.len() as f64) / 4.0).ceil() as usize);
        }
        if self.priority.is_none() {
            self.priority = Some(default_artifact_priority(&self.kind));
        }
        self
    }
}
380
/// A single node (stage) in a workflow graph, bundling its prompt and all
/// per-stage policies. `kind` selects behavior ("stage", "verify",
/// "condition", "fork", "join", "map", "reduce", "escalation" appear in
/// this file); defaults are filled by `normalize_workflow_value`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node id; backfilled from the map key when absent.
    pub id: Option<String>,
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub tools: Vec<String>,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification spec (assert_text/command/expect_status/expect_text
    /// in the shorthand form).
    pub verify: Option<serde_json::Value>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
406
/// Directed edge between two nodes; `branch` selects this edge when the
/// source stage's outcome maps to that branch label.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    pub from: String,
    pub to: String,
    pub branch: Option<String>,
    pub label: Option<String>,
}

/// A complete workflow definition: nodes, edges, entry point, and the
/// graph-level capability policy. Normalized/validated by
/// `normalize_workflow_value` and `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized type tag; defaults to "workflow_graph".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Defaults to 1 when unset.
    pub version: usize,
    /// Id of the starting node; must exist in `nodes`.
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}

/// One entry in a workflow's audit trail (op applied, when, and why).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
442
/// Execution record for one stage of a run, including its attempts and
/// the artifacts it consumed/produced.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch label the stage routed to, if any.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// One retry attempt within a stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}

/// Edge traversal taken during a run (which node handed off to which).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// Snapshot of scheduler state persisted mid-run so a run can resume.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
501
/// Expectations captured from a source run, replayable as a regression
/// check against later runs of the same workflow.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-stage expectation inside a `ReplayFixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    /// Substring that must appear in the stage's visible text, if set.
    pub visible_text_contains: Option<String>,
}

/// Result of evaluating one fixture against one run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// Per-case result inside a replay evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Optional run-vs-run diff when the case has a `compare_to` target.
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate result of a replay evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
557
/// Per-node difference between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change — NOTE(review): the value vocabulary is defined by
    /// the diffing code, not visible in this chunk.
    pub change: String,
    pub details: Vec<String>,
}

/// Structural comparison of two runs (status, stages, and count deltas).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}

/// On-disk manifest listing the cases of an evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Base directory that relative case paths are resolved against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One case in an evaluation suite: a run, an optional fixture, and an
/// optional run to diff against.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
600
/// Full record of a workflow run: stages, transitions, checkpoints,
/// artifacts, child runs, and the effective capability policy.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Set when this run was spawned by another run.
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    /// Effective (already-intersected) capability policy for this run.
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub replay_fixture: Option<ReplayFixture>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where this record was persisted on disk, if it was.
    pub persisted_path: Option<String>,
}

/// Bookkeeping for a child (worker) run spawned from a parent stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Where/how a run executed: working directory, environment, and optional
/// git worktree details (repo/worktree/branch/base_ref/cleanup).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
659
/// Output of `validate_workflow`: `valid` is true iff `errors` is empty;
/// `warnings` do not block execution.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    /// Nodes reachable from the graph entry (others get warnings).
    pub reachable_nodes: Vec<String>,
}
668
669fn parse_json_value<T: for<'de> Deserialize<'de>>(value: &VmValue) -> Result<T, VmError> {
670    serde_json::from_value(vm_value_to_json(value))
671        .map_err(|e| VmError::Runtime(format!("orchestration parse error: {e}")))
672}
673
/// Parse a `VmValue` into a `WorkflowGraph`, filling in defaults.
///
/// Accepts either a full graph value or a shorthand dict with top-level
/// `act`/`verify`/`repair` stages; the shorthand is expanded into a small
/// act -> verify -> (repair -> verify) graph. Missing type tags, ids,
/// version, entry point, and per-node defaults (kind, join/reduce
/// strategies, output contract, retry attempts) are then backfilled.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: no explicit node map, stages given as top-level keys.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node: WorkflowNode = parse_json_value(node_value)?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Top-level model settings act as fallbacks for each stage.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Temperature may be given as a float or an int.
                if node.model_policy.temperature.is_none() {
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Lift inline verify settings on the shorthand verify stage
                // into the node's structured `verify` spec.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Wire the conventional act -> verify and verify <-> repair edges.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults: type tag, id, version, and entry point
    // (falling back to the first node in key order).
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults: id, kind, join/reduce strategies, an output
    // contract derived from the node kind, and at least one attempt.
    for (node_id, node) in &mut graph.nodes {
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
828
/// Structurally validate a workflow graph, optionally against a host
/// capability ceiling.
///
/// Hard `errors` (bad entry, dangling edges, malformed node contracts,
/// capability requests above the ceiling) make the graph non-runnable;
/// `warnings` (unreachable nodes, suspicious join/reduce shapes) do not.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    // Every edge must reference existing nodes on both ends.
    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Note: this local shadows the helper fn of the same name.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    // Per-node checks based on node kind and contracts.
    for (node_id, node) in &graph.nodes {
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        match node.kind.as_str() {
            // Condition nodes must be able to route both outcomes.
            "condition" => {
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            // A fork with fewer than two targets cannot fan out.
            "fork" => {
                if outgoing.len() < 2 {
                    errors.push(format!(
                        "node {node_id}: fork nodes require at least two outgoing edges"
                    ));
                }
            }
            // A single-input join is legal but suspicious — warn only.
            "join" => {
                if incoming < 2 {
                    warnings.push(format!(
                        "node {node_id}: join node has fewer than two incoming edges"
                    ));
                }
            }
            // Map nodes need some source of items to fan out over.
            "map" => {
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty()
                {
                    errors.push(format!(
                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                    ));
                }
            }
            "reduce" => {
                if node.input_contract.input_kinds.is_empty() {
                    warnings.push(format!(
                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                    ));
                }
            }
            _ => {}
        }
    }

    // Graph- and node-level capability requests must fit under the ceiling.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
944
945fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
946    let mut seen = BTreeSet::new();
947    let mut stack = vec![graph.entry.clone()];
948    while let Some(node_id) = stack.pop() {
949        if !seen.insert(node_id.clone()) {
950            continue;
951        }
952        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
953            stack.push(edge.to.clone());
954        }
955    }
956    seen
957}
958
pub fn select_artifacts(
    mut artifacts: Vec<ArtifactRecord>,
    policy: &ContextPolicy,
) -> Vec<ArtifactRecord> {
    // Filter, rank, and budget artifacts for inclusion in a context window.
    //
    // Filtering: keep artifacts whose kind passes the include/exclude lists
    // and whose stage (when include_stages is non-empty) is listed; artifacts
    // with no stage are dropped whenever a stage filter is set.
    artifacts.retain(|artifact| {
        (policy.include_kinds.is_empty() || policy.include_kinds.contains(&artifact.kind))
            && !policy.exclude_kinds.contains(&artifact.kind)
            && (policy.include_stages.is_empty()
                || artifact
                    .stage
                    .as_ref()
                    .is_some_and(|stage| policy.include_stages.contains(stage)))
    });
    // Ranking: each comparator sorts descending on its criterion (note the
    // `b`-side leads each `cmp`), and later criteria only break ties from
    // earlier ones: pinned > prioritized kind > priority > freshness
    // (if prefer_fresh) > recency (if prefer_recent) > relevance > smaller
    // estimated token cost.
    artifacts.sort_by(|a, b| {
        let b_pinned = policy.pinned_ids.contains(&b.id);
        let a_pinned = policy.pinned_ids.contains(&a.id);
        b_pinned
            .cmp(&a_pinned)
            .then_with(|| {
                let b_prio_kind = policy.prioritize_kinds.contains(&b.kind);
                let a_prio_kind = policy.prioritize_kinds.contains(&a.kind);
                b_prio_kind.cmp(&a_prio_kind)
            })
            .then_with(|| {
                b.priority
                    .unwrap_or_default()
                    .cmp(&a.priority.unwrap_or_default())
            })
            .then_with(|| {
                if policy.prefer_fresh {
                    freshness_rank(b.freshness.as_deref())
                        .cmp(&freshness_rank(a.freshness.as_deref()))
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                if policy.prefer_recent {
                    b.created_at.cmp(&a.created_at)
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                // Relevance is a float; incomparable values (NaN) are ties.
                b.relevance
                    .partial_cmp(&a.relevance)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| {
                // Ascending token cost; unknown estimates sort last.
                a.estimated_tokens
                    .unwrap_or(usize::MAX)
                    .cmp(&b.estimated_tokens.unwrap_or(usize::MAX))
            })
    });

    // Budgeting: walk the ranked list, honoring max_artifacts and a token
    // budget of max_tokens minus reserve_tokens. Note an over-budget artifact
    // is skipped (`continue`) rather than stopping the scan, so smaller,
    // lower-ranked artifacts can still fill the remaining budget.
    let mut selected = Vec::new();
    let mut used_tokens = 0usize;
    let reserve_tokens = policy.reserve_tokens.unwrap_or(0);
    let effective_max_tokens = policy
        .max_tokens
        .map(|max| max.saturating_sub(reserve_tokens));
    for artifact in artifacts {
        if let Some(max_artifacts) = policy.max_artifacts {
            if selected.len() >= max_artifacts {
                break;
            }
        }
        // Artifacts with no estimate count as zero tokens against the budget.
        let next_tokens = artifact.estimated_tokens.unwrap_or(0);
        if let Some(max_tokens) = effective_max_tokens {
            if used_tokens + next_tokens > max_tokens {
                continue;
            }
        }
        used_tokens += next_tokens;
        selected.push(artifact);
    }
    selected
}
1037
1038pub fn render_artifacts_context(artifacts: &[ArtifactRecord], policy: &ContextPolicy) -> String {
1039    let mut parts = Vec::new();
1040    for artifact in artifacts {
1041        let title = artifact
1042            .title
1043            .clone()
1044            .unwrap_or_else(|| format!("{} {}", artifact.kind, artifact.id));
1045        let body = artifact
1046            .text
1047            .clone()
1048            .or_else(|| artifact.data.as_ref().map(|v| v.to_string()))
1049            .unwrap_or_default();
1050        match policy.render.as_deref() {
1051            Some("json") => {
1052                parts.push(
1053                    serde_json::json!({
1054                        "id": artifact.id,
1055                        "kind": artifact.kind,
1056                        "title": title,
1057                        "source": artifact.source,
1058                        "freshness": artifact.freshness,
1059                        "priority": artifact.priority,
1060                        "text": body,
1061                    })
1062                    .to_string(),
1063                );
1064            }
1065            _ => parts.push(format!(
1066                "[{title}] kind={} source={} freshness={} priority={}\n{}",
1067                artifact.kind,
1068                artifact
1069                    .source
1070                    .clone()
1071                    .unwrap_or_else(|| "unknown".to_string()),
1072                artifact
1073                    .freshness
1074                    .clone()
1075                    .unwrap_or_else(|| "normal".to_string()),
1076                artifact.priority.unwrap_or_default(),
1077                body
1078            )),
1079        }
1080    }
1081    parts.join("\n\n")
1082}
1083
1084pub fn normalize_artifact(value: &VmValue) -> Result<ArtifactRecord, VmError> {
1085    let artifact: ArtifactRecord = parse_json_value(value)?;
1086    Ok(artifact.normalize())
1087}
1088
1089pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1090    let mut run: RunRecord = parse_json_value(value)?;
1091    if run.type_name.is_empty() {
1092        run.type_name = "run_record".to_string();
1093    }
1094    if run.id.is_empty() {
1095        run.id = new_id("run");
1096    }
1097    if run.started_at.is_empty() {
1098        run.started_at = now_rfc3339();
1099    }
1100    if run.status.is_empty() {
1101        run.status = "running".to_string();
1102    }
1103    if run.root_run_id.is_none() {
1104        run.root_run_id = Some(run.id.clone());
1105    }
1106    if run.replay_fixture.is_none() {
1107        run.replay_fixture = Some(replay_fixture_from_run(&run));
1108    }
1109    Ok(run)
1110}
1111
1112pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1113    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1114    if manifest.type_name.is_empty() {
1115        manifest.type_name = "eval_suite_manifest".to_string();
1116    }
1117    if manifest.id.is_empty() {
1118        manifest.id = new_id("eval_suite");
1119    }
1120    Ok(manifest)
1121}
1122
1123fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1124    let content = std::fs::read_to_string(path)
1125        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1126    serde_json::from_str(&content)
1127        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1128}
1129
/// Resolve a manifest-relative path string against an optional base directory.
///
/// Returns `path` as-is when it is absolute or when no base directory is
/// provided; otherwise joins it onto `base_dir`. Relies on the documented
/// `Path::join`/`PathBuf::push` behavior: joining an absolute path replaces
/// the base entirely, so no explicit `is_absolute` branch is needed.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    match base_dir {
        Some(base) => base.join(path),
        None => PathBuf::from(path),
    }
}
1140
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    // For every case in the manifest: load the run record, evaluate it against
    // its replay fixture, optionally diff it against a baseline run, then
    // aggregate the per-case reports into a suite report. Any load failure
    // aborts the whole suite with an error.
    let base_dir = manifest.base_dir.as_deref().map(Path::new);
    let mut reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(base_dir, &case.run_path);
        let run = load_run_record(&run_path)?;
        // Fixture precedence: explicit fixture_path, then the fixture embedded
        // in the run record, then one synthesized from the run itself.
        let fixture = match &case.fixture_path {
            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
            None => run
                .replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run)),
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;
        // When a baseline run is named, a non-identical diff also fails the
        // case and appends a summary failure message.
        let comparison = match &case.compare_to {
            Some(path) => {
                let baseline_path = resolve_manifest_path(base_dir, path);
                let baseline = load_run_record(&baseline_path)?;
                let diff = diff_run_records(&baseline, &run);
                if !diff.identical {
                    pass = false;
                    failures.push(format!(
                        "run differs from baseline {} with {} stage changes",
                        baseline_path.display(),
                        diff.stage_diffs.len()
                    ));
                }
                Some(diff)
            }
            None => None,
        };
        reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }
    // Suite passes only when every case passed.
    let total = reports.len();
    let passed = reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: reports,
    })
}
1198
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    // Produce a minimal unified-diff-style rendering (file header plus
    // ' '/'-'/'+' prefixed lines, no hunk headers) using an LCS alignment.
    let old: Vec<&str> = before.lines().collect();
    let new: Vec<&str> = after.lines().collect();
    let rows = old.len();
    let cols = new.len();

    // lcs[i][j] = length of the longest common subsequence of old[i..] and
    // new[j..]; filled back-to-front so each cell depends on later ones.
    let mut lcs = vec![vec![0usize; cols + 1]; rows + 1];
    for i in (0..rows).rev() {
        for j in (0..cols).rev() {
            lcs[i][j] = if old[i] == new[j] {
                lcs[i + 1][j + 1] + 1
            } else {
                lcs[i + 1][j].max(lcs[i][j + 1])
            };
        }
    }

    let label = path.unwrap_or("artifact");
    let mut out = format!("--- a/{label}\n+++ b/{label}\n");
    let (mut i, mut j) = (0, 0);
    // Walk the alignment, emitting context, deletions, and additions.
    while i < rows && j < cols {
        if old[i] == new[j] {
            out.push(' ');
            out.push_str(old[i]);
            out.push('\n');
            i += 1;
            j += 1;
        } else if lcs[i + 1][j] >= lcs[i][j + 1] {
            out.push('-');
            out.push_str(old[i]);
            out.push('\n');
            i += 1;
        } else {
            out.push('+');
            out.push_str(new[j]);
            out.push('\n');
            j += 1;
        }
    }
    // Flush whatever remains on either side.
    for line in &old[i..] {
        out.push('-');
        out.push_str(line);
        out.push('\n');
    }
    for line in &new[j..] {
        out.push('+');
        out.push_str(line);
        out.push('\n');
    }
    out
}
1241
1242pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1243    let path = path
1244        .map(PathBuf::from)
1245        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1246    if let Some(parent) = path.parent() {
1247        std::fs::create_dir_all(parent)
1248            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1249    }
1250    let json = serde_json::to_string_pretty(run)
1251        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1252    std::fs::write(&path, json)
1253        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1254    Ok(path.to_string_lossy().to_string())
1255}
1256
1257pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1258    let content = std::fs::read_to_string(path)
1259        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1260    serde_json::from_str(&content)
1261        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
1262}
1263
1264pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1265    ReplayFixture {
1266        type_name: "replay_fixture".to_string(),
1267        id: new_id("fixture"),
1268        source_run_id: run.id.clone(),
1269        workflow_id: run.workflow_id.clone(),
1270        workflow_name: run.workflow_name.clone(),
1271        created_at: now_rfc3339(),
1272        expected_status: run.status.clone(),
1273        stage_assertions: run
1274            .stages
1275            .iter()
1276            .map(|stage| ReplayStageAssertion {
1277                node_id: stage.node_id.clone(),
1278                expected_status: stage.status.clone(),
1279                expected_outcome: stage.outcome.clone(),
1280                expected_branch: stage.branch.clone(),
1281                required_artifact_kinds: stage
1282                    .artifacts
1283                    .iter()
1284                    .map(|artifact| artifact.kind.clone())
1285                    .collect(),
1286                visible_text_contains: stage
1287                    .visible_text
1288                    .as_ref()
1289                    .filter(|text| !text.is_empty())
1290                    .map(|text| text.chars().take(80).collect()),
1291            })
1292            .collect(),
1293    }
1294}
1295
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    // Check a run against a replay fixture, collecting every mismatch as a
    // human-readable failure string; the run passes only when no failures
    // accumulate. Checks: overall status, then per-assertion stage presence,
    // status, outcome, branch, required artifact kinds, and visible-text
    // snippet containment.
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    for assertion in &fixture.stage_assertions {
        // Stages are matched to assertions by node_id; a missing stage records
        // one failure and skips the remaining checks for that assertion.
        let Some(stage) = run
            .stages
            .iter()
            .find(|stage| stage.node_id == assertion.node_id)
        else {
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Every required artifact kind must appear at least once among the
        // stage's recorded artifacts.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Snippet check is substring containment; absent visible text is
        // treated as the empty string (so any non-empty snippet fails).
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
1360
1361pub fn evaluate_run_suite(
1362    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1363) -> ReplayEvalSuiteReport {
1364    let mut reports = Vec::new();
1365    for (run, fixture, source_path) in cases {
1366        let report = evaluate_run_against_fixture(&run, &fixture);
1367        reports.push(ReplayEvalCaseReport {
1368            run_id: run.id.clone(),
1369            workflow_id: run.workflow_id.clone(),
1370            label: None,
1371            pass: report.pass,
1372            failures: report.failures,
1373            stage_count: report.stage_count,
1374            source_path,
1375            comparison: None,
1376        });
1377    }
1378    let total = reports.len();
1379    let passed = reports.iter().filter(|report| report.pass).count();
1380    let failed = total.saturating_sub(passed);
1381    ReplayEvalSuiteReport {
1382        pass: failed == 0,
1383        total,
1384        passed,
1385        failed,
1386        cases: reports,
1387    }
1388}
1389
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    // Compare two run records stage-by-stage (matched on node_id) and report
    // added/removed/changed stages plus coarse count deltas. "Changed" covers
    // status, outcome, branch, and artifact-count differences only — artifact
    // contents are not compared.
    let mut stage_diffs = Vec::new();
    // Union of node ids across both runs, in sorted order.
    let mut all_node_ids = BTreeSet::new();
    all_node_ids.extend(left.stages.iter().map(|stage| stage.node_id.clone()));
    all_node_ids.extend(right.stages.iter().map(|stage| stage.node_id.clone()));

    for node_id in all_node_ids {
        let left_stage = left.stages.iter().find(|stage| stage.node_id == node_id);
        let right_stage = right.stages.iter().find(|stage| stage.node_id == node_id);
        match (left_stage, right_stage) {
            // Present only on the left: removed going left -> right.
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            // Present only on the right: added going left -> right.
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Collect one "field: left -> right" detail per difference; a
                // stage with no details produces no diff record at all.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id,
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable in practice: every id in the union exists on at
            // least one side.
            (None, None) => {}
        }
    }

    // "Identical" additionally requires matching transition/artifact/
    // checkpoint counts, not just matching stages and status.
    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        // Deltas are right minus left, so positive means the right run has more.
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}
1477
1478pub fn push_execution_policy(policy: CapabilityPolicy) {
1479    EXECUTION_POLICY_STACK.with(|stack| stack.borrow_mut().push(policy));
1480}
1481
1482pub fn pop_execution_policy() {
1483    EXECUTION_POLICY_STACK.with(|stack| {
1484        stack.borrow_mut().pop();
1485    });
1486}
1487
1488pub fn current_execution_policy() -> Option<CapabilityPolicy> {
1489    EXECUTION_POLICY_STACK.with(|stack| stack.borrow().last().cloned())
1490}
1491
1492fn policy_allows_tool(policy: &CapabilityPolicy, tool: &str) -> bool {
1493    policy.tools.is_empty() || policy.tools.iter().any(|allowed| allowed == tool)
1494}
1495
1496fn policy_allows_capability(policy: &CapabilityPolicy, capability: &str, op: &str) -> bool {
1497    policy.capabilities.is_empty()
1498        || policy
1499            .capabilities
1500            .get(capability)
1501            .is_some_and(|ops| ops.is_empty() || ops.iter().any(|allowed| allowed == op))
1502}
1503
1504fn policy_allows_side_effect(policy: &CapabilityPolicy, requested: &str) -> bool {
1505    fn rank(v: &str) -> usize {
1506        match v {
1507            "none" => 0,
1508            "read_only" => 1,
1509            "workspace_write" => 2,
1510            "process_exec" => 3,
1511            "network" => 4,
1512            _ => 5,
1513        }
1514    }
1515    policy
1516        .side_effect_level
1517        .as_ref()
1518        .map(|allowed| rank(allowed) >= rank(requested))
1519        .unwrap_or(true)
1520}
1521
// Wrap a policy-violation reason in a categorized "tool rejected" error so
// callers can distinguish policy rejections from other runtime failures.
fn reject_policy(reason: String) -> Result<(), VmError> {
    Err(VmError::CategorizedError {
        message: reason,
        category: crate::value::ErrorCategory::ToolRejected,
    })
}
1528
// Gate a builtin call against the innermost execution policy, if any.
//
// Each builtin family is mapped to the tool / capability.op / side-effect
// surface it requires; any failing check rejects the call with a categorized
// error. Builtins not matched below pass through unchecked.
pub fn enforce_current_policy_for_builtin(name: &str, args: &[VmValue]) -> Result<(), VmError> {
    // No active policy scope: nothing to enforce.
    let Some(policy) = current_execution_policy() else {
        return Ok(());
    };
    match name {
        // File reads: need the builtin's own tool entry plus workspace.read_text.
        "read" | "read_file" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "read_text")
            {
                return reject_policy(format!(
                    "builtin '{name}' exceeds workspace.read_text ceiling"
                ));
            }
        }
        // Directory listing / search: tool entry plus workspace.list.
        "search" | "list_dir" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "list")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace.list ceiling"));
            }
        }
        // Existence/metadata probes: capability check only, no tool check.
        "file_exists" | "stat" => {
            if !policy_allows_capability(&policy, "workspace", "exists") {
                return reject_policy(format!("builtin '{name}' exceeds workspace.exists ceiling"));
            }
        }
        // Write-family builtins all check the shared "edit" tool, the
        // workspace.write_text capability, and the workspace_write side effect.
        "edit" | "write_file" | "append_file" | "mkdir" | "copy_file" => {
            if !policy_allows_tool(&policy, "edit")
                || !policy_allows_capability(&policy, "workspace", "write_text")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace write ceiling"));
            }
        }
        // Deletion has its own capability but shares the write side effect.
        "delete_file" => {
            if !policy_allows_capability(&policy, "workspace", "delete")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'delete_file' exceeds workspace.delete ceiling".to_string(),
                );
            }
        }
        // Structured edits likewise have a dedicated capability.
        "apply_edit" => {
            if !policy_allows_capability(&policy, "workspace", "apply_edit")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'apply_edit' exceeds workspace.apply_edit ceiling".to_string(),
                );
            }
        }
        // Process execution: "run" tool + process.exec + process_exec level.
        "exec" | "exec_at" | "shell" | "shell_at" | "run_command" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // HTTP builtins only check the network side-effect level.
        "http_get" | "http_post" | "http_put" | "http_patch" | "http_delete" | "http_request" => {
            if !policy_allows_side_effect(&policy, "network") {
                return reject_policy(format!("builtin '{name}' exceeds network ceiling"));
            }
        }
        // MCP builtins are treated like process execution (same surface as
        // the exec family above).
        "mcp_connect"
        | "mcp_call"
        | "mcp_list_tools"
        | "mcp_list_resources"
        | "mcp_list_resource_templates"
        | "mcp_read_resource"
        | "mcp_list_prompts"
        | "mcp_get_prompt"
        | "mcp_server_info"
        | "mcp_disconnect" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // host_invoke: the capability and op come from the first two call
        // arguments (missing args become empty strings), then the implied
        // side-effect level is derived from the (capability, op) pair.
        "host_invoke" => {
            let capability = args.first().map(|v| v.display()).unwrap_or_default();
            let op = args.get(1).map(|v| v.display()).unwrap_or_default();
            if !policy_allows_capability(&policy, &capability, &op) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds capability ceiling"
                ));
            }
            let requested_side_effect = match (capability.as_str(), op.as_str()) {
                ("workspace", "write_text" | "apply_edit" | "delete") => "workspace_write",
                ("process", "exec") => "process_exec",
                // Anything else is assumed read-only.
                _ => "read_only",
            };
            if !policy_allows_side_effect(&policy, requested_side_effect) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds side-effect ceiling"
                ));
            }
        }
        // Unlisted builtins are not policy-gated.
        _ => {}
    }
    Ok(())
}
1634
1635pub fn enforce_current_policy_for_bridge_builtin(name: &str) -> Result<(), VmError> {
1636    if current_execution_policy().is_some() {
1637        return reject_policy(format!(
1638            "bridged builtin '{name}' exceeds execution policy; declare an explicit capability/tool surface instead"
1639        ));
1640    }
1641    Ok(())
1642}
1643
1644pub fn enforce_current_policy_for_tool(tool_name: &str) -> Result<(), VmError> {
1645    let Some(policy) = current_execution_policy() else {
1646        return Ok(());
1647    };
1648    if !policy_allows_tool(&policy, tool_name) {
1649        return reject_policy(format!("tool '{tool_name}' exceeds tool ceiling"));
1650    }
1651    Ok(())
1652}
1653
1654fn compact_transcript(transcript: &VmValue, keep_last: usize) -> Option<VmValue> {
1655    let dict = transcript.as_dict()?;
1656    let messages = match dict.get("messages") {
1657        Some(VmValue::List(list)) => list.iter().cloned().collect::<Vec<_>>(),
1658        _ => Vec::new(),
1659    };
1660    let retained = messages
1661        .into_iter()
1662        .rev()
1663        .take(keep_last)
1664        .collect::<Vec<_>>()
1665        .into_iter()
1666        .rev()
1667        .collect::<Vec<_>>();
1668    let mut compacted = dict.clone();
1669    compacted.insert(
1670        "messages".to_string(),
1671        VmValue::List(Rc::new(retained.clone())),
1672    );
1673    compacted.insert(
1674        "events".to_string(),
1675        VmValue::List(Rc::new(
1676            crate::llm::helpers::transcript_events_from_messages(&retained),
1677        )),
1678    );
1679    Some(VmValue::Dict(Rc::new(compacted)))
1680}
1681
// Produce a public-only view of a transcript when a public visibility mode is
// requested; otherwise return the transcript unchanged.
//
// Returns `None` only when a public mode is requested but `transcript` is not
// a dict. Redaction drops messages whose role is "tool_result" and events
// whose "visibility" field is anything other than "public"; entries whose
// role/visibility cannot be read are kept (the `unwrap_or(true)` defaults).
fn redact_transcript_visibility(transcript: &VmValue, visibility: Option<&str>) -> Option<VmValue> {
    // No visibility constraint: pass through untouched.
    let Some(visibility) = visibility else {
        return Some(transcript.clone());
    };
    // Only "public" / "public_only" trigger redaction; other modes pass through.
    if visibility != "public" && visibility != "public_only" {
        return Some(transcript.clone());
    }
    let dict = transcript.as_dict()?;
    let public_messages = match dict.get("messages") {
        Some(VmValue::List(list)) => list
            .iter()
            .filter(|message| {
                message
                    .as_dict()
                    .and_then(|d| d.get("role"))
                    .map(|v| v.display())
                    .map(|role| role != "tool_result")
                    // Messages without a readable role are retained.
                    .unwrap_or(true)
            })
            .cloned()
            .collect::<Vec<_>>(),
        _ => Vec::new(),
    };
    let public_events = match dict.get("events") {
        Some(VmValue::List(list)) => list
            .iter()
            .filter(|event| {
                event
                    .as_dict()
                    .and_then(|d| d.get("visibility"))
                    .map(|v| v.display())
                    .map(|value| value == "public")
                    // Events without a visibility field are retained.
                    .unwrap_or(true)
            })
            .cloned()
            .collect::<Vec<_>>(),
        _ => Vec::new(),
    };
    let mut redacted = dict.clone();
    redacted.insert(
        "messages".to_string(),
        VmValue::List(Rc::new(public_messages)),
    );
    redacted.insert("events".to_string(), VmValue::List(Rc::new(public_events)));
    Some(VmValue::Dict(Rc::new(redacted)))
}
1728
1729pub(crate) fn apply_input_transcript_policy(
1730    transcript: Option<VmValue>,
1731    policy: &TranscriptPolicy,
1732) -> Option<VmValue> {
1733    let mut transcript = transcript;
1734    match policy.mode.as_deref() {
1735        Some("reset") => return None,
1736        Some("fork") => {
1737            if let Some(VmValue::Dict(dict)) = transcript.as_ref() {
1738                let mut forked = dict.as_ref().clone();
1739                forked.insert(
1740                    "id".to_string(),
1741                    VmValue::String(Rc::from(new_id("transcript"))),
1742                );
1743                transcript = Some(VmValue::Dict(Rc::new(forked)));
1744            }
1745        }
1746        _ => {}
1747    }
1748    if policy.compact {
1749        let keep_last = policy.keep_last.unwrap_or(6);
1750        transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
1751    }
1752    transcript
1753}
1754
1755fn apply_output_transcript_policy(
1756    transcript: Option<VmValue>,
1757    policy: &TranscriptPolicy,
1758) -> Option<VmValue> {
1759    let mut transcript = transcript;
1760    if policy.compact {
1761        let keep_last = policy.keep_last.unwrap_or(6);
1762        transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
1763    }
1764    transcript.and_then(|value| redact_transcript_visibility(&value, policy.visibility.as_deref()))
1765}
1766
/// Executes a single workflow stage: selects context artifacts, enforces the
/// node's input contract, builds an LLM call (plain or agent-loop), and wraps
/// the result into a new artifact record.
///
/// Returns the raw LLM result JSON, the produced artifacts (always exactly
/// one here), and the post-policy transcript.
///
/// # Errors
/// Returns `VmError::Runtime` when the input contract is violated (missing
/// required transcript, or too few/many selected artifacts), and propagates
/// any error from the underlying LLM call.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // When the context policy does not name artifact kinds, fall back to the
    // kinds declared by the node's input contract.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = select_artifacts(artifacts.to_vec(), &selection_policy);
    let rendered_context = render_artifacts_context(&selected, &node.context_policy);
    // Apply reset/fork/compaction before the contract check so "reset" can
    // legitimately fail a stage that requires a transcript.
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    // Enforce the artifact-count bounds of the input contract.
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Prefix the task with the rendered artifact context (labelled "Task" by
    // default) when any context was selected.
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Translate the node's model policy into LLM call options; only the
    // fields the policy actually sets are forwarded.
    let mut options = BTreeMap::new();
    if let Some(provider) = &node.model_policy.provider {
        options.insert(
            "provider".to_string(),
            VmValue::String(Rc::from(provider.clone())),
        );
    }
    if let Some(model) = &node.model_policy.model {
        options.insert(
            "model".to_string(),
            VmValue::String(Rc::from(model.clone())),
        );
    }
    if let Some(model_tier) = &node.model_policy.model_tier {
        options.insert(
            "model_tier".to_string(),
            VmValue::String(Rc::from(model_tier.clone())),
        );
    }
    if let Some(temperature) = node.model_policy.temperature {
        options.insert("temperature".to_string(), VmValue::Float(temperature));
    }
    if let Some(max_tokens) = node.model_policy.max_tokens {
        options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
    }
    if !node.tools.is_empty() {
        options.insert(
            "tools".to_string(),
            VmValue::List(Rc::new(
                node.tools
                    .iter()
                    .map(|tool| VmValue::String(Rc::from(tool.clone())))
                    .collect(),
            )),
        );
    }
    if let Some(transcript) = transcript.clone() {
        options.insert("transcript".to_string(), transcript);
    }

    // Arg shape: [prompt, system-or-nil, options-dict], as expected by
    // extract_llm_options.
    let args = vec![
        VmValue::String(Rc::from(prompt)),
        node.system
            .clone()
            .map(|s| VmValue::String(Rc::from(s)))
            .unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ];
    let mut opts = extract_llm_options(&args)?;

    // Agent mode (explicit, or implied by having tools) runs the iterative
    // agent loop; otherwise a single LLM call is made and adapted into the
    // same result shape.
    let llm_result = if node.mode.as_deref() == Some("agent") || !node.tools.is_empty() {
        crate::llm::run_agent_loop_internal(
            &mut opts,
            crate::llm::AgentLoopConfig {
                persistent: true,
                max_iterations: 12,
                max_nudges: 3,
                nudge: None,
                tool_retries: 0,
                tool_backoff_ms: 1000,
                tool_format: "text".to_string(),
            },
        )
        .await?
    } else {
        let result = vm_call_llm_full(&opts).await?;
        crate::llm::agent_loop_result_from_llm(&result, opts)
    };

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    // Pull the transcript back out of the LLM result and apply the
    // output-side policy (compaction/redaction) before returning it.
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // Output kind: first declared kind wins; "verify" nodes default to
    // verification_result, everything else to the generic "artifact".
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // Record lineage as the ids of the artifacts that fed this stage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
1931
1932pub fn next_nodes_for(
1933    graph: &WorkflowGraph,
1934    current: &str,
1935    branch: Option<&str>,
1936) -> Vec<WorkflowEdge> {
1937    let mut matching: Vec<WorkflowEdge> = graph
1938        .edges
1939        .iter()
1940        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1941        .cloned()
1942        .collect();
1943    if matching.is_empty() {
1944        matching = graph
1945            .edges
1946            .iter()
1947            .filter(|edge| edge.from == current && edge.branch.is_none())
1948            .cloned()
1949            .collect();
1950    }
1951    matching
1952}
1953
1954pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1955    next_nodes_for(graph, current, Some(branch))
1956        .into_iter()
1957        .next()
1958        .map(|edge| edge.to)
1959}
1960
1961pub fn append_audit_entry(
1962    graph: &mut WorkflowGraph,
1963    op: &str,
1964    node_id: Option<String>,
1965    reason: Option<String>,
1966    metadata: BTreeMap<String, serde_json::Value>,
1967) {
1968    graph.audit_log.push(WorkflowAuditEntry {
1969        id: new_id("audit"),
1970        op: op.to_string(),
1971        node_id,
1972        timestamp: now_rfc3339(),
1973        reason,
1974        metadata,
1975    });
1976}
1977
1978pub fn builtin_ceiling() -> CapabilityPolicy {
1979    CapabilityPolicy {
1980        tools: vec![
1981            "read".to_string(),
1982            "read_file".to_string(),
1983            "search".to_string(),
1984            "edit".to_string(),
1985            "run".to_string(),
1986            "exec".to_string(),
1987            "outline".to_string(),
1988            "list_directory".to_string(),
1989            "lsp_hover".to_string(),
1990            "lsp_definition".to_string(),
1991            "lsp_references".to_string(),
1992            "web_search".to_string(),
1993            "web_fetch".to_string(),
1994        ],
1995        capabilities: BTreeMap::from([
1996            (
1997                "workspace".to_string(),
1998                vec![
1999                    "read_text".to_string(),
2000                    "write_text".to_string(),
2001                    "apply_edit".to_string(),
2002                    "delete".to_string(),
2003                    "exists".to_string(),
2004                    "list".to_string(),
2005                ],
2006            ),
2007            ("process".to_string(), vec!["exec".to_string()]),
2008        ]),
2009        workspace_roots: Vec::new(),
2010        side_effect_level: Some("network".to_string()),
2011        recursion_limit: Some(8),
2012    }
2013}
2014
#[cfg(test)]
mod tests {
    use super::*;

    // Intersecting a request with a narrower ceiling must fail with a
    // "host ceiling" error instead of widening privileges.
    #[test]
    fn capability_intersection_rejects_privilege_expansion() {
        let ceiling = CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::new(),
            workspace_roots: Vec::new(),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(2),
        };
        let requested = CapabilityPolicy {
            tools: vec!["read".to_string(), "edit".to_string()],
            ..Default::default()
        };
        let error = ceiling.intersect(&requested).unwrap_err();
        assert!(error.contains("host ceiling"));
    }

    // A bridged builtin not covered by the active policy is rejected with a
    // ToolRejected-categorized error.
    #[test]
    fn active_execution_policy_rejects_unknown_bridge_builtin() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            workspace_roots: Vec::new(),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
        });
        let error = enforce_current_policy_for_bridge_builtin("custom_host_builtin").unwrap_err();
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }

    // MCP builtins must not bypass the policy: mcp_connect is rejected under
    // a restricted policy.
    #[test]
    fn active_execution_policy_rejects_mcp_escape_hatch() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            workspace_roots: Vec::new(),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
        });
        let error = enforce_current_policy_for_builtin("mcp_connect", &[]).unwrap_err();
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }

    // Legacy act/verify/repair workflow dicts normalize into a proper graph
    // with "act" as the entry node.
    #[test]
    fn workflow_normalization_upgrades_legacy_act_verify_repair_shape() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "legacy",
            "act": {"mode": "llm"},
            "verify": {"kind": "verify"},
            "repair": {"mode": "agent"},
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        assert_eq!(graph.type_name, "workflow_graph");
        assert!(graph.nodes.contains_key("act"));
        assert!(graph.nodes.contains_key("verify"));
        assert!(graph.nodes.contains_key("repair"));
        assert_eq!(graph.entry, "act");
    }

    // Selection respects max_artifacts/max_tokens budgets; only 2 of the 3
    // candidate artifacts survive.
    #[test]
    fn artifact_selection_honors_budget_and_priority() {
        let policy = ContextPolicy {
            max_artifacts: Some(2),
            max_tokens: Some(30),
            prefer_recent: true,
            prefer_fresh: true,
            prioritize_kinds: vec!["verification_result".to_string()],
            ..Default::default()
        };
        let artifacts = vec![
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "a".to_string(),
                kind: "summary".to_string(),
                text: Some("short".to_string()),
                relevance: Some(0.9),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "b".to_string(),
                kind: "summary".to_string(),
                text: Some("this is a much larger artifact body".to_string()),
                relevance: Some(1.0),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "c".to_string(),
                kind: "summary".to_string(),
                text: Some("tiny".to_string()),
                relevance: Some(0.5),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
        ];
        let selected = select_artifacts(artifacts, &policy);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().all(|artifact| artifact.kind == "summary"));
    }

    // A condition node with only a "true" edge (no "false") must fail
    // validation with an error naming both branches.
    #[test]
    fn workflow_validation_rejects_condition_without_true_false_edges() {
        let graph = WorkflowGraph {
            entry: "gate".to_string(),
            nodes: BTreeMap::from([(
                "gate".to_string(),
                WorkflowNode {
                    id: Some("gate".to_string()),
                    kind: "condition".to_string(),
                    ..Default::default()
                },
            )]),
            edges: vec![WorkflowEdge {
                from: "gate".to_string(),
                to: "next".to_string(),
                branch: Some("true".to_string()),
                label: None,
            }],
            ..Default::default()
        };
        let report = validate_workflow(&graph, None);
        assert!(!report.valid);
        assert!(report
            .errors
            .iter()
            .any(|error| error.contains("true") && error.contains("false")));
    }

    // A fixture derived from a run must evaluate that same run as a clean
    // pass with no failures (round-trip sanity check).
    #[test]
    fn replay_fixture_round_trip_passes() {
        let run = RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("demo".to_string()),
            task: "demo".to_string(),
            status: "completed".to_string(),
            started_at: "1".to_string(),
            finished_at: Some("2".to_string()),
            parent_run_id: None,
            root_run_id: Some("run_1".to_string()),
            stages: vec![RunStageRecord {
                id: "stage_1".to_string(),
                node_id: "act".to_string(),
                kind: "stage".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                branch: Some("success".to_string()),
                started_at: "1".to_string(),
                finished_at: Some("2".to_string()),
                visible_text: Some("done".to_string()),
                private_reasoning: None,
                transcript: None,
                verification: None,
                artifacts: vec![ArtifactRecord {
                    type_name: "artifact".to_string(),
                    id: "a1".to_string(),
                    kind: "summary".to_string(),
                    text: Some("done".to_string()),
                    created_at: "1".to_string(),
                    ..Default::default()
                }
                .normalize()],
                consumed_artifact_ids: vec![],
                produced_artifact_ids: vec!["a1".to_string()],
                attempts: vec![],
                metadata: BTreeMap::new(),
            }],
            transitions: vec![],
            checkpoints: vec![],
            pending_nodes: vec![],
            completed_nodes: vec!["act".to_string()],
            child_runs: vec![],
            artifacts: vec![],
            policy: CapabilityPolicy::default(),
            execution: None,
            transcript: None,
            replay_fixture: None,
            metadata: BTreeMap::new(),
            persisted_path: None,
        };
        let fixture = replay_fixture_from_run(&run);
        let report = evaluate_run_against_fixture(&run, &fixture);
        assert!(report.pass);
        assert!(report.failures.is_empty());
    }

    // A suite with one matching run and one mismatching run (evaluated
    // against the good fixture) reports exactly one failed case.
    #[test]
    fn replay_eval_suite_reports_failed_case() {
        let good = RunRecord {
            id: "run_good".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bad = RunRecord {
            id: "run_bad".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite(vec![
            (
                good.clone(),
                replay_fixture_from_run(&good),
                Some("good.json".to_string()),
            ),
            (
                bad.clone(),
                replay_fixture_from_run(&good),
                Some("bad.json".to_string()),
            ),
        ]);
        assert!(!suite.pass);
        assert_eq!(suite.total, 2);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases.iter().any(|case| !case.pass));
    }

    // Diffing two runs with differing status and stage outcome flags the
    // status change and one stage-level diff.
    #[test]
    fn run_diff_reports_changed_stage() {
        let left = RunRecord {
            id: "left".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let right = RunRecord {
            id: "right".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let diff = diff_run_records(&left, &right);
        assert!(diff.status_changed);
        assert!(!diff.identical);
        assert_eq!(diff.stage_diffs.len(), 1);
    }

    // A manifest case with compare_to set must fail when the candidate run
    // diverges from its baseline (failure message mentions "baseline").
    // NOTE(review): writes run records under a unique temp dir; the dir is
    // not cleaned up afterwards.
    #[test]
    fn eval_suite_manifest_can_fail_on_baseline_diff() {
        let temp_dir =
            std::env::temp_dir().join(format!("harn-eval-suite-{}", uuid::Uuid::now_v7()));
        std::fs::create_dir_all(&temp_dir).unwrap();
        let baseline_path = temp_dir.join("baseline.json");
        let candidate_path = temp_dir.join("candidate.json");

        let baseline = RunRecord {
            id: "baseline".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let candidate = RunRecord {
            id: "candidate".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };

        save_run_record(&baseline, Some(baseline_path.to_str().unwrap())).unwrap();
        save_run_record(&candidate, Some(candidate_path.to_str().unwrap())).unwrap();

        let manifest = EvalSuiteManifest {
            base_dir: Some(temp_dir.display().to_string()),
            cases: vec![EvalSuiteCase {
                label: Some("candidate".to_string()),
                run_path: "candidate.json".to_string(),
                fixture_path: None,
                compare_to: Some("baseline.json".to_string()),
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite_manifest(&manifest).unwrap();
        assert!(!suite.pass);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases[0].comparison.is_some());
        assert!(suite.cases[0]
            .failures
            .iter()
            .any(|failure| failure.contains("baseline")));
    }

    // The unified diff renderer emits standard a/ b/ headers and -, +, and
    // space-prefixed context lines.
    #[test]
    fn render_unified_diff_marks_removed_and_added_lines() {
        let diff = render_unified_diff(Some("src/main.rs"), "old\nsame", "new\nsame");
        assert!(diff.contains("--- a/src/main.rs"));
        assert!(diff.contains("+++ b/src/main.rs"));
        assert!(diff.contains("-old"));
        assert!(diff.contains("+new"));
        assert!(diff.contains(" same"));
    }

    // Even with a declared process.exec capability, a read_only side-effect
    // level must block the exec builtin.
    #[test]
    fn execution_policy_rejects_process_exec_when_read_only() {
        push_execution_policy(CapabilityPolicy {
            side_effect_level: Some("read_only".to_string()),
            capabilities: BTreeMap::from([("process".to_string(), vec!["exec".to_string()])]),
            ..Default::default()
        });
        let result = enforce_current_policy_for_builtin("exec", &[]);
        pop_execution_policy();
        assert!(result.is_err());
    }

    // A tool missing from the policy's tool list is rejected by the tool
    // ceiling check.
    #[test]
    fn execution_policy_rejects_unlisted_tool() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            ..Default::default()
        });
        let result = enforce_current_policy_for_tool("edit");
        pop_execution_policy();
        assert!(result.is_err());
    }
}