1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::{cell::RefCell, thread_local};
5
6use serde::{Deserialize, Serialize};
7
8use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
9use crate::value::{VmError, VmValue};
10
/// Current UTC time formatted as an RFC 3339 timestamp (second precision).
///
/// Previously this returned bare epoch seconds as a decimal string despite
/// its name; it now produces a real `YYYY-MM-DDTHH:MM:SSZ` string. A system
/// clock before the Unix epoch degrades to the epoch instead of panicking.
fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_rfc3339_utc(secs)
}

/// Formats seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_rfc3339_utc(epoch_secs: u64) -> String {
    let days = epoch_secs / 86_400;
    let secs_of_day = epoch_secs % 86_400;
    let hour = secs_of_day / 3_600;
    let min = (secs_of_day % 3_600) / 60;
    let sec = secs_of_day % 60;
    // Civil-from-days conversion (Howard Hinnant's public-domain calendar
    // algorithm); all quantities are non-negative so plain / and % suffice.
    let z = days + 719_468;
    let era = z / 146_097;
    let doe = z % 146_097;
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let mut year = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    let mp = (5 * doy + 2) / 153;
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    if month <= 2 {
        year += 1;
    }
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z")
}
19
/// Builds a unique identifier of the form `<prefix>_<uuid>`.
/// UUIDv7 embeds a timestamp, so ids sort roughly by creation time.
fn new_id(prefix: &str) -> String {
    format!("{prefix}_{}", uuid::Uuid::now_v7())
}
23
/// Directory where run records are persisted.
///
/// Honors the `HARN_RUN_DIR` environment variable; when it is unset (or not
/// valid unicode) the relative `.harn-runs` directory is used instead.
fn default_run_dir() -> PathBuf {
    match std::env::var("HARN_RUN_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(".harn-runs"),
    }
}
29
thread_local! {
    // Capability-policy ceilings; presumably pushed/popped around nested
    // executions — the push/pop sites are elsewhere in this file. TODO confirm.
    static EXECUTION_POLICY_STACK: RefCell<Vec<CapabilityPolicy>> = const { RefCell::new(Vec::new()) };
    // Pre/post tool hooks; run_pre/post_tool_hooks iterate these in
    // registration order.
    static TOOL_HOOKS: RefCell<Vec<ToolHook>> = const { RefCell::new(Vec::new()) };
    // Mutation session attached to the current thread, if any.
    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
}
35
/// Identity and policy metadata for a mutation session.
///
/// All fields are optional in serialized form (`#[serde(default)]`);
/// `normalize` fills the required string fields with defaults.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MutationSessionRecord {
    // Unique id; generated as `session_<uuid>` when empty.
    pub session_id: String,
    // Id of the spawning session when this one is nested.
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub worker_id: Option<String>,
    pub execution_kind: Option<String>,
    // Defaults to "read_only" when empty.
    pub mutation_scope: String,
    // Defaults to "host_enforced" when empty.
    pub approval_mode: String,
}
47
48impl MutationSessionRecord {
49 pub fn normalize(mut self) -> Self {
50 if self.session_id.is_empty() {
51 self.session_id = new_id("session");
52 }
53 if self.mutation_scope.is_empty() {
54 self.mutation_scope = "read_only".to_string();
55 }
56 if self.approval_mode.is_empty() {
57 self.approval_mode = "host_enforced".to_string();
58 }
59 self
60 }
61}
62
63pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
64 CURRENT_MUTATION_SESSION.with(|slot| {
65 *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
66 });
67}
68
69pub fn current_mutation_session() -> Option<MutationSessionRecord> {
70 CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
71}
72
/// Decision returned by a pre-tool hook.
#[derive(Clone, Debug)]
pub enum PreToolAction {
    // Let the call proceed with the arguments as-is.
    Allow,
    // Block the call; the string is the reason reported to the caller.
    Deny(String),
    // Let the call proceed with replaced arguments.
    Modify(serde_json::Value),
}
85
/// Decision returned by a post-tool hook.
#[derive(Clone, Debug)]
pub enum PostToolAction {
    // Keep the tool result unchanged.
    Pass,
    // Replace the tool result with the given string.
    Modify(String),
}
94
/// Pre-tool hook: `(tool_name, args) -> action`, run before the tool executes.
pub type PreToolHookFn = Rc<dyn Fn(&str, &serde_json::Value) -> PreToolAction>;
/// Post-tool hook: `(tool_name, result) -> action`, run after the tool returns.
pub type PostToolHookFn = Rc<dyn Fn(&str, &str) -> PostToolAction>;
98
/// A pre and/or post hook bound to tools whose name matches `pattern`
/// (simple `*` glob, see `glob_match`).
#[derive(Clone)]
pub struct ToolHook {
    pub pattern: String,
    pub pre: Option<PreToolHookFn>,
    pub post: Option<PostToolHookFn>,
}
109
110impl std::fmt::Debug for ToolHook {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("ToolHook")
113 .field("pattern", &self.pattern)
114 .field("has_pre", &self.pre.is_some())
115 .field("has_post", &self.post.is_some())
116 .finish()
117 }
118}
119
/// Minimal glob matcher supporting a single `*` wildcard.
///
/// Supported forms:
/// - `*`       matches every name,
/// - `*text*`  matches names containing `text`,
/// - `text*`   matches names starting with `text`,
/// - `*text`   matches names ending with `text`,
/// - anything else must match exactly.
fn glob_match(pattern: &str, name: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    // `*text*`: previously the leading `*` stayed inside the prefix given to
    // starts_with, so such patterns could never match; treat as "contains".
    if let Some(inner) = pattern.strip_prefix('*').and_then(|p| p.strip_suffix('*')) {
        return name.contains(inner);
    }
    if let Some(prefix) = pattern.strip_suffix('*') {
        return name.starts_with(prefix);
    }
    if let Some(suffix) = pattern.strip_prefix('*') {
        return name.ends_with(suffix);
    }
    pattern == name
}
132
133pub fn register_tool_hook(hook: ToolHook) {
134 TOOL_HOOKS.with(|hooks| hooks.borrow_mut().push(hook));
135}
136
137pub fn clear_tool_hooks() {
138 TOOL_HOOKS.with(|hooks| hooks.borrow_mut().clear());
139}
140
141pub fn run_pre_tool_hooks(tool_name: &str, args: &serde_json::Value) -> PreToolAction {
143 TOOL_HOOKS.with(|hooks| {
144 let hooks = hooks.borrow();
145 let mut current_args = args.clone();
146 for hook in hooks.iter() {
147 if !glob_match(&hook.pattern, tool_name) {
148 continue;
149 }
150 if let Some(ref pre) = hook.pre {
151 match pre(tool_name, ¤t_args) {
152 PreToolAction::Allow => {}
153 PreToolAction::Deny(reason) => return PreToolAction::Deny(reason),
154 PreToolAction::Modify(new_args) => {
155 current_args = new_args;
156 }
157 }
158 }
159 }
160 if current_args != *args {
161 PreToolAction::Modify(current_args)
162 } else {
163 PreToolAction::Allow
164 }
165 })
166}
167
168pub fn run_post_tool_hooks(tool_name: &str, result: &str) -> String {
170 TOOL_HOOKS.with(|hooks| {
171 let hooks = hooks.borrow();
172 let mut current = result.to_string();
173 for hook in hooks.iter() {
174 if !glob_match(&hook.pattern, tool_name) {
175 continue;
176 }
177 if let Some(ref post) = hook.post {
178 match post(tool_name, ¤t) {
179 PostToolAction::Pass => {}
180 PostToolAction::Modify(new_result) => {
181 current = new_result;
182 }
183 }
184 }
185 }
186 current
187 })
188}
189
/// How archived transcript messages are condensed into a summary.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompactStrategy {
    // Ask the LLM to summarize the archived messages.
    Llm,
    // Deterministic truncation of each archived message.
    Truncate,
    // Delegate to a user-supplied VM closure.
    Custom,
}
198
199pub fn parse_compact_strategy(value: &str) -> Result<CompactStrategy, VmError> {
200 match value {
201 "llm" => Ok(CompactStrategy::Llm),
202 "truncate" => Ok(CompactStrategy::Truncate),
203 "custom" => Ok(CompactStrategy::Custom),
204 other => Err(VmError::Runtime(format!(
205 "unknown compact_strategy '{other}' (expected 'llm', 'truncate', or 'custom')"
206 ))),
207 }
208}
209
/// Tuning knobs for automatic transcript compaction.
#[derive(Clone, Debug)]
pub struct AutoCompactConfig {
    // Estimated-token level above which compaction triggers.
    pub token_threshold: usize,
    // Per-tool-output character cap used by microcompaction.
    pub tool_output_max_chars: usize,
    // Number of most-recent messages kept verbatim.
    pub keep_last: usize,
    pub compact_strategy: CompactStrategy,
    // Closure used when `compact_strategy` is `Custom`.
    pub custom_compactor: Option<VmValue>,
}
224
impl Default for AutoCompactConfig {
    // Defaults: compact past ~80k estimated tokens, cap tool outputs at 20k
    // chars, keep the last 8 messages verbatim, summarize via the LLM.
    fn default() -> Self {
        Self {
            token_threshold: 80_000,
            tool_output_max_chars: 20_000,
            keep_last: 8,
            compact_strategy: CompactStrategy::Llm,
            custom_compactor: None,
        }
    }
}
236
237pub fn estimate_message_tokens(messages: &[serde_json::Value]) -> usize {
239 messages
240 .iter()
241 .map(|m| {
242 m.get("content")
243 .and_then(|c| c.as_str())
244 .map(|s| s.len())
245 .unwrap_or(0)
246 })
247 .sum::<usize>()
248 / 4
249}
250
/// Compacts an oversized tool output to roughly `max_chars` characters.
///
/// Keeps the head and tail of the output and, when the output looks like a
/// compiler/test log, additionally preserves up to 32 diagnostic-looking
/// lines in the middle. Outputs already within budget, or budgets under 200
/// chars (too small to leave anything useful), are returned unchanged.
///
/// Fix: the head/tail byte slices are now clamped to UTF-8 char boundaries;
/// the original indexed raw byte offsets and panicked on multi-byte output.
pub fn microcompact_tool_output(output: &str, max_chars: usize) -> String {
    if output.len() <= max_chars || max_chars < 200 {
        return output.to_string();
    }
    // Clamp a byte index down/up to the nearest char boundary so slicing
    // below cannot panic on multi-byte UTF-8.
    fn floor_boundary(s: &str, mut i: usize) -> usize {
        while i > 0 && !s.is_char_boundary(i) {
            i -= 1;
        }
        i
    }
    fn ceil_boundary(s: &str, mut i: usize) -> usize {
        while i < s.len() && !s.is_char_boundary(i) {
            i += 1;
        }
        i
    }
    let diagnostic_lines = output
        .lines()
        .filter(|line| {
            let trimmed = line.trim();
            let lower = trimmed.to_lowercase();
            // "path:123"-style source location: the first ':' is immediately
            // followed by an ASCII digit.
            let has_file_line = trimmed
                .bytes()
                .position(|b| b == b':')
                .and_then(|i| trimmed.as_bytes().get(i + 1))
                .map_or(false, |b| b.is_ascii_digit());
            let has_keyword = trimmed.contains("error")
                || trimmed.contains("FAIL")
                || trimmed.contains("panic")
                || trimmed.contains("undefined")
                || trimmed.contains("expected")
                || trimmed.contains("got")
                || lower.contains("cannot find")
                || lower.contains("not found")
                || lower.contains("no such")
                || lower.contains("unresolved")
                || lower.contains("missing")
                || lower.contains("declared but not used")
                || lower.contains("unused")
                || lower.contains("mismatch");
            let positional = lower.contains(" error ")
                || lower.starts_with("error:")
                || lower.starts_with("fail")
                || lower.contains("panic:");
            (has_file_line && has_keyword) || positional
        })
        .take(32)
        .collect::<Vec<_>>();
    if !diagnostic_lines.is_empty() {
        let diagnostics = diagnostic_lines.join("\n");
        // Reserve room for the diagnostics plus separator text; split the
        // remainder evenly between head and tail.
        let budget = max_chars.saturating_sub(diagnostics.len() + 64);
        let keep = budget / 2;
        if keep >= 80 && output.len() > keep * 2 {
            let head = &output[..floor_boundary(output, keep)];
            let tail = &output[ceil_boundary(output, output.len() - keep)..];
            return format!(
                "{head}\n\n[diagnostic lines preserved]\n{diagnostics}\n\n[... output compacted ...]\n\n{tail}"
            );
        }
    }
    let keep = max_chars / 2;
    let head = &output[..floor_boundary(output, keep)];
    let tail = &output[ceil_boundary(output, output.len() - keep)..];
    let snipped = output.len() - max_chars;
    format!("{head}\n\n[... {snipped} characters snipped ...]\n\n{tail}")
}
317
318fn format_compaction_messages(messages: &[serde_json::Value]) -> String {
319 messages
320 .iter()
321 .map(|msg| {
322 let role = msg
323 .get("role")
324 .and_then(|v| v.as_str())
325 .unwrap_or("user")
326 .to_uppercase();
327 let content = msg
328 .get("content")
329 .and_then(|v| v.as_str())
330 .unwrap_or_default();
331 format!("{role}: {content}")
332 })
333 .collect::<Vec<_>>()
334 .join("\n")
335}
336
337fn truncate_compaction_summary(
338 old_messages: &[serde_json::Value],
339 archived_count: usize,
340) -> String {
341 let summary_parts: Vec<String> = old_messages
342 .iter()
343 .filter_map(|m| {
344 let role = m.get("role")?.as_str()?;
345 let content = m.get("content")?.as_str()?;
346 if content.is_empty() {
347 return None;
348 }
349 let truncated = if content.len() > 500 {
350 format!("{}...", &content[..500])
351 } else {
352 content.to_string()
353 };
354 Some(format!("[{role}] {truncated}"))
355 })
356 .take(15)
357 .collect();
358 format!(
359 "[auto-compacted {archived_count} older messages]\n{}{}",
360 summary_parts.join("\n"),
361 if archived_count > 15 {
362 format!("\n... and {} more", archived_count - 15)
363 } else {
364 String::new()
365 }
366 )
367}
368
369fn compact_summary_text_from_value(value: &VmValue) -> Result<String, VmError> {
370 if let Some(map) = value.as_dict() {
371 if let Some(summary) = map.get("summary").or_else(|| map.get("text")) {
372 return Ok(summary.display());
373 }
374 }
375 match value {
376 VmValue::String(text) => Ok(text.to_string()),
377 VmValue::Nil => Ok(String::new()),
378 _ => serde_json::to_string_pretty(&vm_value_to_json(value))
379 .map_err(|e| VmError::Runtime(format!("custom compactor encode error: {e}"))),
380 }
381}
382
/// Summarizes archived messages by making a dedicated LLM call.
///
/// Reuses the caller's LLM options but strips every conversation-specific
/// field (system prompt, transcript state, tools, response formatting) so the
/// summarization call is a clean one-shot request. Falls back to the
/// deterministic truncation summary if the model returns empty text.
async fn llm_compaction_summary(
    old_messages: &[serde_json::Value],
    archived_count: usize,
    llm_opts: &crate::llm::api::LlmCallOptions,
) -> Result<String, VmError> {
    let mut compact_opts = llm_opts.clone();
    let formatted = format_compaction_messages(old_messages);
    // Clear per-conversation state so the summary request stands alone.
    compact_opts.system = None;
    compact_opts.transcript_id = None;
    compact_opts.transcript_summary = None;
    compact_opts.transcript_metadata = None;
    compact_opts.native_tools = None;
    compact_opts.tool_choice = None;
    compact_opts.response_format = None;
    compact_opts.json_schema = None;
    compact_opts.messages = vec![serde_json::json!({
        "role": "user",
        "content": format!(
            "Summarize these archived conversation messages for a follow-on coding agent. Preserve goals, constraints, decisions, completed tool work, unresolved issues, and next actions. Output only the summary text.\n\nArchived message count: {archived_count}\n\nConversation:\n{formatted}"
        ),
    })];
    let result = vm_call_llm_full(&compact_opts).await?;
    let summary = result.text.trim();
    if summary.is_empty() {
        // Model produced nothing usable — fall back to deterministic truncation.
        Ok(truncate_compaction_summary(old_messages, archived_count))
    } else {
        Ok(format!(
            "[auto-compacted {archived_count} older messages]\n{summary}"
        ))
    }
}
414
415async fn custom_compaction_summary(
416 old_messages: &[serde_json::Value],
417 archived_count: usize,
418 callback: &VmValue,
419) -> Result<String, VmError> {
420 let Some(VmValue::Closure(closure)) = Some(callback.clone()) else {
421 return Err(VmError::Runtime(
422 "compact_callback must be a closure when compact_strategy is 'custom'".to_string(),
423 ));
424 };
425 let mut vm = crate::vm::take_async_builtin_child_vm().ok_or_else(|| {
426 VmError::Runtime(
427 "custom transcript compaction requires an async builtin VM context".to_string(),
428 )
429 })?;
430 let messages_vm = VmValue::List(Rc::new(
431 old_messages
432 .iter()
433 .map(crate::stdlib::json_to_vm_value)
434 .collect(),
435 ));
436 let result = vm.call_closure_pub(&closure, &[messages_vm], &[]).await;
437 crate::vm::restore_async_builtin_child_vm(vm);
438 let summary = compact_summary_text_from_value(&result?)?;
439 if summary.trim().is_empty() {
440 Ok(truncate_compaction_summary(old_messages, archived_count))
441 } else {
442 Ok(format!(
443 "[auto-compacted {archived_count} older messages]\n{summary}"
444 ))
445 }
446}
447
/// Compacts `messages` in place: archives everything except the last
/// `config.keep_last` messages, replaces the archive with a single summary
/// message inserted at the front, and returns whether anything changed.
///
/// The summary is produced per `config.compact_strategy`; `Llm` requires
/// `llm_opts` and `Custom` requires `config.custom_compactor`, each erroring
/// otherwise. Note: on a summarizer error the drained messages are dropped
/// with the error — callers should treat the buffer as invalid on `Err`.
pub(crate) async fn auto_compact_messages(
    messages: &mut Vec<serde_json::Value>,
    config: &AutoCompactConfig,
    llm_opts: Option<&crate::llm::api::LlmCallOptions>,
) -> Result<bool, VmError> {
    if messages.len() <= config.keep_last {
        return Ok(false);
    }
    // Drain everything but the trailing keep_last messages.
    let split_at = messages.len().saturating_sub(config.keep_last);
    let old_messages: Vec<_> = messages.drain(..split_at).collect();
    let archived_count = old_messages.len();
    let summary = match config.compact_strategy {
        CompactStrategy::Truncate => truncate_compaction_summary(&old_messages, archived_count),
        CompactStrategy::Llm => {
            llm_compaction_summary(
                &old_messages,
                archived_count,
                llm_opts.ok_or_else(|| {
                    VmError::Runtime(
                        "LLM transcript compaction requires active LLM call options".to_string(),
                    )
                })?,
            )
            .await?
        }
        CompactStrategy::Custom => {
            custom_compaction_summary(
                &old_messages,
                archived_count,
                config.custom_compactor.as_ref().ok_or_else(|| {
                    VmError::Runtime(
                        "compact_callback is required when compact_strategy is 'custom'"
                            .to_string(),
                    )
                })?,
            )
            .await?
        }
    };
    // The summary becomes the new first message, standing in for the archive.
    messages.insert(
        0,
        serde_json::json!({
            "role": "user",
            "content": summary,
        }),
    );
    Ok(true)
}
498
499pub fn microcompact_artifact(artifact: &mut ArtifactRecord, max_tokens: usize) {
503 let max_chars = max_tokens * 4;
504 if let Some(ref text) = artifact.text {
505 if text.len() > max_chars && max_chars >= 200 {
506 artifact.text = Some(microcompact_tool_output(text, max_chars));
507 artifact.estimated_tokens = Some(max_tokens);
508 }
509 }
510}
511
512pub fn dedup_artifacts(artifacts: &mut Vec<ArtifactRecord>) {
515 let mut seen_hashes: BTreeSet<u64> = BTreeSet::new();
516 artifacts.retain(|artifact| {
517 let text = artifact.text.as_deref().unwrap_or("");
518 if text.is_empty() {
519 return true;
520 }
521 let hash = {
523 use std::hash::{Hash, Hasher};
524 let mut hasher = std::collections::hash_map::DefaultHasher::new();
525 text.hash(&mut hasher);
526 hasher.finish()
527 };
528 seen_hashes.insert(hash)
529 });
530}
531
532pub fn select_artifacts_adaptive(
535 mut artifacts: Vec<ArtifactRecord>,
536 policy: &ContextPolicy,
537) -> Vec<ArtifactRecord> {
538 dedup_artifacts(&mut artifacts);
540
541 if let Some(max_tokens) = policy.max_tokens {
545 let count = artifacts.len().max(1);
546 let per_artifact_budget = max_tokens / count;
547 let cap = per_artifact_budget.max(500).min(max_tokens);
549 for artifact in &mut artifacts {
550 let est = artifact.estimated_tokens.unwrap_or(0);
551 if est > cap * 2 {
552 microcompact_artifact(artifact, cap);
553 }
554 }
555 }
556
557 select_artifacts(artifacts, policy)
559}
560
/// Restricts the first argument of tools matching `tool` (glob) to values
/// matching at least one of `arg_patterns` (globs). An empty pattern list
/// means the constraint is inert.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolArgConstraint {
    pub tool: String,
    pub arg_patterns: Vec<String>,
}
573
574pub fn enforce_tool_arg_constraints(
576 policy: &CapabilityPolicy,
577 tool_name: &str,
578 args: &serde_json::Value,
579) -> Result<(), VmError> {
580 for constraint in &policy.tool_arg_constraints {
581 if !glob_match(&constraint.tool, tool_name) {
582 continue;
583 }
584 if constraint.arg_patterns.is_empty() {
585 continue;
586 }
587 let first_arg = args
589 .as_object()
590 .and_then(|o| o.values().next())
591 .and_then(|v| v.as_str())
592 .or_else(|| args.as_str())
593 .unwrap_or("");
594 let matches = constraint
595 .arg_patterns
596 .iter()
597 .any(|pattern| glob_match(pattern, first_arg));
598 if !matches {
599 return reject_policy(format!(
600 "tool '{tool_name}' argument '{first_arg}' does not match allowed patterns: {:?}",
601 constraint.arg_patterns
602 ));
603 }
604 }
605 Ok(())
606}
607
/// Maps legacy artifact-kind aliases ("file", "transcript", "verification",
/// "test") onto canonical kind names; blank kinds collapse to "artifact";
/// every other kind — canonical or custom — passes through unchanged.
fn normalize_artifact_kind(kind: &str) -> String {
    let canonical = match kind {
        "file" => "workspace_file",
        "transcript" => "transcript_summary",
        "verification" => "verification_result",
        "test" => "test_result",
        blank if blank.trim().is_empty() => "artifact",
        other => other,
    };
    canonical.to_string()
}
641
/// Default context priority for an artifact kind; higher sorts earlier.
/// Verification/test results top the ranking, unknown kinds bottom out at 40.
fn default_artifact_priority(kind: &str) -> i64 {
    const PRIORITIES: &[(&[&str], i64)] = &[
        (&["verification_result", "test_result"], 100),
        (&["verification_bundle"], 95),
        (
            &[
                "diff",
                "git_diff",
                "patch",
                "patch_set",
                "patch_proposal",
                "diff_review",
                "review_decision",
                "apply_intent",
            ],
            90,
        ),
        (&["plan"], 80),
        (
            &["workspace_file", "workspace_snapshot", "editor_selection", "resource"],
            70,
        ),
        (&["summary", "transcript_summary"], 60),
        (&["command_result"], 50),
    ];
    PRIORITIES
        .iter()
        .find(|(kinds, _)| kinds.contains(&kind))
        .map(|&(_, priority)| priority)
        .unwrap_or(40)
}
655
/// Orders freshness labels: fresh/live (3) > recent (2) > unknown (1) > stale (0).
/// `None` ranks the same as an unknown label.
fn freshness_rank(value: Option<&str>) -> i64 {
    match value {
        Some("fresh") | Some("live") => 3,
        Some("recent") => 2,
        Some("stale") => 0,
        _ => 1,
    }
}
664
/// What an execution is allowed to do. Empty collections mean "unrestricted"
/// (see `intersect`), so the zero value is the most permissive policy.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CapabilityPolicy {
    // Tool names allowed; empty = all tools.
    pub tools: Vec<String>,
    // capability name -> allowed operations; empty = all capabilities.
    pub capabilities: BTreeMap<String, Vec<String>>,
    // Filesystem roots the execution may touch; empty = unrestricted.
    pub workspace_roots: Vec<String>,
    // See `min_side_effect` for the ordering of levels.
    pub side_effect_level: Option<String>,
    pub recursion_limit: Option<usize>,
    #[serde(default)]
    pub tool_arg_constraints: Vec<ToolArgConstraint>,
}
677
impl CapabilityPolicy {
    /// Intersects this policy (treated as the host ceiling) with a requested
    /// policy, erroring when the request exceeds the ceiling.
    ///
    /// Conventions visible in the merge logic below:
    /// - an empty list/map on either side means "no restriction", and the
    ///   other side's value is taken verbatim;
    /// - `side_effect_level` and `recursion_limit` take the more restrictive
    ///   value when both are present;
    /// - `tool_arg_constraints` are additive: both sides' constraints apply.
    pub fn intersect(&self, requested: &CapabilityPolicy) -> Result<CapabilityPolicy, String> {
        let side_effect_level = match (&self.side_effect_level, &requested.side_effect_level) {
            (Some(a), Some(b)) => Some(min_side_effect(a, b).to_string()),
            (Some(a), None) => Some(a.clone()),
            (None, Some(b)) => Some(b.clone()),
            (None, None) => None,
        };

        // Reject outright (rather than silently narrowing) any explicitly
        // requested tool outside the ceiling.
        if !self.tools.is_empty() {
            let denied: Vec<String> = requested
                .tools
                .iter()
                .filter(|tool| !self.tools.contains(*tool))
                .cloned()
                .collect();
            if !denied.is_empty() {
                return Err(format!(
                    "requested tools exceed host ceiling: {}",
                    denied.join(", ")
                ));
            }
        }

        // Same for capability operations.
        for (capability, requested_ops) in &requested.capabilities {
            if let Some(allowed_ops) = self.capabilities.get(capability) {
                let denied: Vec<String> = requested_ops
                    .iter()
                    .filter(|op| !allowed_ops.contains(*op))
                    .cloned()
                    .collect();
                if !denied.is_empty() {
                    return Err(format!(
                        "requested capability operations exceed host ceiling: {}.{}",
                        capability,
                        denied.join(",")
                    ));
                }
            } else if !self.capabilities.is_empty() {
                return Err(format!(
                    "requested capability exceeds host ceiling: {capability}"
                ));
            }
        }

        // Merge: empty side = unrestricted, otherwise set intersection.
        let tools = if self.tools.is_empty() {
            requested.tools.clone()
        } else if requested.tools.is_empty() {
            self.tools.clone()
        } else {
            requested
                .tools
                .iter()
                .filter(|tool| self.tools.contains(*tool))
                .cloned()
                .collect()
        };

        let capabilities = if self.capabilities.is_empty() {
            requested.capabilities.clone()
        } else if requested.capabilities.is_empty() {
            self.capabilities.clone()
        } else {
            requested
                .capabilities
                .iter()
                .filter_map(|(capability, requested_ops)| {
                    self.capabilities.get(capability).map(|allowed_ops| {
                        (
                            capability.clone(),
                            requested_ops
                                .iter()
                                .filter(|op| allowed_ops.contains(*op))
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .collect()
        };

        let workspace_roots = if self.workspace_roots.is_empty() {
            requested.workspace_roots.clone()
        } else if requested.workspace_roots.is_empty() {
            self.workspace_roots.clone()
        } else {
            requested
                .workspace_roots
                .iter()
                .filter(|root| self.workspace_roots.contains(*root))
                .cloned()
                .collect()
        };

        let recursion_limit = match (self.recursion_limit, requested.recursion_limit) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        // Constraints accumulate from both sides; every one is enforced.
        let mut tool_arg_constraints = self.tool_arg_constraints.clone();
        tool_arg_constraints.extend(requested.tool_arg_constraints.clone());

        Ok(CapabilityPolicy {
            tools,
            capabilities,
            workspace_roots,
            side_effect_level,
            recursion_limit,
            tool_arg_constraints,
        })
    }
}
793
/// Returns the more restrictive of two side-effect levels, ordered
/// none < read_only < workspace_write < process_exec < network < (unknown).
/// On a tie the first argument wins.
fn min_side_effect<'a>(a: &'a str, b: &'a str) -> &'a str {
    fn rank(level: &str) -> usize {
        ["none", "read_only", "workspace_write", "process_exec", "network"]
            .iter()
            .position(|&known| known == level)
            .unwrap_or(5)
    }
    if rank(a) <= rank(b) { a } else { b }
}
811
/// Per-stage model selection and sampling overrides; `None` fields defer to
/// the surrounding defaults.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ModelPolicy {
    pub provider: Option<String>,
    pub model: Option<String>,
    pub model_tier: Option<String>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<i64>,
}
821
/// How a stage's conversation transcript is retained and condensed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptPolicy {
    pub mode: Option<String>,
    pub visibility: Option<String>,
    pub summarize: bool,
    pub compact: bool,
    // Messages kept verbatim when compacting — presumably feeds
    // `AutoCompactConfig::keep_last`; wiring lives elsewhere. TODO confirm.
    pub keep_last: Option<usize>,
}
831
/// Which artifacts a stage receives as context and under what budget
/// (see `select_artifacts_adaptive`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ContextPolicy {
    pub max_artifacts: Option<usize>,
    // Total estimated-token budget for selected artifacts.
    pub max_tokens: Option<usize>,
    pub reserve_tokens: Option<usize>,
    pub include_kinds: Vec<String>,
    pub exclude_kinds: Vec<String>,
    pub prioritize_kinds: Vec<String>,
    // Artifact ids always included regardless of other filters.
    pub pinned_ids: Vec<String>,
    pub include_stages: Vec<String>,
    pub prefer_recent: bool,
    pub prefer_fresh: bool,
    pub render: Option<String>,
}
847
/// Stage retry behavior: attempt cap plus whether to verify results and
/// attempt repair between retries.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub verify: bool,
    pub repair: bool,
}
855
/// Declares what artifact kinds a stage consumes or produces, with optional
/// count bounds and an optional JSON schema for validation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StageContract {
    pub input_kinds: Vec<String>,
    pub output_kinds: Vec<String>,
    pub min_inputs: Option<usize>,
    pub max_inputs: Option<usize>,
    pub require_transcript: bool,
    pub schema: Option<serde_json::Value>,
}
866
/// Maps stage outcomes to successor node ids; each field names the node to
/// transition to when that outcome occurs (None = no edge for that outcome).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BranchSemantics {
    pub success: Option<String>,
    pub failure: Option<String>,
    pub verify_pass: Option<String>,
    pub verify_fail: Option<String>,
    pub condition_true: Option<String>,
    pub condition_false: Option<String>,
    pub loop_continue: Option<String>,
    pub loop_exit: Option<String>,
    pub escalation: Option<String>,
}
880
/// Configuration for fan-out (map) nodes: the items to map over and the
/// artifact kinds used for per-item inputs and outputs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct MapPolicy {
    pub items: Vec<serde_json::Value>,
    pub item_artifact_kind: Option<String>,
    pub output_kind: Option<String>,
    pub max_items: Option<usize>,
}
889
/// Configuration for fan-in (join) nodes: how branch results are combined
/// and how many inputs must complete before the join fires.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct JoinPolicy {
    pub strategy: String,
    pub require_all_inputs: bool,
    pub min_completed: Option<usize>,
}
897
/// Configuration for reduce nodes: strategy used to fold mapped results into
/// one artifact, plus the separator and output kind for the folded value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReducePolicy {
    pub strategy: String,
    pub separator: Option<String>,
    pub output_kind: Option<String>,
}
905
/// Where and why a stage escalates when it cannot proceed on its own.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EscalationPolicy {
    pub level: Option<String>,
    pub queue: Option<String>,
    pub reason: Option<String>,
}
913
/// A unit of context flowing between workflow stages (file contents, diffs,
/// verification results, summaries, ...). `normalize` fills defaults.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ArtifactRecord {
    // Serialized type tag; defaults to "artifact".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    // Canonical kind name (see `normalize_artifact_kind`).
    pub kind: String,
    pub title: Option<String>,
    pub text: Option<String>,
    pub data: Option<serde_json::Value>,
    pub source: Option<String>,
    pub created_at: String,
    // Freshness label ranked by `freshness_rank`.
    pub freshness: Option<String>,
    // Context-selection priority; defaulted per kind when absent.
    pub priority: Option<i64>,
    // Ids of the artifacts this one was derived from.
    pub lineage: Vec<String>,
    pub relevance: Option<f64>,
    // Rough token count (~text bytes / 4), derived when absent.
    pub estimated_tokens: Option<usize>,
    // Id of the stage that produced this artifact.
    pub stage: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
