1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::{cell::RefCell, thread_local};
5
6use serde::{Deserialize, Serialize};
7
8use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
9use crate::value::{VmError, VmValue};
10
/// Current UTC time as an RFC 3339 / ISO-8601 timestamp
/// (`YYYY-MM-DDTHH:MM:SSZ`).
///
/// The previous implementation returned raw epoch seconds despite the
/// function's name; timestamps produced here are compared lexicographically
/// elsewhere (e.g. `prefer_recent` sorting), which RFC 3339 supports and
/// digit-count-varying integers do not.
fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_rfc3339(ts)
}

/// Formats seconds since the UNIX epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
/// Date conversion follows Howard Hinnant's civil-from-days algorithm.
fn format_rfc3339(epoch_secs: u64) -> String {
    let secs_of_day = epoch_secs % 86_400;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Shift the epoch to 0000-03-01 so leap days land at the end of each
    // 400-year era (146_097 days per era).
    let z = (epoch_secs / 86_400) as i64 + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day-of-era [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year-of-era [0, 399]
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day-of-year, March-based [0, 365]
    let mp = (5 * doy + 2) / 153; // month, March-based [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + i64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
19
20fn new_id(prefix: &str) -> String {
21 format!("{prefix}_{}", uuid::Uuid::now_v7())
22}
23
/// Directory where run records are persisted: `HARN_RUN_DIR` if set,
/// otherwise the relative default `.harn-runs`.
fn default_run_dir() -> PathBuf {
    match std::env::var("HARN_RUN_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(".harn-runs"),
    }
}
29
thread_local! {
    // Per-thread stack of capability ceilings. Presumably nested
    // orchestration pushes a narrowed policy on entry and pops on exit;
    // the push/pop sites are outside this chunk — TODO confirm.
    static EXECUTION_POLICY_STACK: RefCell<Vec<CapabilityPolicy>> = const { RefCell::new(Vec::new()) };
}
33
/// Maps an artifact-kind label onto its canonical spelling.
///
/// Canonical kinds (resource, workspace_file, editor_selection,
/// workspace_snapshot, transcript_summary, summary, plan, diff, git_diff,
/// patch, patch_set, patch_proposal, diff_review, review_decision,
/// verification_bundle, apply_intent, verification_result, test_result,
/// command_result, provider_payload, worker_result, worker_notification,
/// artifact) and any unrecognized non-blank kind pass through unchanged via
/// the catch-all arm; only the four aliases below and blank input are
/// rewritten.
fn normalize_artifact_kind(kind: &str) -> String {
    let canonical = match kind {
        "file" => "workspace_file",
        "transcript" => "transcript_summary",
        "verification" => "verification_result",
        "test" => "test_result",
        blank if blank.trim().is_empty() => "artifact",
        other => other,
    };
    canonical.to_string()
}
67
/// Default context-packing priority for an artifact kind (higher wins).
/// Verification evidence outranks change artifacts, which outrank plans,
/// workspace context, summaries and command output; unknown kinds get 40.
fn default_artifact_priority(kind: &str) -> i64 {
    if matches!(kind, "verification_result" | "test_result") {
        100
    } else if kind == "verification_bundle" {
        95
    } else if matches!(
        kind,
        "diff"
            | "git_diff"
            | "patch"
            | "patch_set"
            | "patch_proposal"
            | "diff_review"
            | "review_decision"
            | "apply_intent"
    ) {
        90
    } else if kind == "plan" {
        80
    } else if matches!(
        kind,
        "workspace_file" | "workspace_snapshot" | "editor_selection" | "resource"
    ) {
        70
    } else if matches!(kind, "summary" | "transcript_summary") {
        60
    } else if kind == "command_result" {
        50
    } else {
        40
    }
}
81
/// Orders freshness labels for context selection: fresh/live (3) > recent
/// (2) > unknown/absent (1) > stale (0).
fn freshness_rank(value: Option<&str>) -> i64 {
    let label = value.unwrap_or("");
    if label == "fresh" || label == "live" {
        3
    } else if label == "recent" {
        2
    } else if label == "stale" {
        0
    } else {
        1
    }
}
90
/// A capability grant/ceiling: which tools and per-capability operations a
/// workflow (or node) may use, where it may write, how strong its side
/// effects may be, and how deep nested runs may recurse.
///
/// Empty collections mean "unrestricted" for that axis (see `intersect`).
/// `#[serde(default)]` lets any subset of fields be supplied.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CapabilityPolicy {
    // Allowed tool names; empty = no tool restriction.
    pub tools: Vec<String>,
    // capability name -> allowed operations on it.
    pub capabilities: BTreeMap<String, Vec<String>>,
    // Filesystem roots the run may operate under; empty = unrestricted.
    pub workspace_roots: Vec<String>,
    // One of none/read_only/workspace_write/process_exec/network
    // (see `min_side_effect` ranking).
    pub side_effect_level: Option<String>,
    // Cap on nested sub-run depth.
    pub recursion_limit: Option<usize>,
}
100
impl CapabilityPolicy {
    /// Intersects a `requested` policy with `self` acting as the host
    /// ceiling.
    ///
    /// Empty collections on the ceiling mean "unrestricted" for that axis.
    /// Rather than silently narrowing, this returns `Err` when the request
    /// names tools or capability operations the ceiling does not grant;
    /// otherwise it returns the element-wise intersection, with the less
    /// permissive side-effect level and the smaller recursion limit.
    pub fn intersect(&self, requested: &CapabilityPolicy) -> Result<CapabilityPolicy, String> {
        // Side-effect level: weaker of the two when both are set, else
        // whichever side specified one.
        let side_effect_level = match (&self.side_effect_level, &requested.side_effect_level) {
            (Some(a), Some(b)) => Some(min_side_effect(a, b).to_string()),
            (Some(a), None) => Some(a.clone()),
            (None, Some(b)) => Some(b.clone()),
            (None, None) => None,
        };

        // Reject (don't drop) requested tools outside a non-empty ceiling.
        if !self.tools.is_empty() {
            let denied: Vec<String> = requested
                .tools
                .iter()
                .filter(|tool| !self.tools.contains(*tool))
                .cloned()
                .collect();
            if !denied.is_empty() {
                return Err(format!(
                    "requested tools exceed host ceiling: {}",
                    denied.join(", ")
                ));
            }
        }

        // Same for capabilities: a capability absent from a non-empty
        // ceiling, or an operation outside the allowed set, is an error.
        for (capability, requested_ops) in &requested.capabilities {
            if let Some(allowed_ops) = self.capabilities.get(capability) {
                let denied: Vec<String> = requested_ops
                    .iter()
                    .filter(|op| !allowed_ops.contains(*op))
                    .cloned()
                    .collect();
                if !denied.is_empty() {
                    return Err(format!(
                        "requested capability operations exceed host ceiling: {}.{}",
                        capability,
                        denied.join(",")
                    ));
                }
            } else if !self.capabilities.is_empty() {
                return Err(format!(
                    "requested capability exceeds host ceiling: {capability}"
                ));
            }
        }

        // Below here the request is known to fit; build the intersection.
        // An empty side is treated as "inherit the other side".
        let tools = if self.tools.is_empty() {
            requested.tools.clone()
        } else if requested.tools.is_empty() {
            self.tools.clone()
        } else {
            requested
                .tools
                .iter()
                .filter(|tool| self.tools.contains(*tool))
                .cloned()
                .collect()
        };

        let capabilities = if self.capabilities.is_empty() {
            requested.capabilities.clone()
        } else if requested.capabilities.is_empty() {
            self.capabilities.clone()
        } else {
            requested
                .capabilities
                .iter()
                .filter_map(|(capability, requested_ops)| {
                    self.capabilities.get(capability).map(|allowed_ops| {
                        (
                            capability.clone(),
                            requested_ops
                                .iter()
                                .filter(|op| allowed_ops.contains(*op))
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .collect()
        };

        // Workspace roots intersect silently (no error path), unlike tools
        // and capabilities above.
        let workspace_roots = if self.workspace_roots.is_empty() {
            requested.workspace_roots.clone()
        } else if requested.workspace_roots.is_empty() {
            self.workspace_roots.clone()
        } else {
            requested
                .workspace_roots
                .iter()
                .filter(|root| self.workspace_roots.contains(*root))
                .cloned()
                .collect()
        };

        // Recursion limit: the smaller of the two when both present.
        let recursion_limit = match (self.recursion_limit, requested.recursion_limit) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        Ok(CapabilityPolicy {
            tools,
            capabilities,
            workspace_roots,
            side_effect_level,
            recursion_limit,
        })
    }
}
211
/// Returns the less permissive of two side-effect levels.
///
/// Known levels are ordered none < read_only < workspace_write <
/// process_exec < network; unrecognized labels rank above all known ones,
/// so any known level wins against an unknown. Ties return `a`.
fn min_side_effect<'a>(a: &'a str, b: &'a str) -> &'a str {
    const ORDER: [&str; 5] = [
        "none",
        "read_only",
        "workspace_write",
        "process_exec",
        "network",
    ];
    let rank = |level: &str| {
        ORDER
            .iter()
            .position(|&known| known == level)
            .unwrap_or(ORDER.len())
    };
    if rank(b) < rank(a) {
        b
    } else {
        a
    }
}
229
/// Model-selection defaults for a node: provider, model or tier, plus
/// sampling controls. Unset fields fall back to flat top-level keys during
/// `normalize_workflow_value`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ModelPolicy {
    pub provider: Option<String>,
    pub model: Option<String>,
    pub model_tier: Option<String>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<i64>,
}

/// How a stage's transcript is recorded and exposed. Semantics of `mode`
/// and `visibility` values are enforced by the executor, which is outside
/// this chunk — TODO confirm.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptPolicy {
    pub mode: Option<String>,
    pub visibility: Option<String>,
    pub summarize: bool,
    pub compact: bool,
    pub keep_last: Option<usize>,
}

/// Artifact selection/budgeting rules consumed by `select_artifacts` and
/// `render_artifacts_context`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ContextPolicy {
    // Hard cap on the number of artifacts selected.
    pub max_artifacts: Option<usize>,
    // Token budget; `reserve_tokens` is subtracted from it before packing.
    pub max_tokens: Option<usize>,
    pub reserve_tokens: Option<usize>,
    // Kind allow/deny lists; an empty include list admits all kinds.
    pub include_kinds: Vec<String>,
    pub exclude_kinds: Vec<String>,
    // Kinds sorted ahead of others regardless of numeric priority.
    pub prioritize_kinds: Vec<String>,
    // Artifact ids that always sort first.
    pub pinned_ids: Vec<String>,
    // If non-empty, only artifacts tagged with one of these stages qualify.
    pub include_stages: Vec<String>,
    pub prefer_recent: bool,
    pub prefer_fresh: bool,
    // "json" renders one JSON object per artifact; anything else uses the
    // bracketed text layout.
    pub render: Option<String>,
}

/// Per-stage retry behavior. A deserialized `max_attempts` of 0 is
/// normalized to 1 in `normalize_workflow_value`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub verify: bool,
    pub repair: bool,
}

/// Input/output expectations for a stage: acceptable artifact kinds,
/// input-count bounds, transcript requirement, and an optional JSON schema
/// for produced data.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StageContract {
    pub input_kinds: Vec<String>,
    pub output_kinds: Vec<String>,
    pub min_inputs: Option<usize>,
    pub max_inputs: Option<usize>,
    pub require_transcript: bool,
    pub schema: Option<serde_json::Value>,
}

/// Maps logical stage outcomes to branch labels on outgoing edges.
/// NOTE(review): the consuming executor is outside this chunk; confirm
/// which labels it matches against `WorkflowEdge::branch`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BranchSemantics {
    pub success: Option<String>,
    pub failure: Option<String>,
    pub verify_pass: Option<String>,
    pub verify_fail: Option<String>,
    pub condition_true: Option<String>,
    pub condition_false: Option<String>,
    pub loop_continue: Option<String>,
    pub loop_exit: Option<String>,
    pub escalation: Option<String>,
}