934
935impl ArtifactRecord {
936 pub fn normalize(mut self) -> Self {
937 if self.type_name.is_empty() {
938 self.type_name = "artifact".to_string();
939 }
940 if self.id.is_empty() {
941 self.id = new_id("artifact");
942 }
943 if self.created_at.is_empty() {
944 self.created_at = now_rfc3339();
945 }
946 if self.kind.is_empty() {
947 self.kind = "artifact".to_string();
948 }
949 self.kind = normalize_artifact_kind(&self.kind);
950 if self.estimated_tokens.is_none() {
951 self.estimated_tokens = self
952 .text
953 .as_ref()
954 .map(|text| ((text.len() as f64) / 4.0).ceil() as usize);
955 }
956 if self.priority.is_none() {
957 self.priority = Some(default_artifact_priority(&self.kind));
958 }
959 self
960 }
961}
962
/// One node of a workflow graph: what to run (`kind`, prompts, tools) and
/// every policy governing how it runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowNode {
    pub id: Option<String>,
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    // Marker string signalling the node's task is complete.
    pub done_sentinel: Option<String>,
    // Tool list/registry in the flexible shape parsed by `workflow_tool_names`.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    pub verify: Option<serde_json::Value>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
989
990pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
991 match value {
992 serde_json::Value::Null => Vec::new(),
993 serde_json::Value::Array(items) => items
994 .iter()
995 .filter_map(|item| match item {
996 serde_json::Value::Object(map) => map
997 .get("name")
998 .and_then(|value| value.as_str())
999 .filter(|name| !name.is_empty())
1000 .map(|name| name.to_string()),
1001 _ => None,
1002 })
1003 .collect(),
1004 serde_json::Value::Object(map) => {
1005 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
1006 return map
1007 .get("tools")
1008 .map(workflow_tool_names)
1009 .unwrap_or_default();
1010 }
1011 map.get("name")
1012 .and_then(|value| value.as_str())
1013 .filter(|name| !name.is_empty())
1014 .map(|name| vec![name.to_string()])
1015 .unwrap_or_default()
1016 }
1017 _ => Vec::new(),
1018 }
1019}
1020
/// Directed edge between two workflow nodes, optionally taken only for a
/// specific branch outcome.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    pub from: String,
    pub to: String,
    pub branch: Option<String>,
    pub label: Option<String>,
}
1029
/// A complete workflow definition: nodes keyed by id, directed edges, the
/// entry node, a graph-level capability ceiling, and an edit audit trail.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub version: usize,
    // Id of the node execution starts from.
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
1045
/// One recorded mutation of a workflow graph (what changed, on which node,
/// when, and why).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
1056
/// Aggregated LLM usage over one or more calls: token counts, wall time,
/// call count, cost, and the model names involved.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}
1067
/// Full record of one executed stage of a run: status/outcome, timing,
/// outputs, transcript, verification, usage, artifact flow, and per-attempt
/// history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    // Workflow node this stage executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    // Branch outcome taken, when the node branches.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
1090
/// Result of a single attempt within a (possibly retried) stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    // 1-based attempt index — TODO confirm against the writer.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
1103
/// One edge traversal during a run: which node was entered, from where, on
/// which branch, and which artifacts crossed the edge.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
1116
/// Snapshot of run progress sufficient to resume: which nodes are ready,
/// which completed, and the last executed stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
1127
/// Expectations captured from a source run, used to assert that a replay of
/// the same workflow behaves equivalently.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
1141
/// Per-stage expectation inside a `ReplayFixture`: status, outcome, branch,
/// required artifact kinds, and an optional visible-text substring.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    pub visible_text_contains: Option<String>,
}
1152
/// Result of evaluating one replay against its fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    // Human-readable descriptions of each failed assertion.
    pub failures: Vec<String>,
    pub stage_count: usize,
}
1160
/// Per-case result within a replay evaluation suite, optionally including a
/// structural diff against a comparison run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    pub comparison: Option<RunDiffReport>,
}
1173
/// Aggregate result over a suite of replay evaluations; `pass` is true only
/// when every case passed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    /// Total number of cases evaluated.
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
1183
/// One stage-level difference found by `diff_run_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// One of "added", "removed", or "changed" (as emitted by `diff_run_records`).
    pub change: String,
    /// Human-readable "field: left -> right" descriptions.
    pub details: Vec<String>,
}
1191
/// Structural comparison of two run records, produced by `diff_run_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    /// True when status, stages, and transition/artifact/checkpoint counts all match.
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    /// Count deltas computed as right minus left (may be negative).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
1206
/// Declarative description of a replay-evaluation suite; consumed by
/// `evaluate_run_suite_manifest`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Type tag; defaulted to "eval_suite_manifest" by `normalize_eval_suite_manifest`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory that relative case paths are resolved against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
1217
/// One case in an eval suite: which run to load, and optionally which fixture
/// to check it against and which baseline run to diff against.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Path to the run record (relative paths resolve against the manifest's base_dir).
    pub run_path: String,
    /// Fixture path; when absent the run's own embedded fixture is used.
    pub fixture_path: Option<String>,
    /// Baseline run path; when set the case also fails if the run differs from it.
    pub compare_to: Option<String>,
}
1226
/// Full persisted record of one workflow execution: identity, lifecycle,
/// stage/transition/checkpoint history, artifacts, policy, and provenance.
/// Normalized (defaults filled) by `normalize_run_record`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Type tag; defaulted to "run_record" by `normalize_run_record`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    /// The task text the run was started with.
    pub task: String,
    /// Lifecycle status; defaulted to "running" by `normalize_run_record`.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Parent run for nested executions, if any.
    pub parent_run_id: Option<String>,
    /// Topmost ancestor; defaulted to this run's own id by `normalize_run_record`.
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    /// Child (worker) executions spawned by this run.
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    /// Free-form transcript payload, if captured.
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Embedded fixture; auto-derived by `normalize_run_record` when absent.
    pub replay_fixture: Option<ReplayFixture>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where the record was last saved, if it has been persisted.
    pub persisted_path: Option<String>,
}
1256
/// Record of a worker execution spawned by a parent run, including the
/// mutation-session linkage fields (see `MutationSessionRecord`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    /// Stage of the parent run that spawned this child, if any.
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    /// Mutation scope granted to the child (e.g. "read_only" — see
    /// `MutationSessionRecord::normalize`).
    pub mutation_scope: Option<String>,
    pub approval_mode: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Id of the child's own run record, once one exists.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
1276
/// Where and how a run (or child run) executed: working directory,
/// environment, adapter, and git worktree details when applicable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    /// Environment variables supplied to the execution.
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    /// Base git ref the execution started from, if any.
    pub base_ref: Option<String>,
    /// Cleanup policy for the execution workspace — semantics defined by the
    /// executor; not interpreted in this module.
    pub cleanup: Option<String>,
}
1290
/// Outcome of `validate_workflow`: hard errors (graph is unusable), warnings
/// (suspicious but runnable), and the set of nodes reachable from the entry.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// True when `errors` is empty; warnings do not affect validity.
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    pub reachable_nodes: Vec<String>,
}
1299
1300fn parse_json_payload<T: for<'de> Deserialize<'de>>(
1301 json: serde_json::Value,
1302 label: &str,
1303) -> Result<T, VmError> {
1304 let payload = json.to_string();
1305 let mut deserializer = serde_json::Deserializer::from_str(&payload);
1306 let mut tracker = serde_path_to_error::Track::new();
1307 let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1308 T::deserialize(path_deserializer).map_err(|error| {
1309 let snippet = if payload.len() > 600 {
1310 format!("{}...", &payload[..600])
1311 } else {
1312 payload.clone()
1313 };
1314 VmError::Runtime(format!(
1315 "{label} parse error at {}: {} | payload={}",
1316 tracker.path(),
1317 error,
1318 snippet
1319 ))
1320 })
1321}
1322
1323fn parse_json_value<T: for<'de> Deserialize<'de>>(value: &VmValue) -> Result<T, VmError> {
1324 parse_json_payload(vm_value_to_json(value), "orchestration")
1325}
1326
1327pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
1328 parse_json_payload(vm_value_to_json(value), label)
1329}
1330
1331pub fn parse_workflow_node_json(
1332 json: serde_json::Value,
1333 label: &str,
1334) -> Result<WorkflowNode, VmError> {
1335 parse_json_payload(json, label)
1336}
1337
1338pub fn parse_workflow_edge_json(
1339 json: serde_json::Value,
1340 label: &str,
1341) -> Result<WorkflowEdge, VmError> {
1342 parse_json_payload(json, label)
1343}
1344
/// Parses a VM value into a `WorkflowGraph` and fills in every structural
/// default so downstream code can assume a fully-populated graph.
///
/// Besides straight deserialization, this accepts a legacy "flat" shape:
/// when no node map is present, top-level `act`/`verify`/`repair` entries are
/// lifted into nodes, top-level model settings (provider/model/tier/
/// temperature/max_tokens) are copied into each lifted node's model policy,
/// and the classic act -> verify -> repair(-> verify) edge wiring is
/// synthesized. Finally, per-node defaults (kind, join/reduce strategy,
/// output contract, retry attempts) are applied to all nodes.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Legacy flat form: no explicit node map, so lift act/verify/repair keys.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node: WorkflowNode = parse_json_value(node_value)?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    // The "verify" key implies a verify node; the others are
                    // plain stages.
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Inherit top-level model settings only where the node did
                // not specify its own; empty display strings count as unset.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // Accept either "model_tier" or the shorthand "tier".
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept both float and int temperatures (int is widened).
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Legacy verify nodes carried their check inline; repack the
                // recognized keys into a structured verify payload.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Synthesize the classic wiring: act -> verify, and when a repair
        // node exists, verify -[failed]-> repair -[retry]-> verify.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node id (BTreeMap order), or "act".
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults, applied to every node regardless of origin.
    for (node_id, node) in &mut graph.nodes {
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            // Default output kind depends on the node kind; map/reduce nodes
            // may override it through their own policies.
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
1505
/// Statically validates a workflow graph: entry/edge integrity, per-kind
/// structural rules, input-contract sanity, and (optionally) that the
/// graph's and each node's capability policies fit under `ceiling`.
///
/// Hard problems go into `errors` (graph is unusable); suspicious-but-
/// runnable findings go into `warnings`. `valid` is true iff no errors.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    // Every edge endpoint must name an existing node.
    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Unreachable nodes are a warning, not an error.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    // Per-node structural checks, driven by edge fan-in/fan-out and kind.
    for (node_id, node) in &graph.nodes {
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        // min_inputs > max_inputs is an unsatisfiable contract.
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        match node.kind.as_str() {
            // Condition nodes must be able to go somewhere on both outcomes.
            "condition" => {
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            // A fork with a single outgoing edge forks nothing.
            "fork" => {
                if outgoing.len() < 2 {
                    errors.push(format!(
                        "node {node_id}: fork nodes require at least two outgoing edges"
                    ));
                }
            }
            // A join with < 2 inputs is suspicious but still runnable.
            "join" => {
                if incoming < 2 {
                    warnings.push(format!(
                        "node {node_id}: join node has fewer than two incoming edges"
                    ));
                }
            }
            // Map nodes need some source of items to iterate.
            "map" => {
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty()
                {
                    errors.push(format!(
                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                    ));
                }
            }
            "reduce" => {
                if node.input_contract.input_kinds.is_empty() {
                    warnings.push(format!(
                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                    ));
                }
            }
            _ => {}
        }
    }

    // Capability ceilings: both the graph policy and every node policy must
    // intersect cleanly with the provided ceiling.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
1621
1622fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1623 let mut seen = BTreeSet::new();
1624 let mut stack = vec![graph.entry.clone()];
1625 while let Some(node_id) = stack.pop() {
1626 if !seen.insert(node_id.clone()) {
1627 continue;
1628 }
1629 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1630 stack.push(edge.to.clone());
1631 }
1632 }
1633 seen
1634}
1635
/// Filters and orders candidate artifacts per the context policy, then
/// greedily packs them under the policy's artifact-count and token budgets.
///
/// Sort precedence (strongest first): pinned ids, prioritized kinds,
/// explicit priority, freshness (only when `prefer_fresh`), recency (only
/// when `prefer_recent`), relevance, and finally smaller estimated token
/// cost as a tie-breaker.
pub fn select_artifacts(
    mut artifacts: Vec<ArtifactRecord>,
    policy: &ContextPolicy,
) -> Vec<ArtifactRecord> {
    // Kind/stage filters: empty include lists mean "include everything";
    // exclude_kinds always wins.
    artifacts.retain(|artifact| {
        (policy.include_kinds.is_empty() || policy.include_kinds.contains(&artifact.kind))
            && !policy.exclude_kinds.contains(&artifact.kind)
            && (policy.include_stages.is_empty()
                || artifact
                    .stage
                    .as_ref()
                    .is_some_and(|stage| policy.include_stages.contains(stage)))
    });
    // Descending sort: each key compares b against a so "better" sorts first.
    artifacts.sort_by(|a, b| {
        let b_pinned = policy.pinned_ids.contains(&b.id);
        let a_pinned = policy.pinned_ids.contains(&a.id);
        b_pinned
            .cmp(&a_pinned)
            .then_with(|| {
                let b_prio_kind = policy.prioritize_kinds.contains(&b.kind);
                let a_prio_kind = policy.prioritize_kinds.contains(&a.kind);
                b_prio_kind.cmp(&a_prio_kind)
            })
            .then_with(|| {
                b.priority
                    .unwrap_or_default()
                    .cmp(&a.priority.unwrap_or_default())
            })
            .then_with(|| {
                if policy.prefer_fresh {
                    freshness_rank(b.freshness.as_deref())
                        .cmp(&freshness_rank(a.freshness.as_deref()))
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                if policy.prefer_recent {
                    // Lexicographic timestamp comparison — assumes created_at
                    // strings sort chronologically; TODO confirm format.
                    b.created_at.cmp(&a.created_at)
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                // partial_cmp because relevance is a float; NaN ties equal.
                b.relevance
                    .partial_cmp(&a.relevance)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| {
                // Unknown token cost sorts last (treated as usize::MAX).
                a.estimated_tokens
                    .unwrap_or(usize::MAX)
                    .cmp(&b.estimated_tokens.unwrap_or(usize::MAX))
            })
    });

    // Greedy pack under the budgets. reserve_tokens is held back from the
    // token cap; an over-budget artifact is skipped (continue), not a stop,
    // so smaller artifacts later in the order can still fit.
    let mut selected = Vec::new();
    let mut used_tokens = 0usize;
    let reserve_tokens = policy.reserve_tokens.unwrap_or(0);
    let effective_max_tokens = policy
        .max_tokens
        .map(|max| max.saturating_sub(reserve_tokens));
    for artifact in artifacts {
        if let Some(max_artifacts) = policy.max_artifacts {
            if selected.len() >= max_artifacts {
                break;
            }
        }
        let next_tokens = artifact.estimated_tokens.unwrap_or(0);
        if let Some(max_tokens) = effective_max_tokens {
            if used_tokens + next_tokens > max_tokens {
                continue;
            }
        }
        used_tokens += next_tokens;
        selected.push(artifact);
    }
    selected
}
1714
1715pub fn render_artifacts_context(artifacts: &[ArtifactRecord], policy: &ContextPolicy) -> String {
1716 let mut parts = Vec::new();
1717 for artifact in artifacts {
1718 let title = artifact
1719 .title
1720 .clone()
1721 .unwrap_or_else(|| format!("{} {}", artifact.kind, artifact.id));
1722 let body = artifact
1723 .text
1724 .clone()
1725 .or_else(|| artifact.data.as_ref().map(|v| v.to_string()))
1726 .unwrap_or_default();
1727 match policy.render.as_deref() {
1728 Some("json") => {
1729 parts.push(
1730 serde_json::json!({
1731 "id": artifact.id,
1732 "kind": artifact.kind,
1733 "title": title,
1734 "source": artifact.source,
1735 "freshness": artifact.freshness,
1736 "priority": artifact.priority,
1737 "text": body,
1738 })
1739 .to_string(),
1740 );
1741 }
1742 _ => parts.push(format!(
1743 "[{title}] kind={} source={} freshness={} priority={}\n{}",
1744 artifact.kind,
1745 artifact
1746 .source
1747 .clone()
1748 .unwrap_or_else(|| "unknown".to_string()),
1749 artifact
1750 .freshness
1751 .clone()
1752 .unwrap_or_else(|| "normal".to_string()),
1753 artifact.priority.unwrap_or_default(),
1754 body
1755 )),
1756 }
1757 }
1758 parts.join("\n\n")
1759}
1760
1761pub fn normalize_artifact(value: &VmValue) -> Result<ArtifactRecord, VmError> {
1762 let artifact: ArtifactRecord = parse_json_value(value)?;
1763 Ok(artifact.normalize())
1764}
1765
1766pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1767 let json = vm_value_to_json(value);
1768 let payload = json.to_string();
1769 let mut deserializer = serde_json::Deserializer::from_str(&payload);
1770 let mut tracker = serde_path_to_error::Track::new();
1771 let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1772 let mut run: RunRecord = RunRecord::deserialize(path_deserializer).map_err(|error| {
1773 let snippet = if payload.len() > 600 {
1774 format!("{}...", &payload[..600])
1775 } else {
1776 payload.clone()
1777 };
1778 VmError::Runtime(format!(
1779 "orchestration parse error at {}: {} | payload={}",
1780 tracker.path(),
1781 error,
1782 snippet
1783 ))
1784 })?;
1785 if run.type_name.is_empty() {
1786 run.type_name = "run_record".to_string();
1787 }
1788 if run.id.is_empty() {
1789 run.id = new_id("run");
1790 }
1791 if run.started_at.is_empty() {
1792 run.started_at = now_rfc3339();
1793 }
1794 if run.status.is_empty() {
1795 run.status = "running".to_string();
1796 }
1797 if run.root_run_id.is_none() {
1798 run.root_run_id = Some(run.id.clone());
1799 }
1800 if run.replay_fixture.is_none() {
1801 run.replay_fixture = Some(replay_fixture_from_run(&run));
1802 }
1803 Ok(run)
1804}
1805
1806pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1807 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1808 if manifest.type_name.is_empty() {
1809 manifest.type_name = "eval_suite_manifest".to_string();
1810 }
1811 if manifest.id.is_empty() {
1812 manifest.id = new_id("eval_suite");
1813 }
1814 Ok(manifest)
1815}
1816
1817fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1818 let content = std::fs::read_to_string(path)
1819 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1820 serde_json::from_str(&content)
1821 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1822}
1823
/// Resolves a manifest-relative path: absolute paths pass through unchanged;
/// relative paths are joined onto `base_dir` when one is provided.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    if candidate.is_absolute() {
        return candidate;
    }
    match base_dir {
        Some(base) => base.join(candidate),
        None => candidate,
    }
}
1834
/// Runs every case in an eval-suite manifest: loads each run from disk,
/// checks it against its fixture (explicit file, or the run's embedded
/// fixture, or one freshly derived), and optionally diffs it against a
/// baseline run. Returns Err only on I/O/parse failures; assertion failures
/// are reported inside the suite report.
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    let base_dir = manifest.base_dir.as_deref().map(Path::new);
    let mut reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(base_dir, &case.run_path);
        let run = load_run_record(&run_path)?;
        // Fixture precedence: explicit path > embedded fixture > derived.
        // Note: a derived fixture trivially matches the run it came from.
        let fixture = match &case.fixture_path {
            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
            None => run
                .replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run)),
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;
        // Optional baseline comparison: any structural difference fails the case.
        let comparison = match &case.compare_to {
            Some(path) => {
                let baseline_path = resolve_manifest_path(base_dir, path);
                let baseline = load_run_record(&baseline_path)?;
                let diff = diff_run_records(&baseline, &run);
                if !diff.identical {
                    pass = false;
                    failures.push(format!(
                        "run differs from baseline {} with {} stage changes",
                        baseline_path.display(),
                        diff.stage_diffs.len()
                    ));
                }
                Some(diff)
            }
            None => None,
        };
        reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }
    let total = reports.len();
    let passed = reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: reports,
    })
}
1892
/// Renders a line-based diff of `before` vs `after` in a unified-diff-like
/// format (file headers plus ` `/`-`/`+` prefixed lines, no hunk headers).
/// `path` labels both sides of the header; defaults to "artifact".
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    let old: Vec<&str> = before.lines().collect();
    let new: Vec<&str> = after.lines().collect();

    // Suffix LCS table: lcs[i][j] = length of the longest common subsequence
    // of old[i..] and new[j..]. Filled bottom-up from the ends.
    let mut lcs = vec![vec![0usize; new.len() + 1]; old.len() + 1];
    for i in (0..old.len()).rev() {
        for j in (0..new.len()).rev() {
            lcs[i][j] = if old[i] == new[j] {
                lcs[i + 1][j + 1] + 1
            } else {
                lcs[i + 1][j].max(lcs[i][j + 1])
            };
        }
    }

    let name = path.unwrap_or("artifact");
    let mut out = format!("--- a/{name}\n+++ b/{name}\n");
    let (mut i, mut j) = (0usize, 0usize);
    // Walk both sequences, emitting context where they agree and choosing
    // deletion vs insertion by whichever keeps the longer remaining LCS.
    while i < old.len() && j < new.len() {
        if old[i] == new[j] {
            out.push_str(&format!(" {}\n", old[i]));
            i += 1;
            j += 1;
        } else if lcs[i + 1][j] >= lcs[i][j + 1] {
            out.push_str(&format!("-{}\n", old[i]));
            i += 1;
        } else {
            out.push_str(&format!("+{}\n", new[j]));
            j += 1;
        }
    }
    // One side exhausted: flush the remainder of the other.
    for line in &old[i..] {
        out.push_str(&format!("-{line}\n"));
    }
    for line in &new[j..] {
        out.push_str(&format!("+{line}\n"));
    }
    out
}
1935
1936pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1937 let path = path
1938 .map(PathBuf::from)
1939 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1940 if let Some(parent) = path.parent() {
1941 std::fs::create_dir_all(parent)
1942 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1943 }
1944 let json = serde_json::to_string_pretty(run)
1945 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1946 let tmp_path = path.with_extension("json.tmp");
1948 std::fs::write(&tmp_path, &json)
1949 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1950 std::fs::rename(&tmp_path, &path).map_err(|e| {
1951 let _ = std::fs::write(&path, &json);
1953 VmError::Runtime(format!("failed to finalize run record: {e}"))
1954 })?;
1955 Ok(path.to_string_lossy().to_string())
1956}
1957
1958pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1959 let content = std::fs::read_to_string(path)
1960 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1961 serde_json::from_str(&content)
1962 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
1963}
1964
1965pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1966 ReplayFixture {
1967 type_name: "replay_fixture".to_string(),
1968 id: new_id("fixture"),
1969 source_run_id: run.id.clone(),
1970 workflow_id: run.workflow_id.clone(),
1971 workflow_name: run.workflow_name.clone(),
1972 created_at: now_rfc3339(),
1973 expected_status: run.status.clone(),
1974 stage_assertions: run
1975 .stages
1976 .iter()
1977 .map(|stage| ReplayStageAssertion {
1978 node_id: stage.node_id.clone(),
1979 expected_status: stage.status.clone(),
1980 expected_outcome: stage.outcome.clone(),
1981 expected_branch: stage.branch.clone(),
1982 required_artifact_kinds: stage
1983 .artifacts
1984 .iter()
1985 .map(|artifact| artifact.kind.clone())
1986 .collect(),
1987 visible_text_contains: stage
1988 .visible_text
1989 .as_ref()
1990 .filter(|text| !text.is_empty())
1991 .map(|text| text.chars().take(80).collect()),
1992 })
1993 .collect(),
1994 }
1995}
1996
/// Checks a run against a replay fixture: overall status, then every stage
/// assertion (status, outcome, branch, required artifact kinds, visible-text
/// snippet). Stages present in the run but absent from the fixture are not
/// checked; a fixture assertion with no matching stage is a failure.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    for assertion in &fixture.stage_assertions {
        // Stages are matched by node_id; first match wins.
        let Some(stage) = run
            .stages
            .iter()
            .find(|stage| stage.node_id == assertion.node_id)
        else {
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Every required kind must appear among the stage's artifacts.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Substring containment, not equality — fixtures store a short snippet.
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
2061
2062pub fn evaluate_run_suite(
2063 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
2064) -> ReplayEvalSuiteReport {
2065 let mut reports = Vec::new();
2066 for (run, fixture, source_path) in cases {
2067 let report = evaluate_run_against_fixture(&run, &fixture);
2068 reports.push(ReplayEvalCaseReport {
2069 run_id: run.id.clone(),
2070 workflow_id: run.workflow_id.clone(),
2071 label: None,
2072 pass: report.pass,
2073 failures: report.failures,
2074 stage_count: report.stage_count,
2075 source_path,
2076 comparison: None,
2077 });
2078 }
2079 let total = reports.len();
2080 let passed = reports.iter().filter(|report| report.pass).count();
2081 let failed = total.saturating_sub(passed);
2082 ReplayEvalSuiteReport {
2083 pass: failed == 0,
2084 total,
2085 passed,
2086 failed,
2087 cases: reports,
2088 }
2089}
2090
/// Structurally compares two run records stage-by-stage (matched by node_id)
/// and by top-level status and transition/artifact/checkpoint counts.
///
/// Count deltas are right minus left. `identical` requires matching status,
/// no stage diffs, and equal counts — it does not compare artifact contents.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    let mut stage_diffs = Vec::new();
    // Union of node ids from both runs so additions and removals both show up.
    let mut all_node_ids = BTreeSet::new();
    all_node_ids.extend(left.stages.iter().map(|stage| stage.node_id.clone()));
    all_node_ids.extend(right.stages.iter().map(|stage| stage.node_id.clone()));

    for node_id in all_node_ids {
        let left_stage = left.stages.iter().find(|stage| stage.node_id == node_id);
        let right_stage = right.stages.iter().find(|stage| stage.node_id == node_id);
        match (left_stage, right_stage) {
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Present on both sides: collect field-level changes; only
                // emit a record when something actually differs.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                // Artifact comparisons are count-based only, by design.
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id,
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable given node ids come from the union, but keeps the
            // match exhaustive.
            (None, None) => {}
        }
    }

    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}