/// Fan-out configuration for `map` nodes: explicit items, or an artifact
/// kind to expand over (see `validate_workflow`'s map-node rule).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct MapPolicy {
    pub items: Vec<serde_json::Value>,
    pub item_artifact_kind: Option<String>,
    pub output_kind: Option<String>,
    pub max_items: Option<usize>,
}

/// Fan-in configuration for `join` nodes; `strategy` defaults to "all"
/// during normalization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct JoinPolicy {
    pub strategy: String,
    pub require_all_inputs: bool,
    pub min_completed: Option<usize>,
}

/// Aggregation configuration for `reduce` nodes; `strategy` defaults to
/// "concat" during normalization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReducePolicy {
    pub strategy: String,
    pub separator: Option<String>,
    pub output_kind: Option<String>,
}

/// Routing metadata for escalation nodes (target queue, severity, reason).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EscalationPolicy {
    pub level: Option<String>,
    pub queue: Option<String>,
    pub reason: Option<String>,
}
331
/// A unit of context flowing between stages: a file, diff, plan, result,
/// etc. Serialized with a `_type` discriminator; `normalize()` fills in
/// missing defaults after deserialization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ArtifactRecord {
    // Type tag in the serialized form; defaults to "artifact".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    // Canonical kind (see `normalize_artifact_kind`).
    pub kind: String,
    pub title: Option<String>,
    // Free-text payload; used for token estimation (~4 chars/token).
    pub text: Option<String>,
    // Structured payload, rendered as JSON when `text` is absent.
    pub data: Option<serde_json::Value>,
    pub source: Option<String>,
    pub created_at: String,
    // One of fresh/live/recent/stale, or anything else for "normal"
    // (see `freshness_rank`).
    pub freshness: Option<String>,
    // Context-packing priority; defaulted per kind when absent.
    pub priority: Option<i64>,
    // Ids of artifacts this one was derived from.
    pub lineage: Vec<String>,
    pub relevance: Option<f64>,
    pub estimated_tokens: Option<usize>,
    // Stage id that produced this artifact, if any.
    pub stage: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
352
353impl ArtifactRecord {
354 pub fn normalize(mut self) -> Self {
355 if self.type_name.is_empty() {
356 self.type_name = "artifact".to_string();
357 }
358 if self.id.is_empty() {
359 self.id = new_id("artifact");
360 }
361 if self.created_at.is_empty() {
362 self.created_at = now_rfc3339();
363 }
364 if self.kind.is_empty() {
365 self.kind = "artifact".to_string();
366 }
367 self.kind = normalize_artifact_kind(&self.kind);
368 if self.estimated_tokens.is_none() {
369 self.estimated_tokens = self
370 .text
371 .as_ref()
372 .map(|text| ((text.len() as f64) / 4.0).ceil() as usize);
373 }
374 if self.priority.is_none() {
375 self.priority = Some(default_artifact_priority(&self.kind));
376 }
377 self
378 }
379}
380
/// One node of a workflow graph, bundling its prompt and all per-stage
/// policies. `kind` distinguishes stage/verify/condition/fork/join/map/
/// reduce/escalation behavior (see `validate_workflow`); it defaults to
/// "stage" during normalization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowNode {
    // Filled from the map key during normalization when absent.
    pub id: Option<String>,
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub tools: Vec<String>,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    // Verification spec (assert_text/command/expect_status/expect_text in
    // the shorthand form); interpreted by the executor — outside this chunk.
    pub verify: Option<serde_json::Value>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// Directed edge between nodes; `branch` is an optional label matched
/// against the stage outcome (e.g. "true"/"false" on condition nodes).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    pub from: String,
    pub to: String,
    pub branch: Option<String>,
    pub label: Option<String>,
}

/// A full workflow: nodes keyed by id, edges, the entry node, a graph-wide
/// capability request, and an audit trail. Serialized with a `_type`
/// discriminator ("workflow_graph").
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    // Defaults to 1 during normalization.
    pub version: usize,
    // Entry node id; must exist in `nodes` (checked by validate_workflow).
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}

/// One entry in a workflow's audit trail (who/what changed a node and why).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
442
/// Execution record of one stage within a run: status/outcome, timing,
/// transcript and verification payloads, and the artifacts it consumed and
/// produced.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    // The workflow node this stage executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    // Branch label taken after this stage, if any.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    // Externally visible output vs. private reasoning text.
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    // One record per retry attempt.
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// One attempt of a (possibly retried) stage execution.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    // 1-based attempt index — TODO confirm against the executor.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}