2178
2179pub fn push_execution_policy(policy: CapabilityPolicy) {
2180 EXECUTION_POLICY_STACK.with(|stack| stack.borrow_mut().push(policy));
2181}
2182
2183pub fn pop_execution_policy() {
2184 EXECUTION_POLICY_STACK.with(|stack| {
2185 stack.borrow_mut().pop();
2186 });
2187}
2188
2189pub fn current_execution_policy() -> Option<CapabilityPolicy> {
2190 EXECUTION_POLICY_STACK.with(|stack| stack.borrow().last().cloned())
2191}
2192
2193fn policy_allows_tool(policy: &CapabilityPolicy, tool: &str) -> bool {
2194 policy.tools.is_empty() || policy.tools.iter().any(|allowed| allowed == tool)
2195}
2196
2197fn policy_allows_capability(policy: &CapabilityPolicy, capability: &str, op: &str) -> bool {
2198 policy.capabilities.is_empty()
2199 || policy
2200 .capabilities
2201 .get(capability)
2202 .is_some_and(|ops| ops.is_empty() || ops.iter().any(|allowed| allowed == op))
2203}
2204
2205fn policy_allows_side_effect(policy: &CapabilityPolicy, requested: &str) -> bool {
2206 fn rank(v: &str) -> usize {
2207 match v {
2208 "none" => 0,
2209 "read_only" => 1,
2210 "workspace_write" => 2,
2211 "process_exec" => 3,
2212 "network" => 4,
2213 _ => 5,
2214 }
2215 }
2216 policy
2217 .side_effect_level
2218 .as_ref()
2219 .map(|allowed| rank(allowed) >= rank(requested))
2220 .unwrap_or(true)
2221}
2222
2223fn reject_policy(reason: String) -> Result<(), VmError> {
2224 Err(VmError::CategorizedError {
2225 message: reason,
2226 category: crate::value::ErrorCategory::ToolRejected,
2227 })
2228}
2229
/// Checks the named builtin against the active thread-local execution policy.
///
/// Returns `Ok(())` when no policy is active or the builtin is within the
/// policy's tool/capability/side-effect ceilings; otherwise returns a
/// `ToolRejected` categorized error. `args` is only inspected for
/// `host_invoke`, whose first two arguments name the capability and op.
pub fn enforce_current_policy_for_builtin(name: &str, args: &[VmValue]) -> Result<(), VmError> {
    // No active policy means unrestricted execution.
    let Some(policy) = current_execution_policy() else {
        return Ok(());
    };
    match name {
        // Read builtins need the tool itself plus workspace.read_text.
        "read" | "read_file" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "read_text")
            {
                return reject_policy(format!(
                    "builtin '{name}' exceeds workspace.read_text ceiling"
                ));
            }
        }
        // NOTE(review): this checks the builtin's own name as the tool, but
        // `builtin_ceiling()` lists "search" and not "list_dir" — confirm
        // "list_dir" is intentionally gated behind an explicit tool grant.
        "search" | "list_dir" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "list")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace.list ceiling"));
            }
        }
        // Existence/stat probes only need the capability, not a tool grant.
        "file_exists" | "stat" => {
            if !policy_allows_capability(&policy, "workspace", "exists") {
                return reject_policy(format!("builtin '{name}' exceeds workspace.exists ceiling"));
            }
        }
        // All mutating workspace builtins map onto the "edit" tool plus
        // write_text capability and the workspace_write side-effect level.
        "edit" | "write_file" | "append_file" | "mkdir" | "copy_file" => {
            if !policy_allows_tool(&policy, "edit")
                || !policy_allows_capability(&policy, "workspace", "write_text")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace write ceiling"));
            }
        }
        // Deletion has its own capability but shares the write side-effect level.
        "delete_file" => {
            if !policy_allows_capability(&policy, "workspace", "delete")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'delete_file' exceeds workspace.delete ceiling".to_string(),
                );
            }
        }
        "apply_edit" => {
            if !policy_allows_capability(&policy, "workspace", "apply_edit")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'apply_edit' exceeds workspace.apply_edit ceiling".to_string(),
                );
            }
        }
        // Process execution requires the "run" tool, process.exec capability,
        // and at least the process_exec side-effect level.
        "exec" | "exec_at" | "shell" | "shell_at" | "run_command" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // HTTP builtins are gated only on the network side-effect level here.
        "http_get" | "http_post" | "http_put" | "http_patch" | "http_delete" | "http_request" => {
            if !policy_allows_side_effect(&policy, "network") {
                return reject_policy(format!("builtin '{name}' exceeds network ceiling"));
            }
        }
        // MCP builtins are treated as process execution (they spawn/talk to a
        // server process), so they share the exec ceiling — this closes the
        // MCP escape hatch tested below.
        "mcp_connect"
        | "mcp_call"
        | "mcp_list_tools"
        | "mcp_list_resources"
        | "mcp_list_resource_templates"
        | "mcp_read_resource"
        | "mcp_list_prompts"
        | "mcp_get_prompt"
        | "mcp_server_info"
        | "mcp_disconnect" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // host_invoke carries its capability/op in its first two arguments;
        // missing arguments collapse to the empty string (denied unless the
        // policy is unrestricted).
        "host_invoke" => {
            let capability = args.first().map(|v| v.display()).unwrap_or_default();
            let op = args.get(1).map(|v| v.display()).unwrap_or_default();
            if !policy_allows_capability(&policy, &capability, &op) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds capability ceiling"
                ));
            }
            // Map the capability/op pair onto the side-effect level it implies.
            let requested_side_effect = match (capability.as_str(), op.as_str()) {
                ("workspace", "write_text" | "apply_edit" | "delete") => "workspace_write",
                ("process", "exec") => "process_exec",
                _ => "read_only",
            };
            if !policy_allows_side_effect(&policy, requested_side_effect) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds side-effect ceiling"
                ));
            }
        }
        // Builtins not named above are not policy-gated.
        _ => {}
    }
    Ok(())
}
2335
2336pub fn enforce_current_policy_for_bridge_builtin(name: &str) -> Result<(), VmError> {
2337 if current_execution_policy().is_some() {
2338 return reject_policy(format!(
2339 "bridged builtin '{name}' exceeds execution policy; declare an explicit capability/tool surface instead"
2340 ));
2341 }
2342 Ok(())
2343}
2344
2345pub fn enforce_current_policy_for_tool(tool_name: &str) -> Result<(), VmError> {
2346 let Some(policy) = current_execution_policy() else {
2347 return Ok(());
2348 };
2349 if !policy_allows_tool(&policy, tool_name) {
2350 return reject_policy(format!("tool '{tool_name}' exceeds tool ceiling"));
2351 }
2352 Ok(())
2353}
2354
2355fn compact_transcript(transcript: &VmValue, keep_last: usize) -> Option<VmValue> {
2356 let dict = transcript.as_dict()?;
2357 let messages = match dict.get("messages") {
2358 Some(VmValue::List(list)) => list.iter().cloned().collect::<Vec<_>>(),
2359 _ => Vec::new(),
2360 };
2361 let retained = messages
2362 .into_iter()
2363 .rev()
2364 .take(keep_last)
2365 .collect::<Vec<_>>()
2366 .into_iter()
2367 .rev()
2368 .collect::<Vec<_>>();
2369 let mut compacted = dict.clone();
2370 compacted.insert(
2371 "messages".to_string(),
2372 VmValue::List(Rc::new(retained.clone())),
2373 );
2374 compacted.insert(
2375 "events".to_string(),
2376 VmValue::List(Rc::new(
2377 crate::llm::helpers::transcript_events_from_messages(&retained),
2378 )),
2379 );
2380 Some(VmValue::Dict(Rc::new(compacted)))
2381}
2382
2383fn redact_transcript_visibility(transcript: &VmValue, visibility: Option<&str>) -> Option<VmValue> {
2384 let Some(visibility) = visibility else {
2385 return Some(transcript.clone());
2386 };
2387 if visibility != "public" && visibility != "public_only" {
2388 return Some(transcript.clone());
2389 }
2390 let dict = transcript.as_dict()?;
2391 let public_messages = match dict.get("messages") {
2392 Some(VmValue::List(list)) => list
2393 .iter()
2394 .filter(|message| {
2395 message
2396 .as_dict()
2397 .and_then(|d| d.get("role"))
2398 .map(|v| v.display())
2399 .map(|role| role != "tool_result")
2400 .unwrap_or(true)
2401 })
2402 .cloned()
2403 .collect::<Vec<_>>(),
2404 _ => Vec::new(),
2405 };
2406 let public_events = match dict.get("events") {
2407 Some(VmValue::List(list)) => list
2408 .iter()
2409 .filter(|event| {
2410 event
2411 .as_dict()
2412 .and_then(|d| d.get("visibility"))
2413 .map(|v| v.display())
2414 .map(|value| value == "public")
2415 .unwrap_or(true)
2416 })
2417 .cloned()
2418 .collect::<Vec<_>>(),
2419 _ => Vec::new(),
2420 };
2421 let mut redacted = dict.clone();
2422 redacted.insert(
2423 "messages".to_string(),
2424 VmValue::List(Rc::new(public_messages)),
2425 );
2426 redacted.insert("events".to_string(), VmValue::List(Rc::new(public_events)));
2427 Some(VmValue::Dict(Rc::new(redacted)))
2428}
2429
2430pub(crate) fn apply_input_transcript_policy(
2431 transcript: Option<VmValue>,
2432 policy: &TranscriptPolicy,
2433) -> Option<VmValue> {
2434 let mut transcript = transcript;
2435 match policy.mode.as_deref() {
2436 Some("reset") => return None,
2437 Some("fork") => {
2438 if let Some(VmValue::Dict(dict)) = transcript.as_ref() {
2439 let mut forked = dict.as_ref().clone();
2440 forked.insert(
2441 "id".to_string(),
2442 VmValue::String(Rc::from(new_id("transcript"))),
2443 );
2444 transcript = Some(VmValue::Dict(Rc::new(forked)));
2445 }
2446 }
2447 _ => {}
2448 }
2449 if policy.compact {
2450 let keep_last = policy.keep_last.unwrap_or(6);
2451 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2452 }
2453 transcript
2454}
2455
2456fn apply_output_transcript_policy(
2457 transcript: Option<VmValue>,
2458 policy: &TranscriptPolicy,
2459) -> Option<VmValue> {
2460 let mut transcript = transcript;
2461 if policy.compact {
2462 let keep_last = policy.keep_last.unwrap_or(6);
2463 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2464 }
2465 transcript.and_then(|value| redact_transcript_visibility(&value, policy.visibility.as_deref()))
2466}
2467
/// Executes one workflow stage node: selects input artifacts per the node's
/// context policy, renders them into the prompt, invokes the LLM (either a
/// single call or the agent loop), and wraps the visible reply in a fresh
/// `ArtifactRecord`.
///
/// Returns `(llm_result_json, produced_artifacts, output_transcript)`.
/// Errors when the node's input contract (required transcript, min/max input
/// counts) is violated or the LLM call fails.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Selection defaults to the input contract's declared kinds when the
    // context policy itself names none.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    let rendered_context = render_artifacts_context(&selected, &node.context_policy);
    // Apply reset/fork/compact BEFORE the contract check so a "reset" mode
    // correctly trips `require_transcript` below.
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    // Enforce the input contract's artifact-count bounds on the SELECTED set.
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Prepend the rendered artifact context (when any) to the task text,
    // labelled with the node's task label (default "Task").
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Translate the node's model policy into LLM call options; only fields
    // the policy actually sets are forwarded.
    let mut options = BTreeMap::new();
    if let Some(provider) = &node.model_policy.provider {
        options.insert(
            "provider".to_string(),
            VmValue::String(Rc::from(provider.clone())),
        );
    }
    if let Some(model) = &node.model_policy.model {
        options.insert(
            "model".to_string(),
            VmValue::String(Rc::from(model.clone())),
        );
    }
    if let Some(model_tier) = &node.model_policy.model_tier {
        options.insert(
            "model_tier".to_string(),
            VmValue::String(Rc::from(model_tier.clone())),
        );
    }
    if let Some(temperature) = node.model_policy.temperature {
        options.insert("temperature".to_string(), VmValue::Float(temperature));
    }
    if let Some(max_tokens) = node.model_policy.max_tokens {
        options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
    }
    // Tools are forwarded only when the node declares a non-null spec that
    // resolves to at least one named tool.
    let tool_names = workflow_tool_names(&node.tools);
    if !matches!(node.tools, serde_json::Value::Null) && !tool_names.is_empty() {
        options.insert(
            "tools".to_string(),
            crate::stdlib::json_to_vm_value(&node.tools),
        );
    }
    if let Some(transcript) = transcript.clone() {
        options.insert("transcript".to_string(), transcript);
    }

    // Pack (prompt, system, options) the way extract_llm_options expects.
    let args = vec![
        VmValue::String(Rc::from(prompt.clone())),
        node.system
            .clone()
            .map(|s| VmValue::String(Rc::from(s)))
            .unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ];
    let mut opts = extract_llm_options(&args)?;

    // Env override for how tool calls are formatted; blank values fall back
    // to "text".
    let tool_format = std::env::var("HARN_AGENT_TOOL_FORMAT")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| "text".to_string());
    // Agent loop when the node is explicitly agent-mode or has tools;
    // otherwise a single LLM call shaped into the same result form.
    let mut llm_result = if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
        crate::llm::run_agent_loop_internal(
            &mut opts,
            crate::llm::AgentLoopConfig {
                persistent: true,
                max_iterations: 12,
                max_nudges: 3,
                nudge: None,
                done_sentinel: node.done_sentinel.clone(),
                tool_retries: 0,
                tool_backoff_ms: 1000,
                tool_format: tool_format.clone(),
                auto_compact: None,
                policy: None,
                daemon: false,
                llm_retries: 2,
                llm_backoff_ms: 2000,
            },
        )
        .await?
    } else {
        let result = vm_call_llm_full(&opts).await?;
        crate::llm::agent_loop_result_from_llm(&result, opts)
    };
    // Enrich the result payload with provenance for auditing/replay.
    if let Some(payload) = llm_result.as_object_mut() {
        payload.insert("prompt".to_string(), serde_json::json!(prompt));
        payload.insert(
            "system_prompt".to_string(),
            serde_json::json!(node.system.clone().unwrap_or_default()),
        );
        payload.insert(
            "rendered_context".to_string(),
            serde_json::json!(rendered_context),
        );
        payload.insert(
            "selected_artifact_ids".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.id.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "selected_artifact_titles".to_string(),
            serde_json::json!(selected
                .iter()
                .map(|artifact| artifact.title.clone())
                .collect::<Vec<_>>()),
        );
        payload.insert(
            "tool_calling_mode".to_string(),
            serde_json::json!(tool_format.clone()),
        );
    }

    // Missing/non-string "text" degrades to an empty visible reply.
    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // Output kind: first declared output kind, else a node-kind-based default.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // The produced artifact records lineage back to every selected input.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
2667
2668pub fn next_nodes_for(
2669 graph: &WorkflowGraph,
2670 current: &str,
2671 branch: Option<&str>,
2672) -> Vec<WorkflowEdge> {
2673 let mut matching: Vec<WorkflowEdge> = graph
2674 .edges
2675 .iter()
2676 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
2677 .cloned()
2678 .collect();
2679 if matching.is_empty() {
2680 matching = graph
2681 .edges
2682 .iter()
2683 .filter(|edge| edge.from == current && edge.branch.is_none())
2684 .cloned()
2685 .collect();
2686 }
2687 matching
2688}
2689
2690pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
2691 next_nodes_for(graph, current, Some(branch))
2692 .into_iter()
2693 .next()
2694 .map(|edge| edge.to)
2695}
2696
2697pub fn append_audit_entry(
2698 graph: &mut WorkflowGraph,
2699 op: &str,
2700 node_id: Option<String>,
2701 reason: Option<String>,
2702 metadata: BTreeMap<String, serde_json::Value>,
2703) {
2704 graph.audit_log.push(WorkflowAuditEntry {
2705 id: new_id("audit"),
2706 op: op.to_string(),
2707 node_id,
2708 timestamp: now_rfc3339(),
2709 reason,
2710 metadata,
2711 });
2712}
2713
2714pub fn builtin_ceiling() -> CapabilityPolicy {
2715 CapabilityPolicy {
2716 tools: vec![
2717 "read".to_string(),
2718 "read_file".to_string(),
2719 "search".to_string(),
2720 "edit".to_string(),
2721 "run".to_string(),
2722 "exec".to_string(),
2723 "outline".to_string(),
2724 "list_directory".to_string(),
2725 "lsp_hover".to_string(),
2726 "lsp_definition".to_string(),
2727 "lsp_references".to_string(),
2728 "web_search".to_string(),
2729 "web_fetch".to_string(),
2730 ],
2731 capabilities: BTreeMap::from([
2732 (
2733 "workspace".to_string(),
2734 vec![
2735 "read_text".to_string(),
2736 "write_text".to_string(),
2737 "apply_edit".to_string(),
2738 "delete".to_string(),
2739 "exists".to_string(),
2740 "list".to_string(),
2741 ],
2742 ),
2743 ("process".to_string(), vec!["exec".to_string()]),
2744 ]),
2745 workspace_roots: Vec::new(),
2746 side_effect_level: Some("network".to_string()),
2747 recursion_limit: Some(8),
2748 tool_arg_constraints: Vec::new(),
2749 }
2750}
2751
2752#[cfg(test)]
2753mod tests {
2754 use super::*;
2755
    // `intersect` must fail when the requested policy asks for a tool
    // ("edit") that the host ceiling does not grant, and the error text
    // names the host ceiling.
    #[test]
    fn capability_intersection_rejects_privilege_expansion() {
        let ceiling = CapabilityPolicy {
            tools: vec!["read".to_string()],
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(2),
            ..Default::default()
        };
        let requested = CapabilityPolicy {
            tools: vec!["read".to_string(), "edit".to_string()],
            ..Default::default()
        };
        let error = ceiling.intersect(&requested).unwrap_err();
        assert!(error.contains("host ceiling"));
    }
2771
    // `normalize` on an empty record must fill a generated session id and
    // the documented defaults for scope and approval mode.
    #[test]
    fn mutation_session_normalize_fills_defaults() {
        let normalized = MutationSessionRecord::default().normalize();
        assert!(normalized.session_id.starts_with("session_"));
        assert_eq!(normalized.mutation_scope, "read_only");
        assert_eq!(normalized.approval_mode, "host_enforced");
    }
2779
    // Installing a session must make it readable via the thread-local
    // accessor, and installing `None` must clear it again.
    #[test]
    fn install_current_mutation_session_round_trips() {
        install_current_mutation_session(Some(MutationSessionRecord {
            session_id: "session_test".to_string(),
            mutation_scope: "apply_workspace".to_string(),
            approval_mode: "explicit".to_string(),
            ..Default::default()
        }));
        let current = current_mutation_session().expect("session installed");
        assert_eq!(current.session_id, "session_test");
        assert_eq!(current.mutation_scope, "apply_workspace");
        assert_eq!(current.approval_mode, "explicit");

        install_current_mutation_session(None);
        assert!(current_mutation_session().is_none());
    }
2796
    // With ANY policy active, bridged builtins are rejected wholesale
    // (they declare no capability surface), with a ToolRejected category.
    #[test]
    fn active_execution_policy_rejects_unknown_bridge_builtin() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_bridge_builtin("custom_host_builtin").unwrap_err();
        // Pop before asserting so a failed assert doesn't leak the policy
        // into other tests on this thread.
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2819
    // MCP builtins are gated as process execution, so a read-only policy
    // without the "run" tool must reject `mcp_connect`.
    #[test]
    fn active_execution_policy_rejects_mcp_escape_hatch() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_builtin("mcp_connect", &[]).unwrap_err();
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2842
    // A legacy flat {act, verify, repair} spec must normalize into a graph
    // with those three nodes and "act" as the entry point.
    #[test]
    fn workflow_normalization_upgrades_legacy_act_verify_repair_shape() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "legacy",
            "act": {"mode": "llm"},
            "verify": {"kind": "verify"},
            "repair": {"mode": "agent"},
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        assert_eq!(graph.type_name, "workflow_graph");
        assert!(graph.nodes.contains_key("act"));
        assert!(graph.nodes.contains_key("verify"));
        assert!(graph.nodes.contains_key("repair"));
        assert_eq!(graph.entry, "act");
    }