/// Edge traversal taken during a run: which node followed which, on what
/// branch, and which artifacts crossed the boundary.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// Persisted resume point for a run: which nodes are ready vs. completed
/// and why the checkpoint was taken.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
501
/// Expected-behavior snapshot derived from a run, used to re-check later
/// runs of the same workflow. Serialized with a `_type` discriminator.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-node expectations inside a `ReplayFixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    // Artifact kinds the stage must have produced.
    pub required_artifact_kinds: Vec<String>,
    // Substring that must appear in the stage's visible text.
    pub visible_text_contains: Option<String>,
}

/// Result of checking a single run against a fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// One case's result within an eval suite, optionally including a diff
/// against a baseline run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate result across all cases of an eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}

/// Per-node difference between two runs ("change" names the kind of
/// difference; "details" are human-readable specifics).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    pub change: String,
    pub details: Vec<String>,
}

/// Structural comparison of two runs: status change, per-stage diffs, and
/// count deltas for transitions/artifacts/checkpoints (right minus left —
/// TODO confirm sign convention in `diff_run_records`, outside this chunk).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}

/// On-disk manifest describing an eval suite: a list of recorded runs plus
/// optional fixtures/baselines, with paths resolved relative to `base_dir`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    // Base for resolving relative case paths (see resolve_manifest_path).
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One case in an eval suite manifest. `fixture_path` overrides the run's
/// embedded fixture; `compare_to` names a baseline run to diff against.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
600
/// Complete record of one workflow execution: stages, transitions,
/// checkpoints, artifacts, spawned child runs, and the effective policy.
/// Serialized with a `_type` discriminator ("run_record").
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    // Defaults to "running" during normalization.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    // Nested-run lineage; root_run_id defaults to this run's own id.
    pub parent_run_id: Option<String>,
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    // Effective capability policy the run executed under.
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    // Auto-derived from the run when absent (see normalize_run_record).
    pub replay_fixture: Option<ReplayFixture>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    // Where this record was written on disk, if persisted.
    pub persisted_path: Option<String>,
}

/// A worker sub-run spawned by a stage, with pointers to its persisted
/// run/snapshot files.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}

/// Where and how a run executed: working directory, environment, and git
/// worktree details when the adapter checks out a branch.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}

/// Outcome of `validate_workflow`: `valid` iff `errors` is empty; warnings
/// are advisory only.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    pub reachable_nodes: Vec<String>,
}
668
669fn parse_json_value<T: for<'de> Deserialize<'de>>(value: &VmValue) -> Result<T, VmError> {
670 serde_json::from_value(vm_value_to_json(value))
671 .map_err(|e| VmError::Runtime(format!("orchestration parse error: {e}")))
672}
673
/// Coerces an arbitrary VM dict into a fully-defaulted `WorkflowGraph`.
///
/// Two input shapes are accepted: an explicit graph (nodes/edges present),
/// or a shorthand dict with top-level `act` / `verify` / `repair` node
/// entries plus flat model keys (`provider`, `model`, `model_tier`/`tier`,
/// `temperature`, `max_tokens`, `mode`). The shorthand is expanded into an
/// act -> verify -> repair graph. Afterwards, missing ids, the entry node,
/// and per-node defaults (kind, join/reduce strategies, output kinds,
/// retry attempts) are filled in.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand expansion: only when the value carried no explicit nodes.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node: WorkflowNode = parse_json_value(node_value)?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Flat top-level model settings act as per-node fallbacks;
                // empty display strings are treated as unset.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept either a float or an int for temperature.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Shorthand verify nodes may carry flat assertion keys;
                // bundle them into the structured `verify` payload.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Default wiring: act -> verify; verify --failed--> repair
        // --retry--> verify.
        // NOTE(review): if `repair` is present without `verify`, these
        // edges reference a missing "verify" node; validate_workflow will
        // flag it — confirm that is the intended behavior.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    // Last-resort entry: first node in (BTreeMap) key order.
    if graph.entry.is_empty() {
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        // Default output kind depends on the node kind.
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
828
/// Structurally validates a workflow graph: entry/edge integrity,
/// reachability, per-kind shape rules, and (when `ceiling` is given) that
/// the graph- and node-level capability requests fit under it.
///
/// Errors make the report invalid; warnings (unreachable nodes, weakly
/// connected join/reduce nodes) are advisory only.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    // Every edge endpoint must name a real node.
    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Unreachable nodes are a warning, not an error.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    // Per-kind structural rules.
    for (node_id, node) in &graph.nodes {
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        match node.kind.as_str() {
            // Conditions must cover both boolean outcomes.
            "condition" => {
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            // A fork that doesn't branch is meaningless.
            "fork" => {
                if outgoing.len() < 2 {
                    errors.push(format!(
                        "node {node_id}: fork nodes require at least two outgoing edges"
                    ));
                }
            }
            // A join with one input is suspicious but tolerated.
            "join" => {
                if incoming < 2 {
                    warnings.push(format!(
                        "node {node_id}: join node has fewer than two incoming edges"
                    ));
                }
            }
            // Map nodes need something to iterate over.
            "map" => {
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty()
                {
                    errors.push(format!(
                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                    ));
                }
            }
            "reduce" => {
                if node.input_contract.input_kinds.is_empty() {
                    warnings.push(format!(
                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                    ));
                }
            }
            _ => {}
        }
    }

    // Capability requests must fit under the host ceiling, both graph-wide
    // and per node.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
944
945fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
946 let mut seen = BTreeSet::new();
947 let mut stack = vec![graph.entry.clone()];
948 while let Some(node_id) = stack.pop() {
949 if !seen.insert(node_id.clone()) {
950 continue;
951 }
952 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
953 stack.push(edge.to.clone());
954 }
955 }
956 seen
957}
958
/// Filters and orders `artifacts` per `policy`, then greedily packs them
/// into the configured artifact-count and token budgets.
///
/// Sort precedence (all descending): pinned ids, prioritized kinds,
/// numeric priority, freshness (only if `prefer_fresh`), recency (only if
/// `prefer_recent`, lexicographic on `created_at`), relevance, then
/// *smaller* estimated-token cost as the final tiebreak.
pub fn select_artifacts(
    mut artifacts: Vec<ArtifactRecord>,
    policy: &ContextPolicy,
) -> Vec<ArtifactRecord> {
    // Eligibility: include/exclude kind lists plus optional stage filter;
    // empty include lists admit everything.
    artifacts.retain(|artifact| {
        (policy.include_kinds.is_empty() || policy.include_kinds.contains(&artifact.kind))
            && !policy.exclude_kinds.contains(&artifact.kind)
            && (policy.include_stages.is_empty()
                || artifact
                    .stage
                    .as_ref()
                    .is_some_and(|stage| policy.include_stages.contains(stage)))
    });
    // Comparators are written b-vs-a to get descending order ("best" first).
    artifacts.sort_by(|a, b| {
        let b_pinned = policy.pinned_ids.contains(&b.id);
        let a_pinned = policy.pinned_ids.contains(&a.id);
        b_pinned
            .cmp(&a_pinned)
            .then_with(|| {
                let b_prio_kind = policy.prioritize_kinds.contains(&b.kind);
                let a_prio_kind = policy.prioritize_kinds.contains(&a.kind);
                b_prio_kind.cmp(&a_prio_kind)
            })
            .then_with(|| {
                // Missing priority sorts as 0.
                b.priority
                    .unwrap_or_default()
                    .cmp(&a.priority.unwrap_or_default())
            })
            .then_with(|| {
                if policy.prefer_fresh {
                    freshness_rank(b.freshness.as_deref())
                        .cmp(&freshness_rank(a.freshness.as_deref()))
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                if policy.prefer_recent {
                    b.created_at.cmp(&a.created_at)
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                // NaN relevance compares as Equal (partial_cmp fallback).
                b.relevance
                    .partial_cmp(&a.relevance)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| {
                // Cheaper artifacts win ties; unknown cost sorts last.
                a.estimated_tokens
                    .unwrap_or(usize::MAX)
                    .cmp(&b.estimated_tokens.unwrap_or(usize::MAX))
            })
    });

    // Greedy packing under the budgets.
    let mut selected = Vec::new();
    let mut used_tokens = 0usize;
    let reserve_tokens = policy.reserve_tokens.unwrap_or(0);
    let effective_max_tokens = policy
        .max_tokens
        .map(|max| max.saturating_sub(reserve_tokens));
    for artifact in artifacts {
        if let Some(max_artifacts) = policy.max_artifacts {
            if selected.len() >= max_artifacts {
                break;
            }
        }
        let next_tokens = artifact.estimated_tokens.unwrap_or(0);
        if let Some(max_tokens) = effective_max_tokens {
            // `continue` (not `break`): a cheaper artifact further down the
            // ranking may still fit in the remaining budget.
            if used_tokens + next_tokens > max_tokens {
                continue;
            }
        }
        used_tokens += next_tokens;
        selected.push(artifact);
    }
    selected
}
1037
1038pub fn render_artifacts_context(artifacts: &[ArtifactRecord], policy: &ContextPolicy) -> String {
1039 let mut parts = Vec::new();
1040 for artifact in artifacts {
1041 let title = artifact
1042 .title
1043 .clone()
1044 .unwrap_or_else(|| format!("{} {}", artifact.kind, artifact.id));
1045 let body = artifact
1046 .text
1047 .clone()
1048 .or_else(|| artifact.data.as_ref().map(|v| v.to_string()))
1049 .unwrap_or_default();
1050 match policy.render.as_deref() {
1051 Some("json") => {
1052 parts.push(
1053 serde_json::json!({
1054 "id": artifact.id,
1055 "kind": artifact.kind,
1056 "title": title,
1057 "source": artifact.source,
1058 "freshness": artifact.freshness,
1059 "priority": artifact.priority,
1060 "text": body,
1061 })
1062 .to_string(),
1063 );
1064 }
1065 _ => parts.push(format!(
1066 "[{title}] kind={} source={} freshness={} priority={}\n{}",
1067 artifact.kind,
1068 artifact
1069 .source
1070 .clone()
1071 .unwrap_or_else(|| "unknown".to_string()),
1072 artifact
1073 .freshness
1074 .clone()
1075 .unwrap_or_else(|| "normal".to_string()),
1076 artifact.priority.unwrap_or_default(),
1077 body
1078 )),
1079 }
1080 }
1081 parts.join("\n\n")
1082}
1083
1084pub fn normalize_artifact(value: &VmValue) -> Result<ArtifactRecord, VmError> {
1085 let artifact: ArtifactRecord = parse_json_value(value)?;
1086 Ok(artifact.normalize())
1087}
1088
1089pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1090 let mut run: RunRecord = parse_json_value(value)?;
1091 if run.type_name.is_empty() {
1092 run.type_name = "run_record".to_string();
1093 }
1094 if run.id.is_empty() {
1095 run.id = new_id("run");
1096 }
1097 if run.started_at.is_empty() {
1098 run.started_at = now_rfc3339();
1099 }
1100 if run.status.is_empty() {
1101 run.status = "running".to_string();
1102 }
1103 if run.root_run_id.is_none() {
1104 run.root_run_id = Some(run.id.clone());
1105 }
1106 if run.replay_fixture.is_none() {
1107 run.replay_fixture = Some(replay_fixture_from_run(&run));
1108 }
1109 Ok(run)
1110}
1111
1112pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1113 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1114 if manifest.type_name.is_empty() {
1115 manifest.type_name = "eval_suite_manifest".to_string();
1116 }
1117 if manifest.id.is_empty() {
1118 manifest.id = new_id("eval_suite");
1119 }
1120 Ok(manifest)
1121}
1122
1123fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1124 let content = std::fs::read_to_string(path)
1125 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1126 serde_json::from_str(&content)
1127 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1128}
1129
/// Resolve a manifest-relative path: absolute paths pass through untouched;
/// relative paths are joined onto `base_dir` when one is supplied.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate,
    }
}
1140
/// Evaluate every case in an eval-suite manifest and aggregate the results.
///
/// For each case the run record is loaded from `run_path` (resolved against
/// the manifest's `base_dir`), then checked against a replay fixture taken —
/// in order of preference — from the case's `fixture_path`, the fixture
/// embedded in the run, or one freshly derived from the run itself. When
/// `compare_to` names a baseline run, any structural difference also fails
/// the case.
///
/// # Errors
/// Returns `VmError::Runtime` when a run record, fixture, or baseline file
/// cannot be read or parsed.
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    let base_dir = manifest.base_dir.as_deref().map(Path::new);
    let mut reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(base_dir, &case.run_path);
        let run = load_run_record(&run_path)?;
        // Fixture precedence: explicit path > fixture stored on the run >
        // one synthesized from the run (which asserts the run's own values).
        let fixture = match &case.fixture_path {
            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
            None => run
                .replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run)),
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;
        // Optional baseline comparison: any non-identical diff fails the case.
        let comparison = match &case.compare_to {
            Some(path) => {
                let baseline_path = resolve_manifest_path(base_dir, path);
                let baseline = load_run_record(&baseline_path)?;
                let diff = diff_run_records(&baseline, &run);
                if !diff.identical {
                    pass = false;
                    failures.push(format!(
                        "run differs from baseline {} with {} stage changes",
                        baseline_path.display(),
                        diff.stage_diffs.len()
                    ));
                }
                Some(diff)
            }
            None => None,
        };
        reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }
    // Suite passes only when every case passed.
    let total = reports.len();
    let passed = reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: reports,
    })
}
1198
/// Produce a line-based diff of `before` vs `after` in a unified-diff-like
/// format (`---`/`+++` headers, then ` `/`-`/`+` prefixed lines).
///
/// Uses a classic LCS table to pick a minimal edit script.
/// NOTE(review): no `@@` hunk headers are emitted, so the output is not a
/// strictly valid unified diff for `patch` — confirm that is acceptable for
/// consumers.
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    let old: Vec<&str> = before.lines().collect();
    let new: Vec<&str> = after.lines().collect();
    // lcs[i][j] = length of the longest common subsequence of old[i..] and new[j..].
    let mut lcs = vec![vec![0usize; new.len() + 1]; old.len() + 1];
    for i in (0..old.len()).rev() {
        for j in (0..new.len()).rev() {
            lcs[i][j] = if old[i] == new[j] {
                lcs[i + 1][j + 1] + 1
            } else {
                lcs[i + 1][j].max(lcs[i][j + 1])
            };
        }
    }

    let name = path.unwrap_or("artifact");
    let mut out = format!("--- a/{name}\n+++ b/{name}\n");
    let (mut i, mut j) = (0usize, 0usize);
    loop {
        match (old.get(i), new.get(j)) {
            // Common line: emit as context and advance both cursors.
            (Some(o), Some(n)) if o == n => {
                out.push_str(&format!(" {o}\n"));
                i += 1;
                j += 1;
            }
            // Deleting from `old` keeps at least as long an LCS: emit removal.
            (Some(o), Some(_)) if lcs[i + 1][j] >= lcs[i][j + 1] => {
                out.push_str(&format!("-{o}\n"));
                i += 1;
            }
            (Some(_), Some(n)) => {
                out.push_str(&format!("+{n}\n"));
                j += 1;
            }
            // One side exhausted: drain the remainder of the other.
            (Some(o), None) => {
                out.push_str(&format!("-{o}\n"));
                i += 1;
            }
            (None, Some(n)) => {
                out.push_str(&format!("+{n}\n"));
                j += 1;
            }
            (None, None) => break,
        }
    }
    out
}
1241
1242pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1243 let path = path
1244 .map(PathBuf::from)
1245 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1246 if let Some(parent) = path.parent() {
1247 std::fs::create_dir_all(parent)
1248 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1249 }
1250 let json = serde_json::to_string_pretty(run)
1251 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1252 std::fs::write(&path, json)
1253 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1254 Ok(path.to_string_lossy().to_string())
1255}
1256
1257pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1258 let content = std::fs::read_to_string(path)
1259 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1260 serde_json::from_str(&content)
1261 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
1262}
1263
1264pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1265 ReplayFixture {
1266 type_name: "replay_fixture".to_string(),
1267 id: new_id("fixture"),
1268 source_run_id: run.id.clone(),
1269 workflow_id: run.workflow_id.clone(),
1270 workflow_name: run.workflow_name.clone(),
1271 created_at: now_rfc3339(),
1272 expected_status: run.status.clone(),
1273 stage_assertions: run
1274 .stages
1275 .iter()
1276 .map(|stage| ReplayStageAssertion {
1277 node_id: stage.node_id.clone(),
1278 expected_status: stage.status.clone(),
1279 expected_outcome: stage.outcome.clone(),
1280 expected_branch: stage.branch.clone(),
1281 required_artifact_kinds: stage
1282 .artifacts
1283 .iter()
1284 .map(|artifact| artifact.kind.clone())
1285 .collect(),
1286 visible_text_contains: stage
1287 .visible_text
1288 .as_ref()
1289 .filter(|text| !text.is_empty())
1290 .map(|text| text.chars().take(80).collect()),
1291 })
1292 .collect(),
1293 }
1294}
1295
/// Check a run against a replay fixture, collecting one failure message per
/// mismatch.
///
/// Verifies the overall run status, then for each stage assertion: the stage
/// exists, its status/outcome/branch match, every required artifact kind was
/// produced, and (when recorded) the stage's visible text contains the
/// expected snippet. The report passes only when no failures were collected.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    for assertion in &fixture.stage_assertions {
        // A missing stage is a single failure; the per-field checks are skipped.
        let Some(stage) = run
            .stages
            .iter()
            .find(|stage| stage.node_id == assertion.node_id)
        else {
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Every kind listed in the fixture must appear among the stage's artifacts.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Substring containment, so fixtures recording an 80-char prefix still match.
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
1360
1361pub fn evaluate_run_suite(
1362 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1363) -> ReplayEvalSuiteReport {
1364 let mut reports = Vec::new();
1365 for (run, fixture, source_path) in cases {
1366 let report = evaluate_run_against_fixture(&run, &fixture);
1367 reports.push(ReplayEvalCaseReport {
1368 run_id: run.id.clone(),
1369 workflow_id: run.workflow_id.clone(),
1370 label: None,
1371 pass: report.pass,
1372 failures: report.failures,
1373 stage_count: report.stage_count,
1374 source_path,
1375 comparison: None,
1376 });
1377 }
1378 let total = reports.len();
1379 let passed = reports.iter().filter(|report| report.pass).count();
1380 let failed = total.saturating_sub(passed);
1381 ReplayEvalSuiteReport {
1382 pass: failed == 0,
1383 total,
1384 passed,
1385 failed,
1386 cases: reports,
1387 }
1388}
1389
/// Structurally compare two run records stage-by-stage.
///
/// Stages are matched by `node_id` across both runs; each is reported as
/// "added", "removed", or "changed" (with per-field detail strings). The
/// overall report is `identical` only when the statuses match, no stage
/// differs, and the transition/artifact/checkpoint counts are equal — note
/// that those three are compared by length only, not by content.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    let mut stage_diffs = Vec::new();
    // Union of node ids from both runs; BTreeSet gives deterministic order.
    let mut all_node_ids = BTreeSet::new();
    all_node_ids.extend(left.stages.iter().map(|stage| stage.node_id.clone()));
    all_node_ids.extend(right.stages.iter().map(|stage| stage.node_id.clone()));

    for node_id in all_node_ids {
        let left_stage = left.stages.iter().find(|stage| stage.node_id == node_id);
        let right_stage = right.stages.iter().find(|stage| stage.node_id == node_id);
        match (left_stage, right_stage) {
            // Present only on the left: removed going left -> right.
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            // Present only on the right: added going left -> right.
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Collect one detail line per differing field; a stage with no
                // details produces no diff record at all.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                // Artifact comparisons are count-based only.
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id,
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable in practice: every id came from one of the two runs.
            (None, None) => {}
        }
    }

    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        // Signed deltas: positive means the right run has more.
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}
1477
1478pub fn push_execution_policy(policy: CapabilityPolicy) {
1479 EXECUTION_POLICY_STACK.with(|stack| stack.borrow_mut().push(policy));
1480}
1481
1482pub fn pop_execution_policy() {
1483 EXECUTION_POLICY_STACK.with(|stack| {
1484 stack.borrow_mut().pop();
1485 });
1486}
1487
1488pub fn current_execution_policy() -> Option<CapabilityPolicy> {
1489 EXECUTION_POLICY_STACK.with(|stack| stack.borrow().last().cloned())
1490}
1491
1492fn policy_allows_tool(policy: &CapabilityPolicy, tool: &str) -> bool {
1493 policy.tools.is_empty() || policy.tools.iter().any(|allowed| allowed == tool)
1494}
1495
1496fn policy_allows_capability(policy: &CapabilityPolicy, capability: &str, op: &str) -> bool {
1497 policy.capabilities.is_empty()
1498 || policy
1499 .capabilities
1500 .get(capability)
1501 .is_some_and(|ops| ops.is_empty() || ops.iter().any(|allowed| allowed == op))
1502}
1503
1504fn policy_allows_side_effect(policy: &CapabilityPolicy, requested: &str) -> bool {
1505 fn rank(v: &str) -> usize {
1506 match v {
1507 "none" => 0,
1508 "read_only" => 1,
1509 "workspace_write" => 2,
1510 "process_exec" => 3,
1511 "network" => 4,
1512 _ => 5,
1513 }
1514 }
1515 policy
1516 .side_effect_level
1517 .as_ref()
1518 .map(|allowed| rank(allowed) >= rank(requested))
1519 .unwrap_or(true)
1520}
1521
1522fn reject_policy(reason: String) -> Result<(), VmError> {
1523 Err(VmError::CategorizedError {
1524 message: reason,
1525 category: crate::value::ErrorCategory::ToolRejected,
1526 })
1527}
1528
/// Gate a builtin call against the innermost active execution policy.
///
/// With no policy on the stack every builtin is allowed. Otherwise each
/// builtin family is mapped to the tool / capability / side-effect checks it
/// must satisfy; any violation yields a categorized `ToolRejected` error.
/// Builtins not matched below are allowed unconditionally.
pub fn enforce_current_policy_for_builtin(name: &str, args: &[VmValue]) -> Result<(), VmError> {
    let Some(policy) = current_execution_policy() else {
        return Ok(());
    };
    match name {
        // Plain file reads: need the tool grant plus workspace.read_text.
        "read" | "read_file" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "read_text")
            {
                return reject_policy(format!(
                    "builtin '{name}' exceeds workspace.read_text ceiling"
                ));
            }
        }
        // Directory enumeration: tool grant plus workspace.list.
        "search" | "list_dir" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "list")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace.list ceiling"));
            }
        }
        // Existence/metadata checks only need the capability, not a tool grant.
        "file_exists" | "stat" => {
            if !policy_allows_capability(&policy, "workspace", "exists") {
                return reject_policy(format!("builtin '{name}' exceeds workspace.exists ceiling"));
            }
        }
        // All mutating workspace builtins share the "edit" tool grant, the
        // write_text capability, and the workspace_write side-effect level.
        "edit" | "write_file" | "append_file" | "mkdir" | "copy_file" => {
            if !policy_allows_tool(&policy, "edit")
                || !policy_allows_capability(&policy, "workspace", "write_text")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace write ceiling"));
            }
        }
        // Deletion has its own capability, separate from write_text.
        "delete_file" => {
            if !policy_allows_capability(&policy, "workspace", "delete")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'delete_file' exceeds workspace.delete ceiling".to_string(),
                );
            }
        }
        "apply_edit" => {
            if !policy_allows_capability(&policy, "workspace", "apply_edit")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'apply_edit' exceeds workspace.apply_edit ceiling".to_string(),
                );
            }
        }
        // Process execution: "run" tool grant plus process.exec capability and level.
        "exec" | "exec_at" | "shell" | "shell_at" | "run_command" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // Network builtins are gated solely by the side-effect ceiling.
        "http_get" | "http_post" | "http_put" | "http_patch" | "http_delete" | "http_request" => {
            if !policy_allows_side_effect(&policy, "network") {
                return reject_policy(format!("builtin '{name}' exceeds network ceiling"));
            }
        }
        // MCP builtins spawn/talk to external server processes, so they are
        // gated identically to process execution (prevents an escape hatch).
        "mcp_connect"
        | "mcp_call"
        | "mcp_list_tools"
        | "mcp_list_resources"
        | "mcp_list_resource_templates"
        | "mcp_read_resource"
        | "mcp_list_prompts"
        | "mcp_get_prompt"
        | "mcp_server_info"
        | "mcp_disconnect" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // Dynamic host call: capability and op come from the first two args;
        // after the capability check, map the pair onto the side-effect level
        // it implies (anything unrecognized is treated as read_only).
        "host_invoke" => {
            let capability = args.first().map(|v| v.display()).unwrap_or_default();
            let op = args.get(1).map(|v| v.display()).unwrap_or_default();
            if !policy_allows_capability(&policy, &capability, &op) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds capability ceiling"
                ));
            }
            let requested_side_effect = match (capability.as_str(), op.as_str()) {
                ("workspace", "write_text" | "apply_edit" | "delete") => "workspace_write",
                ("process", "exec") => "process_exec",
                _ => "read_only",
            };
            if !policy_allows_side_effect(&policy, requested_side_effect) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds side-effect ceiling"
                ));
            }
        }
        _ => {}
    }
    Ok(())
}
1634
1635pub fn enforce_current_policy_for_bridge_builtin(name: &str) -> Result<(), VmError> {
1636 if current_execution_policy().is_some() {
1637 return reject_policy(format!(
1638 "bridged builtin '{name}' exceeds execution policy; declare an explicit capability/tool surface instead"
1639 ));
1640 }
1641 Ok(())
1642}
1643
1644pub fn enforce_current_policy_for_tool(tool_name: &str) -> Result<(), VmError> {
1645 let Some(policy) = current_execution_policy() else {
1646 return Ok(());
1647 };
1648 if !policy_allows_tool(&policy, tool_name) {
1649 return reject_policy(format!("tool '{tool_name}' exceeds tool ceiling"));
1650 }
1651 Ok(())
1652}
1653
1654fn compact_transcript(transcript: &VmValue, keep_last: usize) -> Option<VmValue> {
1655 let dict = transcript.as_dict()?;
1656 let messages = match dict.get("messages") {
1657 Some(VmValue::List(list)) => list.iter().cloned().collect::<Vec<_>>(),
1658 _ => Vec::new(),
1659 };
1660 let retained = messages
1661 .into_iter()
1662 .rev()
1663 .take(keep_last)
1664 .collect::<Vec<_>>()
1665 .into_iter()
1666 .rev()
1667 .collect::<Vec<_>>();
1668 let mut compacted = dict.clone();
1669 compacted.insert(
1670 "messages".to_string(),
1671 VmValue::List(Rc::new(retained.clone())),
1672 );
1673 compacted.insert(
1674 "events".to_string(),
1675 VmValue::List(Rc::new(
1676 crate::llm::helpers::transcript_events_from_messages(&retained),
1677 )),
1678 );
1679 Some(VmValue::Dict(Rc::new(compacted)))
1680}
1681
1682fn redact_transcript_visibility(transcript: &VmValue, visibility: Option<&str>) -> Option<VmValue> {
1683 let Some(visibility) = visibility else {
1684 return Some(transcript.clone());
1685 };
1686 if visibility != "public" && visibility != "public_only" {
1687 return Some(transcript.clone());
1688 }
1689 let dict = transcript.as_dict()?;
1690 let public_messages = match dict.get("messages") {
1691 Some(VmValue::List(list)) => list
1692 .iter()
1693 .filter(|message| {
1694 message
1695 .as_dict()
1696 .and_then(|d| d.get("role"))
1697 .map(|v| v.display())
1698 .map(|role| role != "tool_result")
1699 .unwrap_or(true)
1700 })
1701 .cloned()
1702 .collect::<Vec<_>>(),
1703 _ => Vec::new(),
1704 };
1705 let public_events = match dict.get("events") {
1706 Some(VmValue::List(list)) => list
1707 .iter()
1708 .filter(|event| {
1709 event
1710 .as_dict()
1711 .and_then(|d| d.get("visibility"))
1712 .map(|v| v.display())
1713 .map(|value| value == "public")
1714 .unwrap_or(true)
1715 })
1716 .cloned()
1717 .collect::<Vec<_>>(),
1718 _ => Vec::new(),
1719 };
1720 let mut redacted = dict.clone();
1721 redacted.insert(
1722 "messages".to_string(),
1723 VmValue::List(Rc::new(public_messages)),
1724 );
1725 redacted.insert("events".to_string(), VmValue::List(Rc::new(public_events)));
1726 Some(VmValue::Dict(Rc::new(redacted)))
1727}
1728
1729pub(crate) fn apply_input_transcript_policy(
1730 transcript: Option<VmValue>,
1731 policy: &TranscriptPolicy,
1732) -> Option<VmValue> {
1733 let mut transcript = transcript;
1734 match policy.mode.as_deref() {
1735 Some("reset") => return None,
1736 Some("fork") => {
1737 if let Some(VmValue::Dict(dict)) = transcript.as_ref() {
1738 let mut forked = dict.as_ref().clone();
1739 forked.insert(
1740 "id".to_string(),
1741 VmValue::String(Rc::from(new_id("transcript"))),
1742 );
1743 transcript = Some(VmValue::Dict(Rc::new(forked)));
1744 }
1745 }
1746 _ => {}
1747 }
1748 if policy.compact {
1749 let keep_last = policy.keep_last.unwrap_or(6);
1750 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
1751 }
1752 transcript
1753}
1754
1755fn apply_output_transcript_policy(
1756 transcript: Option<VmValue>,
1757 policy: &TranscriptPolicy,
1758) -> Option<VmValue> {
1759 let mut transcript = transcript;
1760 if policy.compact {
1761 let keep_last = policy.keep_last.unwrap_or(6);
1762 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
1763 }
1764 transcript.and_then(|value| redact_transcript_visibility(&value, policy.visibility.as_deref()))
1765}
1766
/// Execute one workflow stage node: select input artifacts per the node's
/// context policy, build an LLM prompt and options from the node
/// configuration, run either a single LLM call or an agent loop, and wrap the
/// response in a new `ArtifactRecord`.
///
/// Returns the raw LLM result JSON, the artifacts produced (currently exactly
/// one), and the post-processed transcript (if any).
///
/// # Errors
/// Fails when the node's input contract is violated (missing required
/// transcript, too few or too many selected artifacts) or when the underlying
/// LLM call fails.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Fall back to the input contract's kinds when the context policy does not
    // name any include kinds of its own.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = select_artifacts(artifacts.to_vec(), &selection_policy);
    // NOTE(review): rendering uses the original context policy, not the
    // widened selection_policy — confirm that asymmetry is intentional.
    let rendered_context = render_artifacts_context(&selected, &node.context_policy);
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    // Enforce the node's input contract before spending any model calls.
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Prepend the rendered artifact context to the task; the task section is
    // labeled by the node ("Task" by default).
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Forward model-policy knobs and tool list into the LLM options dict;
    // only explicitly configured values are set.
    let mut options = BTreeMap::new();
    if let Some(provider) = &node.model_policy.provider {
        options.insert(
            "provider".to_string(),
            VmValue::String(Rc::from(provider.clone())),
        );
    }
    if let Some(model) = &node.model_policy.model {
        options.insert(
            "model".to_string(),
            VmValue::String(Rc::from(model.clone())),
        );
    }
    if let Some(model_tier) = &node.model_policy.model_tier {
        options.insert(
            "model_tier".to_string(),
            VmValue::String(Rc::from(model_tier.clone())),
        );
    }
    if let Some(temperature) = node.model_policy.temperature {
        options.insert("temperature".to_string(), VmValue::Float(temperature));
    }
    if let Some(max_tokens) = node.model_policy.max_tokens {
        options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
    }
    if !node.tools.is_empty() {
        options.insert(
            "tools".to_string(),
            VmValue::List(Rc::new(
                node.tools
                    .iter()
                    .map(|tool| VmValue::String(Rc::from(tool.clone())))
                    .collect(),
            )),
        );
    }
    if let Some(transcript) = transcript.clone() {
        options.insert("transcript".to_string(), transcript);
    }

    // Positional args: prompt, optional system message, options dict.
    let args = vec![
        VmValue::String(Rc::from(prompt)),
        node.system
            .clone()
            .map(|s| VmValue::String(Rc::from(s)))
            .unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ];
    let mut opts = extract_llm_options(&args)?;

    // Agent mode (explicit, or implied by having tools) runs the iterative
    // tool loop; otherwise a single completion call is adapted into the same
    // agent-loop result shape.
    let llm_result = if node.mode.as_deref() == Some("agent") || !node.tools.is_empty() {
        crate::llm::run_agent_loop_internal(
            &mut opts,
            crate::llm::AgentLoopConfig {
                persistent: true,
                max_iterations: 12,
                max_nudges: 3,
                nudge: None,
                tool_retries: 0,
                tool_backoff_ms: 1000,
                tool_format: "text".to_string(),
            },
        )
        .await?
    } else {
        let result = vm_call_llm_full(&opts).await?;
        crate::llm::agent_loop_result_from_llm(&result, opts)
    };

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // Artifact kind: first declared output kind, else "verification_result"
    // for verify nodes, else the generic "artifact".
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // Package the response as a normalized artifact; lineage records which
    // input artifacts fed this stage.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