2858
    // Nodes whose `tools` field is a tool_registry object must have their
    // tool names extracted by `workflow_tool_names`.
    #[test]
    fn workflow_normalization_accepts_tool_registry_nodes() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "registry_tools",
            "entry": "implement",
            "nodes": {
                "implement": {
                    "kind": "stage",
                    "mode": "agent",
                    "tools": {
                        "_type": "tool_registry",
                        "tools": [
                            {"name": "read", "description": "Read files"},
                            {"name": "run", "description": "Run commands"}
                        ]
                    }
                }
            },
            "edges": []
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        let node = graph.nodes.get("implement").unwrap();
        assert_eq!(workflow_tool_names(&node.tools), vec!["read", "run"]);
    }
2883
    // With max_artifacts=2 and a tight token budget, selection must return
    // exactly two artifacts, all of the available "summary" kind.
    #[test]
    fn artifact_selection_honors_budget_and_priority() {
        let policy = ContextPolicy {
            max_artifacts: Some(2),
            max_tokens: Some(30),
            prefer_recent: true,
            prefer_fresh: true,
            prioritize_kinds: vec!["verification_result".to_string()],
            ..Default::default()
        };
        // Three candidates with differing sizes and relevance scores.
        let artifacts = vec![
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "a".to_string(),
                kind: "summary".to_string(),
                text: Some("short".to_string()),
                relevance: Some(0.9),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "b".to_string(),
                kind: "summary".to_string(),
                text: Some("this is a much larger artifact body".to_string()),
                relevance: Some(1.0),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "c".to_string(),
                kind: "summary".to_string(),
                text: Some("tiny".to_string()),
                relevance: Some(0.5),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
        ];
        let selected = select_artifacts(artifacts, &policy);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().all(|artifact| artifact.kind == "summary"));
    }