1931
1932pub fn next_nodes_for(
1933 graph: &WorkflowGraph,
1934 current: &str,
1935 branch: Option<&str>,
1936) -> Vec<WorkflowEdge> {
1937 let mut matching: Vec<WorkflowEdge> = graph
1938 .edges
1939 .iter()
1940 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
1941 .cloned()
1942 .collect();
1943 if matching.is_empty() {
1944 matching = graph
1945 .edges
1946 .iter()
1947 .filter(|edge| edge.from == current && edge.branch.is_none())
1948 .cloned()
1949 .collect();
1950 }
1951 matching
1952}
1953
1954pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
1955 next_nodes_for(graph, current, Some(branch))
1956 .into_iter()
1957 .next()
1958 .map(|edge| edge.to)
1959}
1960
1961pub fn append_audit_entry(
1962 graph: &mut WorkflowGraph,
1963 op: &str,
1964 node_id: Option<String>,
1965 reason: Option<String>,
1966 metadata: BTreeMap<String, serde_json::Value>,
1967) {
1968 graph.audit_log.push(WorkflowAuditEntry {
1969 id: new_id("audit"),
1970 op: op.to_string(),
1971 node_id,
1972 timestamp: now_rfc3339(),
1973 reason,
1974 metadata,
1975 });
1976}
1977
1978pub fn builtin_ceiling() -> CapabilityPolicy {
1979 CapabilityPolicy {
1980 tools: vec![
1981 "read".to_string(),
1982 "read_file".to_string(),
1983 "search".to_string(),
1984 "edit".to_string(),
1985 "run".to_string(),
1986 "exec".to_string(),
1987 "outline".to_string(),
1988 "list_directory".to_string(),
1989 "lsp_hover".to_string(),
1990 "lsp_definition".to_string(),
1991 "lsp_references".to_string(),
1992 "web_search".to_string(),
1993 "web_fetch".to_string(),
1994 ],
1995 capabilities: BTreeMap::from([
1996 (
1997 "workspace".to_string(),
1998 vec![
1999 "read_text".to_string(),
2000 "write_text".to_string(),
2001 "apply_edit".to_string(),
2002 "delete".to_string(),
2003 "exists".to_string(),
2004 "list".to_string(),
2005 ],
2006 ),
2007 ("process".to_string(), vec!["exec".to_string()]),
2008 ]),
2009 workspace_roots: Vec::new(),
2010 side_effect_level: Some("network".to_string()),
2011 recursion_limit: Some(8),
2012 }
2013}
2014
2015#[cfg(test)]
2016mod tests {
2017 use super::*;
2018
2019 #[test]
2020 fn capability_intersection_rejects_privilege_expansion() {
2021 let ceiling = CapabilityPolicy {
2022 tools: vec!["read".to_string()],
2023 capabilities: BTreeMap::new(),
2024 workspace_roots: Vec::new(),
2025 side_effect_level: Some("read_only".to_string()),
2026 recursion_limit: Some(2),
2027 };
2028 let requested = CapabilityPolicy {
2029 tools: vec!["read".to_string(), "edit".to_string()],
2030 ..Default::default()
2031 };
2032 let error = ceiling.intersect(&requested).unwrap_err();
2033 assert!(error.contains("host ceiling"));
2034 }
2035
2036 #[test]
2037 fn active_execution_policy_rejects_unknown_bridge_builtin() {
2038 push_execution_policy(CapabilityPolicy {
2039 tools: vec!["read".to_string()],
2040 capabilities: BTreeMap::from([(
2041 "workspace".to_string(),
2042 vec!["read_text".to_string()],
2043 )]),
2044 workspace_roots: Vec::new(),
2045 side_effect_level: Some("read_only".to_string()),
2046 recursion_limit: Some(1),
2047 });
2048 let error = enforce_current_policy_for_bridge_builtin("custom_host_builtin").unwrap_err();
2049 pop_execution_policy();
2050 assert!(matches!(
2051 error,
2052 VmError::CategorizedError {
2053 category: crate::value::ErrorCategory::ToolRejected,
2054 ..
2055 }
2056 ));
2057 }
2058
2059 #[test]
2060 fn active_execution_policy_rejects_mcp_escape_hatch() {
2061 push_execution_policy(CapabilityPolicy {
2062 tools: vec!["read".to_string()],
2063 capabilities: BTreeMap::from([(
2064 "workspace".to_string(),
2065 vec!["read_text".to_string()],
2066 )]),
2067 workspace_roots: Vec::new(),
2068 side_effect_level: Some("read_only".to_string()),
2069 recursion_limit: Some(1),
2070 });
2071 let error = enforce_current_policy_for_builtin("mcp_connect", &[]).unwrap_err();
2072 pop_execution_policy();
2073 assert!(matches!(
2074 error,
2075 VmError::CategorizedError {
2076 category: crate::value::ErrorCategory::ToolRejected,
2077 ..
2078 }
2079 ));
2080 }
2081
2082 #[test]
2083 fn workflow_normalization_upgrades_legacy_act_verify_repair_shape() {
2084 let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
2085 "name": "legacy",
2086 "act": {"mode": "llm"},
2087 "verify": {"kind": "verify"},
2088 "repair": {"mode": "agent"},
2089 }));
2090 let graph = normalize_workflow_value(&value).unwrap();
2091 assert_eq!(graph.type_name, "workflow_graph");
2092 assert!(graph.nodes.contains_key("act"));
2093 assert!(graph.nodes.contains_key("verify"));
2094 assert!(graph.nodes.contains_key("repair"));
2095 assert_eq!(graph.entry, "act");
2096 }
2097
2098 #[test]
2099 fn artifact_selection_honors_budget_and_priority() {
2100 let policy = ContextPolicy {
2101 max_artifacts: Some(2),
2102 max_tokens: Some(30),
2103 prefer_recent: true,
2104 prefer_fresh: true,
2105 prioritize_kinds: vec!["verification_result".to_string()],
2106 ..Default::default()
2107 };
2108 let artifacts = vec![
2109 ArtifactRecord {
2110 type_name: "artifact".to_string(),
2111 id: "a".to_string(),
2112 kind: "summary".to_string(),
2113 text: Some("short".to_string()),
2114 relevance: Some(0.9),
2115 created_at: now_rfc3339(),
2116 ..Default::default()
2117 }
2118 .normalize(),
2119 ArtifactRecord {
2120 type_name: "artifact".to_string(),
2121 id: "b".to_string(),
2122 kind: "summary".to_string(),
2123 text: Some("this is a much larger artifact body".to_string()),
2124 relevance: Some(1.0),
2125 created_at: now_rfc3339(),
2126 ..Default::default()
2127 }
2128 .normalize(),
2129 ArtifactRecord {
2130 type_name: "artifact".to_string(),
2131 id: "c".to_string(),
2132 kind: "summary".to_string(),
2133 text: Some("tiny".to_string()),
2134 relevance: Some(0.5),
2135 created_at: now_rfc3339(),
2136 ..Default::default()
2137 }
2138 .normalize(),
2139 ];
2140 let selected = select_artifacts(artifacts, &policy);
2141 assert_eq!(selected.len(), 2);
2142 assert!(selected.iter().all(|artifact| artifact.kind == "summary"));
2143 }
2144
2145 #[test]
2146 fn workflow_validation_rejects_condition_without_true_false_edges() {
2147 let graph = WorkflowGraph {
2148 entry: "gate".to_string(),
2149 nodes: BTreeMap::from([(
2150 "gate".to_string(),
2151 WorkflowNode {
2152 id: Some("gate".to_string()),
2153 kind: "condition".to_string(),
2154 ..Default::default()
2155 },
2156 )]),
2157 edges: vec![WorkflowEdge {
2158 from: "gate".to_string(),
2159 to: "next".to_string(),
2160 branch: Some("true".to_string()),
2161 label: None,
2162 }],
2163 ..Default::default()
2164 };
2165 let report = validate_workflow(&graph, None);
2166 assert!(!report.valid);
2167 assert!(report
2168 .errors
2169 .iter()
2170 .any(|error| error.contains("true") && error.contains("false")));
2171 }
2172
    #[test]
    fn replay_fixture_round_trip_passes() {
        // Round-trip property: a fixture derived from a run must evaluate
        // cleanly against that very run — passing report, zero failures.
        // The record is fully spelled out (no ..Default::default()) so the
        // fixture captures every field the extractor might read.
        let run = RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("demo".to_string()),
            task: "demo".to_string(),
            status: "completed".to_string(),
            started_at: "1".to_string(),
            finished_at: Some("2".to_string()),
            parent_run_id: None,
            // Top-level run is its own root.
            root_run_id: Some("run_1".to_string()),
            // Single completed stage that produced one summary artifact.
            stages: vec![RunStageRecord {
                id: "stage_1".to_string(),
                node_id: "act".to_string(),
                kind: "stage".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                branch: Some("success".to_string()),
                started_at: "1".to_string(),
                finished_at: Some("2".to_string()),
                visible_text: Some("done".to_string()),
                private_reasoning: None,
                transcript: None,
                verification: None,
                artifacts: vec![ArtifactRecord {
                    type_name: "artifact".to_string(),
                    id: "a1".to_string(),
                    kind: "summary".to_string(),
                    text: Some("done".to_string()),
                    created_at: "1".to_string(),
                    ..Default::default()
                }
                .normalize()],
                consumed_artifact_ids: vec![],
                // Produced ids reference the artifact above by id.
                produced_artifact_ids: vec!["a1".to_string()],
                attempts: vec![],
                metadata: BTreeMap::new(),
            }],
            transitions: vec![],
            checkpoints: vec![],
            pending_nodes: vec![],
            completed_nodes: vec!["act".to_string()],
            child_runs: vec![],
            artifacts: vec![],
            policy: CapabilityPolicy::default(),
            execution: None,
            transcript: None,
            replay_fixture: None,
            metadata: BTreeMap::new(),
            persisted_path: None,
        };
        let fixture = replay_fixture_from_run(&run);
        let report = evaluate_run_against_fixture(&run, &fixture);
        assert!(report.pass);
        assert!(report.failures.is_empty());
    }