2930
    // A condition node with only a "true" edge (no "false") must fail
    // validation, with an error mentioning both branch names.
    #[test]
    fn workflow_validation_rejects_condition_without_true_false_edges() {
        let graph = WorkflowGraph {
            entry: "gate".to_string(),
            nodes: BTreeMap::from([(
                "gate".to_string(),
                WorkflowNode {
                    id: Some("gate".to_string()),
                    kind: "condition".to_string(),
                    ..Default::default()
                },
            )]),
            edges: vec![WorkflowEdge {
                from: "gate".to_string(),
                to: "next".to_string(),
                branch: Some("true".to_string()),
                label: None,
            }],
            ..Default::default()
        };
        let report = validate_workflow(&graph, None);
        assert!(!report.valid);
        assert!(report
            .errors
            .iter()
            .any(|error| error.contains("true") && error.contains("false")));
    }
2958
    // A fixture generated from a run must evaluate cleanly against that same
    // run (the trivial round-trip: pass=true, no failures).
    #[test]
    fn replay_fixture_round_trip_passes() {
        let run = RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("demo".to_string()),
            task: "demo".to_string(),
            status: "completed".to_string(),
            started_at: "1".to_string(),
            finished_at: Some("2".to_string()),
            parent_run_id: None,
            root_run_id: Some("run_1".to_string()),
            stages: vec![RunStageRecord {
                id: "stage_1".to_string(),
                node_id: "act".to_string(),
                kind: "stage".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                branch: Some("success".to_string()),
                started_at: "1".to_string(),
                finished_at: Some("2".to_string()),
                visible_text: Some("done".to_string()),
                private_reasoning: None,
                transcript: None,
                verification: None,
                usage: None,
                artifacts: vec![ArtifactRecord {
                    type_name: "artifact".to_string(),
                    id: "a1".to_string(),
                    kind: "summary".to_string(),
                    text: Some("done".to_string()),
                    created_at: "1".to_string(),
                    ..Default::default()
                }
                .normalize()],
                consumed_artifact_ids: vec![],
                produced_artifact_ids: vec!["a1".to_string()],
                attempts: vec![],
                metadata: BTreeMap::new(),
            }],
            transitions: vec![],
            checkpoints: vec![],
            pending_nodes: vec![],
            completed_nodes: vec!["act".to_string()],
            child_runs: vec![],
            artifacts: vec![],
            policy: CapabilityPolicy::default(),
            execution: None,
            transcript: None,
            usage: None,
            replay_fixture: None,
            metadata: BTreeMap::new(),
            persisted_path: None,
        };
        let fixture = replay_fixture_from_run(&run);
        let report = evaluate_run_against_fixture(&run, &fixture);
        assert!(report.pass);
        assert!(report.failures.is_empty());
    }