2231
2232 #[test]
2233 fn replay_eval_suite_reports_failed_case() {
2234 let good = RunRecord {
2235 id: "run_good".to_string(),
2236 workflow_id: "wf".to_string(),
2237 status: "completed".to_string(),
2238 stages: vec![RunStageRecord {
2239 node_id: "act".to_string(),
2240 status: "completed".to_string(),
2241 outcome: "success".to_string(),
2242 ..Default::default()
2243 }],
2244 ..Default::default()
2245 };
2246 let bad = RunRecord {
2247 id: "run_bad".to_string(),
2248 workflow_id: "wf".to_string(),
2249 status: "failed".to_string(),
2250 stages: vec![RunStageRecord {
2251 node_id: "act".to_string(),
2252 status: "failed".to_string(),
2253 outcome: "error".to_string(),
2254 ..Default::default()
2255 }],
2256 ..Default::default()
2257 };
2258 let suite = evaluate_run_suite(vec![
2259 (
2260 good.clone(),
2261 replay_fixture_from_run(&good),
2262 Some("good.json".to_string()),
2263 ),
2264 (
2265 bad.clone(),
2266 replay_fixture_from_run(&good),
2267 Some("bad.json".to_string()),
2268 ),
2269 ]);
2270 assert!(!suite.pass);
2271 assert_eq!(suite.total, 2);
2272 assert_eq!(suite.failed, 1);
2273 assert!(suite.cases.iter().any(|case| !case.pass));
2274 }
2275
2276 #[test]
2277 fn run_diff_reports_changed_stage() {
2278 let left = RunRecord {
2279 id: "left".to_string(),
2280 workflow_id: "wf".to_string(),
2281 status: "completed".to_string(),
2282 stages: vec![RunStageRecord {
2283 node_id: "act".to_string(),
2284 status: "completed".to_string(),
2285 outcome: "success".to_string(),
2286 ..Default::default()
2287 }],
2288 ..Default::default()
2289 };
2290 let right = RunRecord {
2291 id: "right".to_string(),
2292 workflow_id: "wf".to_string(),
2293 status: "failed".to_string(),
2294 stages: vec![RunStageRecord {
2295 node_id: "act".to_string(),
2296 status: "failed".to_string(),
2297 outcome: "error".to_string(),
2298 ..Default::default()
2299 }],
2300 ..Default::default()
2301 };
2302 let diff = diff_run_records(&left, &right);
2303 assert!(diff.status_changed);
2304 assert!(!diff.identical);
2305 assert_eq!(diff.stage_diffs.len(), 1);
2306 }
2307
2308 #[test]
2309 fn eval_suite_manifest_can_fail_on_baseline_diff() {
2310 let temp_dir =
2311 std::env::temp_dir().join(format!("harn-eval-suite-{}", uuid::Uuid::now_v7()));
2312 std::fs::create_dir_all(&temp_dir).unwrap();
2313 let baseline_path = temp_dir.join("baseline.json");
2314 let candidate_path = temp_dir.join("candidate.json");
2315
2316 let baseline = RunRecord {
2317 id: "baseline".to_string(),
2318 workflow_id: "wf".to_string(),
2319 status: "completed".to_string(),
2320 stages: vec![RunStageRecord {
2321 node_id: "act".to_string(),
2322 status: "completed".to_string(),
2323 outcome: "success".to_string(),
2324 ..Default::default()
2325 }],
2326 ..Default::default()
2327 };
2328 let candidate = RunRecord {
2329 id: "candidate".to_string(),
2330 workflow_id: "wf".to_string(),
2331 status: "failed".to_string(),
2332 stages: vec![RunStageRecord {
2333 node_id: "act".to_string(),
2334 status: "failed".to_string(),
2335 outcome: "error".to_string(),
2336 ..Default::default()
2337 }],
2338 ..Default::default()
2339 };
2340
2341 save_run_record(&baseline, Some(baseline_path.to_str().unwrap())).unwrap();
2342 save_run_record(&candidate, Some(candidate_path.to_str().unwrap())).unwrap();
2343
2344 let manifest = EvalSuiteManifest {
2345 base_dir: Some(temp_dir.display().to_string()),
2346 cases: vec![EvalSuiteCase {
2347 label: Some("candidate".to_string()),
2348 run_path: "candidate.json".to_string(),
2349 fixture_path: None,
2350 compare_to: Some("baseline.json".to_string()),
2351 }],
2352 ..Default::default()
2353 };
2354 let suite = evaluate_run_suite_manifest(&manifest).unwrap();
2355 assert!(!suite.pass);
2356 assert_eq!(suite.failed, 1);
2357 assert!(suite.cases[0].comparison.is_some());
2358 assert!(suite.cases[0]
2359 .failures
2360 .iter()
2361 .any(|failure| failure.contains("baseline")));
2362 }
2363
2364 #[test]
2365 fn render_unified_diff_marks_removed_and_added_lines() {
2366 let diff = render_unified_diff(Some("src/main.rs"), "old\nsame", "new\nsame");
2367 assert!(diff.contains("--- a/src/main.rs"));
2368 assert!(diff.contains("+++ b/src/main.rs"));
2369 assert!(diff.contains("-old"));
2370 assert!(diff.contains("+new"));
2371 assert!(diff.contains(" same"));
2372 }
2373
2374 #[test]
2375 fn execution_policy_rejects_process_exec_when_read_only() {
2376 push_execution_policy(CapabilityPolicy {
2377 side_effect_level: Some("read_only".to_string()),
2378 capabilities: BTreeMap::from([("process".to_string(), vec!["exec".to_string()])]),
2379 ..Default::default()
2380 });
2381 let result = enforce_current_policy_for_builtin("exec", &[]);
2382 pop_execution_policy();
2383 assert!(result.is_err());
2384 }
2385
2386 #[test]
2387 fn execution_policy_rejects_unlisted_tool() {
2388 push_execution_policy(CapabilityPolicy {
2389 tools: vec!["read".to_string()],
2390 ..Default::default()
2391 });
2392 let result = enforce_current_policy_for_tool("edit");
2393 pop_execution_policy();
2394 assert!(result.is_err());
2395 }
2396}