3019
    // A suite with one matching run and one run checked against the WRONG
    // fixture must report an overall failure with exactly one failed case.
    #[test]
    fn replay_eval_suite_reports_failed_case() {
        let good = RunRecord {
            id: "run_good".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bad = RunRecord {
            id: "run_bad".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        // Note: `bad` is deliberately paired with the fixture from `good`.
        let suite = evaluate_run_suite(vec![
            (
                good.clone(),
                replay_fixture_from_run(&good),
                Some("good.json".to_string()),
            ),
            (
                bad.clone(),
                replay_fixture_from_run(&good),
                Some("bad.json".to_string()),
            ),
        ]);
        assert!(!suite.pass);
        assert_eq!(suite.total, 2);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases.iter().any(|case| !case.pass));
    }
3063
    // Diffing a completed run against a failed one must flag the status
    // change and produce one stage diff for the shared "act" node.
    #[test]
    fn run_diff_reports_changed_stage() {
        let left = RunRecord {
            id: "left".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let right = RunRecord {
            id: "right".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let diff = diff_run_records(&left, &right);
        assert!(diff.status_changed);
        assert!(!diff.identical);
        assert_eq!(diff.stage_diffs.len(), 1);
    }
3095
3096 #[test]
3097 fn eval_suite_manifest_can_fail_on_baseline_diff() {
3098 let temp_dir =
3099 std::env::temp_dir().join(format!("harn-eval-suite-{}", uuid::Uuid::now_v7()));
3100 std::fs::create_dir_all(&temp_dir).unwrap();
3101 let baseline_path = temp_dir.join("baseline.json");
3102 let candidate_path = temp_dir.join("candidate.json");
3103
3104 let baseline = RunRecord {
3105 id: "baseline".to_string(),
3106 workflow_id: "wf".to_string(),
3107 status: "completed".to_string(),
3108 stages: vec![RunStageRecord {
3109 node_id: "act".to_string(),
3110 status: "completed".to_string(),
3111 outcome: "success".to_string(),
3112 ..Default::default()
3113 }],
3114 ..Default::default()
3115 };
3116 let candidate = RunRecord {
3117 id: "candidate".to_string(),
3118 workflow_id: "wf".to_string(),
3119 status: "failed".to_string(),
3120 stages: vec![RunStageRecord {
3121 node_id: "act".to_string(),
3122 status: "failed".to_string(),
3123 outcome: "error".to_string(),
3124 ..Default::default()
3125 }],
3126 ..Default::default()
3127 };
3128
3129 save_run_record(&baseline, Some(baseline_path.to_str().unwrap())).unwrap();
3130 save_run_record(&candidate, Some(candidate_path.to_str().unwrap())).unwrap();
3131
3132 let manifest = EvalSuiteManifest {
3133 base_dir: Some(temp_dir.display().to_string()),
3134 cases: vec![EvalSuiteCase {
3135 label: Some("candidate".to_string()),
3136 run_path: "candidate.json".to_string(),
3137 fixture_path: None,
3138 compare_to: Some("baseline.json".to_string()),
3139 }],
3140 ..Default::default()
3141 };
3142 let suite = evaluate_run_suite_manifest(&manifest).unwrap();
3143 assert!(!suite.pass);
3144 assert_eq!(suite.failed, 1);
3145 assert!(suite.cases[0].comparison.is_some());
3146 assert!(suite.cases[0]
3147 .failures
3148 .iter()
3149 .any(|failure| failure.contains("baseline")));
3150 }
3151
    // The unified diff must carry a/b file headers, -/+ markers for changed
    // lines, and a leading space for unchanged context lines.
    #[test]
    fn render_unified_diff_marks_removed_and_added_lines() {
        let diff = render_unified_diff(Some("src/main.rs"), "old\nsame", "new\nsame");
        assert!(diff.contains("--- a/src/main.rs"));
        assert!(diff.contains("+++ b/src/main.rs"));
        assert!(diff.contains("-old"));
        assert!(diff.contains("+new"));
        assert!(diff.contains(" same"));
    }
3161
    // Even with the process.exec capability granted, a read_only side-effect
    // ceiling must block the `exec` builtin.
    #[test]
    fn execution_policy_rejects_process_exec_when_read_only() {
        push_execution_policy(CapabilityPolicy {
            side_effect_level: Some("read_only".to_string()),
            capabilities: BTreeMap::from([("process".to_string(), vec!["exec".to_string()])]),
            ..Default::default()
        });
        let result = enforce_current_policy_for_builtin("exec", &[]);
        pop_execution_policy();
        assert!(result.is_err());
    }
3173
    // A non-empty tool list acts as an allowlist: "edit" is not listed,
    // so the tool check must fail.
    #[test]
    fn execution_policy_rejects_unlisted_tool() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            ..Default::default()
        });
        let result = enforce_current_policy_for_tool("edit");
        pop_execution_policy();
        assert!(result.is_err());
    }
3184
    // A pre-hook returning Deny for a matching glob pattern must surface as
    // a Deny action from the hook runner.
    #[test]
    fn pre_tool_hook_deny_blocks_execution() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "dangerous_*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Deny("blocked by policy".to_string())
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("dangerous_delete", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Deny(_)));
    }
3201
    // A matching pre-hook that returns Allow must leave the call untouched.
    #[test]
    fn pre_tool_hook_allow_passes_through() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "safe_*".to_string(),
            pre: Some(Rc::new(|_name, _args| PreToolAction::Allow)),
            post: None,
        });
        let result = run_pre_tool_hooks("safe_read", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Allow));
    }
3214
    // A wildcard pre-hook returning Modify must replace the tool's arguments
    // with the hook-supplied JSON.
    #[test]
    fn pre_tool_hook_modify_rewrites_args() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Modify(serde_json::json!({"path": "/sanitized"}))
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("read_file", &serde_json::json!({"path": "/etc/passwd"}));
        clear_tool_hooks();
        match result {
            PreToolAction::Modify(args) => assert_eq!(args["path"], "/sanitized"),
            _ => panic!("expected Modify"),
        }
    }
3232
3233 #[test]
3234 fn post_tool_hook_modifies_result() {
3235 clear_tool_hooks();
3236 register_tool_hook(ToolHook {
3237 pattern: "exec".to_string(),
3238 pre: None,
3239 post: Some(Rc::new(|_name, result| {
3240 if result.contains("SECRET") {
3241 PostToolAction::Modify("[REDACTED]".to_string())
3242 } else {
3243 PostToolAction::Pass
3244 }
3245 })),
3246 });
3247 let result = run_post_tool_hooks("exec", "output with SECRET data");
3248 let clean = run_post_tool_hooks("exec", "clean output");
3249 clear_tool_hooks();
3250 assert_eq!(result, "[REDACTED]");
3251 assert_eq!(clean, "clean output");
3252 }
3253
3254 #[test]
3255 fn unmatched_hook_pattern_does_not_fire() {
3256 clear_tool_hooks();
3257 register_tool_hook(ToolHook {
3258 pattern: "exec".to_string(),
3259 pre: Some(Rc::new(|_name, _args| {
3260 PreToolAction::Deny("should not match".to_string())
3261 })),
3262 post: None,
3263 });
3264 let result = run_pre_tool_hooks("read_file", &serde_json::json!({}));
3265 clear_tool_hooks();
3266 assert!(matches!(result, PreToolAction::Allow));
3267 }
3268
3269 #[test]
3270 fn glob_match_patterns() {
3271 assert!(glob_match("*", "anything"));
3272 assert!(glob_match("exec*", "exec_at"));
3273 assert!(glob_match("*_file", "read_file"));
3274 assert!(!glob_match("exec*", "read_file"));
3275 assert!(glob_match("read_file", "read_file"));
3276 assert!(!glob_match("read_file", "write_file"));
3277 }
3278
3279 #[test]
3282 fn microcompact_snips_large_output() {
3283 let large = "x".repeat(50_000);
3284 let result = microcompact_tool_output(&large, 10_000);
3285 assert!(result.len() < 15_000);
3286 assert!(result.contains("snipped"));
3287 }
3288
3289 #[test]
3290 fn microcompact_preserves_small_output() {
3291 let small = "hello world";
3292 let result = microcompact_tool_output(small, 10_000);
3293 assert_eq!(result, small);
3294 }
3295
3296 #[test]
3297 fn auto_compact_messages_reduces_count() {
3298 let mut messages: Vec<serde_json::Value> = (0..20)
3299 .map(|i| serde_json::json!({"role": "user", "content": format!("message {i}")}))
3300 .collect();
3301 let runtime = tokio::runtime::Builder::new_current_thread()
3302 .enable_all()
3303 .build()
3304 .unwrap();
3305 let compacted = runtime.block_on(auto_compact_messages(
3306 &mut messages,
3307 &AutoCompactConfig {
3308 compact_strategy: CompactStrategy::Truncate,
3309 keep_last: 6,
3310 ..Default::default()
3311 },
3312 None,
3313 ));
3314 assert!(compacted.unwrap());
3315 assert!(messages.len() <= 7); assert!(messages[0]["content"]
3317 .as_str()
3318 .unwrap()
3319 .contains("auto-compacted"));
3320 }
3321
3322 #[test]
3323 fn auto_compact_noop_when_under_threshold() {
3324 let mut messages: Vec<serde_json::Value> = (0..4)
3325 .map(|i| serde_json::json!({"role": "user", "content": format!("msg {i}")}))
3326 .collect();
3327 let runtime = tokio::runtime::Builder::new_current_thread()
3328 .enable_all()
3329 .build()
3330 .unwrap();
3331 let compacted = runtime.block_on(auto_compact_messages(
3332 &mut messages,
3333 &AutoCompactConfig {
3334 compact_strategy: CompactStrategy::Truncate,
3335 keep_last: 6,
3336 ..Default::default()
3337 },
3338 None,
3339 ));
3340 assert!(!compacted.unwrap());
3341 assert_eq!(messages.len(), 4);
3342 }
3343
3344 #[test]
3345 fn estimate_message_tokens_basic() {
3346 let messages = vec![
3347 serde_json::json!({"role": "user", "content": "a".repeat(400)}),
3348 serde_json::json!({"role": "assistant", "content": "b".repeat(400)}),
3349 ];
3350 let tokens = estimate_message_tokens(&messages);
3351 assert_eq!(tokens, 200); }
3353
3354 #[test]
3357 fn dedup_artifacts_removes_duplicates() {
3358 let mut artifacts = vec![
3359 ArtifactRecord {
3360 id: "a1".to_string(),
3361 kind: "test".to_string(),
3362 text: Some("duplicate content".to_string()),
3363 ..Default::default()
3364 },
3365 ArtifactRecord {
3366 id: "a2".to_string(),
3367 kind: "test".to_string(),
3368 text: Some("duplicate content".to_string()),
3369 ..Default::default()
3370 },
3371 ArtifactRecord {
3372 id: "a3".to_string(),
3373 kind: "test".to_string(),
3374 text: Some("unique content".to_string()),
3375 ..Default::default()
3376 },
3377 ];
3378 dedup_artifacts(&mut artifacts);
3379 assert_eq!(artifacts.len(), 2);
3380 }
3381
3382 #[test]
3383 fn microcompact_artifact_snips_oversized() {
3384 let mut artifact = ArtifactRecord {
3385 id: "a1".to_string(),
3386 kind: "test".to_string(),
3387 text: Some("x".repeat(10_000)),
3388 estimated_tokens: Some(2_500),
3389 ..Default::default()
3390 };
3391 microcompact_artifact(&mut artifact, 500);
3392 assert!(artifact.text.as_ref().unwrap().len() < 5_000);
3393 assert_eq!(artifact.estimated_tokens, Some(500));
3394 }
3395
3396 #[test]
3399 fn arg_constraint_allows_matching_pattern() {
3400 let policy = CapabilityPolicy {
3401 tool_arg_constraints: vec![ToolArgConstraint {
3402 tool: "exec".to_string(),
3403 arg_patterns: vec!["cargo *".to_string()],
3404 }],
3405 ..Default::default()
3406 };
3407 let result = enforce_tool_arg_constraints(
3408 &policy,
3409 "exec",
3410 &serde_json::json!({"command": "cargo test"}),
3411 );
3412 assert!(result.is_ok());
3413 }
3414
3415 #[test]
3416 fn arg_constraint_rejects_non_matching_pattern() {
3417 let policy = CapabilityPolicy {
3418 tool_arg_constraints: vec![ToolArgConstraint {
3419 tool: "exec".to_string(),
3420 arg_patterns: vec!["cargo *".to_string()],
3421 }],
3422 ..Default::default()
3423 };
3424 let result = enforce_tool_arg_constraints(
3425 &policy,
3426 "exec",
3427 &serde_json::json!({"command": "rm -rf /"}),
3428 );
3429 assert!(result.is_err());
3430 }
3431
3432 #[test]
3433 fn arg_constraint_ignores_unmatched_tool() {
3434 let policy = CapabilityPolicy {
3435 tool_arg_constraints: vec![ToolArgConstraint {
3436 tool: "exec".to_string(),
3437 arg_patterns: vec!["cargo *".to_string()],
3438 }],
3439 ..Default::default()
3440 };
3441 let result = enforce_tool_arg_constraints(
3442 &policy,
3443 "read_file",
3444 &serde_json::json!({"path": "/etc/passwd"}),
3445 );
3446 assert!(result.is_ok());
3447 }
3448}