1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::{cell::RefCell, thread_local};
5
6use serde::{Deserialize, Serialize};
7
8use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
9use crate::value::{VmError, VmValue};
10
/// Current wall-clock time as an RFC 3339 / ISO-8601 UTC timestamp,
/// e.g. `2024-05-01T12:34:56Z`.
///
/// The previous revision returned bare Unix epoch seconds despite the
/// function name; `created_at`-style consumers treat the value as an
/// opaque string, so producing the format the name promises is safe.
/// Pre-epoch clocks collapse to the epoch via `unwrap_or_default`.
fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs() as i64;
    let (days, rem) = (secs.div_euclid(86_400), secs.rem_euclid(86_400));
    let (hour, minute, second) = (rem / 3_600, (rem % 3_600) / 60, rem % 60);
    // Civil-from-days conversion (Howard Hinnant's public-domain date
    // algorithms), exact for the proleptic Gregorian calendar.
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097);
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
    let mp = (5 * doy + 2) / 153;
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + i64::from(month <= 2);
    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
19
20fn new_id(prefix: &str) -> String {
21 format!("{prefix}_{}", uuid::Uuid::now_v7())
22}
23
/// Directory where run records are persisted: `$HARN_RUN_DIR` when set,
/// otherwise `.harn-runs` relative to the current working directory.
fn default_run_dir() -> PathBuf {
    match std::env::var("HARN_RUN_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(".harn-runs"),
    }
}
29
// Per-thread runtime state: each OS thread gets its own isolated slots,
// so hooks/policies installed on one thread never leak into another.
thread_local! {
    // Stack of capability policies in effect for nested executions.
    static EXECUTION_POLICY_STACK: RefCell<Vec<CapabilityPolicy>> = const { RefCell::new(Vec::new()) };
    // Hooks consulted around every tool call (see `run_pre_tool_hooks` /
    // `run_post_tool_hooks`), applied in registration order.
    static TOOL_HOOKS: RefCell<Vec<ToolHook>> = const { RefCell::new(Vec::new()) };
    // Metadata for the mutation session currently active on this thread.
    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
}
35
/// Identity and policy metadata for a mutation session, installed via
/// `install_current_mutation_session`. `#[serde(default)]` makes every
/// field optional on deserialization.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MutationSessionRecord {
    /// Unique id; `normalize` generates one when empty.
    pub session_id: String,
    pub parent_session_id: Option<String>,
    pub run_id: Option<String>,
    pub worker_id: Option<String>,
    pub execution_kind: Option<String>,
    /// Defaults to "read_only" when empty (see `normalize`).
    pub mutation_scope: String,
    /// Defaults to "host_enforced" when empty (see `normalize`).
    pub approval_mode: String,
}
47
48impl MutationSessionRecord {
49 pub fn normalize(mut self) -> Self {
50 if self.session_id.is_empty() {
51 self.session_id = new_id("session");
52 }
53 if self.mutation_scope.is_empty() {
54 self.mutation_scope = "read_only".to_string();
55 }
56 if self.approval_mode.is_empty() {
57 self.approval_mode = "host_enforced".to_string();
58 }
59 self
60 }
61}
62
63pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
64 CURRENT_MUTATION_SESSION.with(|slot| {
65 *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
66 });
67}
68
69pub fn current_mutation_session() -> Option<MutationSessionRecord> {
70 CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
71}
72
/// Decision returned by a pre-tool hook.
#[derive(Clone, Debug)]
pub enum PreToolAction {
    /// Let the call proceed with the current arguments.
    Allow,
    /// Block the call; the payload is the denial reason.
    Deny(String),
    /// Let the call proceed with these replacement arguments.
    Modify(serde_json::Value),
}
85
/// Decision returned by a post-tool hook.
#[derive(Clone, Debug)]
pub enum PostToolAction {
    /// Leave the tool result unchanged.
    Pass,
    /// Replace the tool result text with this value.
    Modify(String),
}
94
/// Pre-tool hook: `(tool_name, args) -> action`.
pub type PreToolHookFn = Rc<dyn Fn(&str, &serde_json::Value) -> PreToolAction>;
/// Post-tool hook: `(tool_name, result_text) -> action`.
pub type PostToolHookFn = Rc<dyn Fn(&str, &str) -> PostToolAction>;
98
/// A pre/post interception hook applied to tools whose name matches
/// `pattern` (simple glob; see `glob_match`).
#[derive(Clone)]
pub struct ToolHook {
    /// Glob pattern matched against the tool name.
    pub pattern: String,
    /// Runs before the tool; may deny the call or rewrite its arguments.
    pub pre: Option<PreToolHookFn>,
    /// Runs after the tool; may rewrite the result text.
    pub post: Option<PostToolHookFn>,
}
109
110impl std::fmt::Debug for ToolHook {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("ToolHook")
113 .field("pattern", &self.pattern)
114 .field("has_pre", &self.pre.is_some())
115 .field("has_post", &self.post.is_some())
116 .finish()
117 }
118}
119
/// Minimal glob matcher supporting `*` (everything), `prefix*`,
/// `*suffix`, `*infix*` (contains), and exact matches.
///
/// The previous revision had no contains case: a pattern like `*mid*`
/// fell into the `strip_suffix('*')` branch and tested
/// `name.starts_with("*mid")`, which can never match a real tool name.
fn glob_match(pattern: &str, name: &str) -> bool {
    if pattern == "*" {
        return true;
    }
    // `*infix*`: both edges wild — substring match.
    if let Some(inner) = pattern
        .strip_prefix('*')
        .and_then(|p| p.strip_suffix('*'))
    {
        return name.contains(inner);
    }
    if let Some(prefix) = pattern.strip_suffix('*') {
        return name.starts_with(prefix);
    }
    if let Some(suffix) = pattern.strip_prefix('*') {
        return name.ends_with(suffix);
    }
    pattern == name
}
132
133pub fn register_tool_hook(hook: ToolHook) {
134 TOOL_HOOKS.with(|hooks| hooks.borrow_mut().push(hook));
135}
136
137pub fn clear_tool_hooks() {
138 TOOL_HOOKS.with(|hooks| hooks.borrow_mut().clear());
139}
140
141pub fn run_pre_tool_hooks(tool_name: &str, args: &serde_json::Value) -> PreToolAction {
143 TOOL_HOOKS.with(|hooks| {
144 let hooks = hooks.borrow();
145 let mut current_args = args.clone();
146 for hook in hooks.iter() {
147 if !glob_match(&hook.pattern, tool_name) {
148 continue;
149 }
150 if let Some(ref pre) = hook.pre {
151 match pre(tool_name, ¤t_args) {
152 PreToolAction::Allow => {}
153 PreToolAction::Deny(reason) => return PreToolAction::Deny(reason),
154 PreToolAction::Modify(new_args) => {
155 current_args = new_args;
156 }
157 }
158 }
159 }
160 if current_args != *args {
161 PreToolAction::Modify(current_args)
162 } else {
163 PreToolAction::Allow
164 }
165 })
166}
167
168pub fn run_post_tool_hooks(tool_name: &str, result: &str) -> String {
170 TOOL_HOOKS.with(|hooks| {
171 let hooks = hooks.borrow();
172 let mut current = result.to_string();
173 for hook in hooks.iter() {
174 if !glob_match(&hook.pattern, tool_name) {
175 continue;
176 }
177 if let Some(ref post) = hook.post {
178 match post(tool_name, ¤t) {
179 PostToolAction::Pass => {}
180 PostToolAction::Modify(new_result) => {
181 current = new_result;
182 }
183 }
184 }
185 }
186 current
187 })
188}
189
/// How archived transcript messages are condensed during auto-compaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompactStrategy {
    /// Summarize with a dedicated LLM call (`llm_compaction_summary`).
    Llm,
    /// Keep truncated snippets of the leading messages
    /// (`truncate_compaction_summary`); no model call.
    Truncate,
    /// Delegate to a user-supplied VM closure (`custom_compaction_summary`).
    Custom,
}
198
199pub fn parse_compact_strategy(value: &str) -> Result<CompactStrategy, VmError> {
200 match value {
201 "llm" => Ok(CompactStrategy::Llm),
202 "truncate" => Ok(CompactStrategy::Truncate),
203 "custom" => Ok(CompactStrategy::Custom),
204 other => Err(VmError::Runtime(format!(
205 "unknown compact_strategy '{other}' (expected 'llm', 'truncate', or 'custom')"
206 ))),
207 }
208}
209
/// Tuning knobs for automatic transcript compaction
/// (see `auto_compact_messages`).
#[derive(Clone, Debug)]
pub struct AutoCompactConfig {
    // Presumably the estimated-token level above which compaction
    // triggers; not read within this file — TODO confirm at call site.
    pub token_threshold: usize,
    // Presumably a per-tool-output size cap fed to
    // `microcompact_tool_output`; not read within this file.
    pub tool_output_max_chars: usize,
    // Number of most-recent messages preserved verbatim by
    // `auto_compact_messages`.
    pub keep_last: usize,
    // How the archived prefix is summarized.
    pub compact_strategy: CompactStrategy,
    // VM closure used when `compact_strategy` is `Custom`.
    pub custom_compactor: Option<VmValue>,
}
224
225impl Default for AutoCompactConfig {
226 fn default() -> Self {
227 Self {
228 token_threshold: 80_000,
229 tool_output_max_chars: 20_000,
230 keep_last: 8,
231 compact_strategy: CompactStrategy::Llm,
232 custom_compactor: None,
233 }
234 }
235}
236
237pub fn estimate_message_tokens(messages: &[serde_json::Value]) -> usize {
239 messages
240 .iter()
241 .map(|m| {
242 m.get("content")
243 .and_then(|c| c.as_str())
244 .map(|s| s.len())
245 .unwrap_or(0)
246 })
247 .sum::<usize>()
248 / 4
249}
250
/// Shrink an oversized tool output to roughly `max_chars` bytes by
/// keeping the head and tail halves and replacing the middle with a
/// `[... N characters snipped ...]` marker.
///
/// Outputs already within budget — or budgets under 200 bytes, too small
/// to be worth snipping — are returned unchanged.
///
/// The previous revision sliced at raw byte offsets, which panics when
/// the cut lands inside a multi-byte UTF-8 character; the cut points are
/// now nudged to char boundaries.
pub fn microcompact_tool_output(output: &str, max_chars: usize) -> String {
    if output.len() <= max_chars || max_chars < 200 {
        return output.to_string();
    }
    let keep = max_chars / 2;
    // Move the head cut left and the tail cut right until both land on
    // valid char boundaries (never passes the string ends: byte 0 and
    // `len` are always boundaries).
    let mut head_end = keep;
    while !output.is_char_boundary(head_end) {
        head_end -= 1;
    }
    let mut tail_start = output.len() - keep;
    while !output.is_char_boundary(tail_start) {
        tail_start += 1;
    }
    let snipped = tail_start - head_end;
    format!(
        "{}\n\n[... {snipped} characters snipped ...]\n\n{}",
        &output[..head_end],
        &output[tail_start..]
    )
}
263
264fn format_compaction_messages(messages: &[serde_json::Value]) -> String {
265 messages
266 .iter()
267 .map(|msg| {
268 let role = msg
269 .get("role")
270 .and_then(|v| v.as_str())
271 .unwrap_or("user")
272 .to_uppercase();
273 let content = msg
274 .get("content")
275 .and_then(|v| v.as_str())
276 .unwrap_or_default();
277 format!("{role}: {content}")
278 })
279 .collect::<Vec<_>>()
280 .join("\n")
281}
282
283fn truncate_compaction_summary(
284 old_messages: &[serde_json::Value],
285 archived_count: usize,
286) -> String {
287 let summary_parts: Vec<String> = old_messages
288 .iter()
289 .filter_map(|m| {
290 let role = m.get("role")?.as_str()?;
291 let content = m.get("content")?.as_str()?;
292 if content.is_empty() {
293 return None;
294 }
295 let truncated = if content.len() > 500 {
296 format!("{}...", &content[..500])
297 } else {
298 content.to_string()
299 };
300 Some(format!("[{role}] {truncated}"))
301 })
302 .take(15)
303 .collect();
304 format!(
305 "[auto-compacted {archived_count} older messages]\n{}{}",
306 summary_parts.join("\n"),
307 if archived_count > 15 {
308 format!("\n... and {} more", archived_count - 15)
309 } else {
310 String::new()
311 }
312 )
313}
314
315fn compact_summary_text_from_value(value: &VmValue) -> Result<String, VmError> {
316 if let Some(map) = value.as_dict() {
317 if let Some(summary) = map.get("summary").or_else(|| map.get("text")) {
318 return Ok(summary.display());
319 }
320 }
321 match value {
322 VmValue::String(text) => Ok(text.to_string()),
323 VmValue::Nil => Ok(String::new()),
324 _ => serde_json::to_string_pretty(&vm_value_to_json(value))
325 .map_err(|e| VmError::Runtime(format!("custom compactor encode error: {e}"))),
326 }
327}
328
329async fn llm_compaction_summary(
330 old_messages: &[serde_json::Value],
331 archived_count: usize,
332 llm_opts: &crate::llm::api::LlmCallOptions,
333) -> Result<String, VmError> {
334 let mut compact_opts = llm_opts.clone();
335 let formatted = format_compaction_messages(old_messages);
336 compact_opts.system = None;
337 compact_opts.transcript_id = None;
338 compact_opts.transcript_summary = None;
339 compact_opts.transcript_metadata = None;
340 compact_opts.native_tools = None;
341 compact_opts.tool_choice = None;
342 compact_opts.response_format = None;
343 compact_opts.json_schema = None;
344 compact_opts.messages = vec![serde_json::json!({
345 "role": "user",
346 "content": format!(
347 "Summarize these archived conversation messages for a follow-on coding agent. Preserve goals, constraints, decisions, completed tool work, unresolved issues, and next actions. Output only the summary text.\n\nArchived message count: {archived_count}\n\nConversation:\n{formatted}"
348 ),
349 })];
350 let result = vm_call_llm_full(&compact_opts).await?;
351 let summary = result.text.trim();
352 if summary.is_empty() {
353 Ok(truncate_compaction_summary(old_messages, archived_count))
354 } else {
355 Ok(format!(
356 "[auto-compacted {archived_count} older messages]\n{summary}"
357 ))
358 }
359}
360
361async fn custom_compaction_summary(
362 old_messages: &[serde_json::Value],
363 archived_count: usize,
364 callback: &VmValue,
365) -> Result<String, VmError> {
366 let Some(VmValue::Closure(closure)) = Some(callback.clone()) else {
367 return Err(VmError::Runtime(
368 "compact_callback must be a closure when compact_strategy is 'custom'".to_string(),
369 ));
370 };
371 let mut vm = crate::vm::take_async_builtin_child_vm().ok_or_else(|| {
372 VmError::Runtime(
373 "custom transcript compaction requires an async builtin VM context".to_string(),
374 )
375 })?;
376 let messages_vm = VmValue::List(Rc::new(
377 old_messages
378 .iter()
379 .map(crate::stdlib::json_to_vm_value)
380 .collect(),
381 ));
382 let result = vm.call_closure_pub(&closure, &[messages_vm], &[]).await;
383 crate::vm::restore_async_builtin_child_vm(vm);
384 let summary = compact_summary_text_from_value(&result?)?;
385 if summary.trim().is_empty() {
386 Ok(truncate_compaction_summary(old_messages, archived_count))
387 } else {
388 Ok(format!(
389 "[auto-compacted {archived_count} older messages]\n{summary}"
390 ))
391 }
392}
393
394pub(crate) async fn auto_compact_messages(
397 messages: &mut Vec<serde_json::Value>,
398 config: &AutoCompactConfig,
399 llm_opts: Option<&crate::llm::api::LlmCallOptions>,
400) -> Result<bool, VmError> {
401 if messages.len() <= config.keep_last {
402 return Ok(false);
403 }
404 let split_at = messages.len().saturating_sub(config.keep_last);
405 let old_messages: Vec<_> = messages.drain(..split_at).collect();
406 let archived_count = old_messages.len();
407 let summary = match config.compact_strategy {
408 CompactStrategy::Truncate => truncate_compaction_summary(&old_messages, archived_count),
409 CompactStrategy::Llm => {
410 llm_compaction_summary(
411 &old_messages,
412 archived_count,
413 llm_opts.ok_or_else(|| {
414 VmError::Runtime(
415 "LLM transcript compaction requires active LLM call options".to_string(),
416 )
417 })?,
418 )
419 .await?
420 }
421 CompactStrategy::Custom => {
422 custom_compaction_summary(
423 &old_messages,
424 archived_count,
425 config.custom_compactor.as_ref().ok_or_else(|| {
426 VmError::Runtime(
427 "compact_callback is required when compact_strategy is 'custom'"
428 .to_string(),
429 )
430 })?,
431 )
432 .await?
433 }
434 };
435 messages.insert(
436 0,
437 serde_json::json!({
438 "role": "user",
439 "content": summary,
440 }),
441 );
442 Ok(true)
443}
444
445pub fn microcompact_artifact(artifact: &mut ArtifactRecord, max_tokens: usize) {
449 let max_chars = max_tokens * 4;
450 if let Some(ref text) = artifact.text {
451 if text.len() > max_chars && max_chars >= 200 {
452 artifact.text = Some(microcompact_tool_output(text, max_chars));
453 artifact.estimated_tokens = Some(max_tokens);
454 }
455 }
456}
457
458pub fn dedup_artifacts(artifacts: &mut Vec<ArtifactRecord>) {
461 let mut seen_hashes: BTreeSet<u64> = BTreeSet::new();
462 artifacts.retain(|artifact| {
463 let text = artifact.text.as_deref().unwrap_or("");
464 if text.is_empty() {
465 return true;
466 }
467 let hash = {
469 use std::hash::{Hash, Hasher};
470 let mut hasher = std::collections::hash_map::DefaultHasher::new();
471 text.hash(&mut hasher);
472 hasher.finish()
473 };
474 seen_hashes.insert(hash)
475 });
476}
477
478pub fn select_artifacts_adaptive(
481 mut artifacts: Vec<ArtifactRecord>,
482 policy: &ContextPolicy,
483) -> Vec<ArtifactRecord> {
484 dedup_artifacts(&mut artifacts);
486
487 if let Some(max_tokens) = policy.max_tokens {
491 let count = artifacts.len().max(1);
492 let per_artifact_budget = max_tokens / count;
493 let cap = per_artifact_budget.max(500).min(max_tokens);
495 for artifact in &mut artifacts {
496 let est = artifact.estimated_tokens.unwrap_or(0);
497 if est > cap * 2 {
498 microcompact_artifact(artifact, cap);
499 }
500 }
501 }
502
503 select_artifacts(artifacts, policy)
505}
506
/// Restricts the first string argument of matching tools to a set of
/// glob patterns (enforced by `enforce_tool_arg_constraints`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolArgConstraint {
    /// Glob pattern selecting which tools this constraint applies to.
    pub tool: String,
    /// Allowed glob patterns for the tool's first argument; an empty
    /// list disables the constraint.
    pub arg_patterns: Vec<String>,
}
519
520pub fn enforce_tool_arg_constraints(
522 policy: &CapabilityPolicy,
523 tool_name: &str,
524 args: &serde_json::Value,
525) -> Result<(), VmError> {
526 for constraint in &policy.tool_arg_constraints {
527 if !glob_match(&constraint.tool, tool_name) {
528 continue;
529 }
530 if constraint.arg_patterns.is_empty() {
531 continue;
532 }
533 let first_arg = args
535 .as_object()
536 .and_then(|o| o.values().next())
537 .and_then(|v| v.as_str())
538 .or_else(|| args.as_str())
539 .unwrap_or("");
540 let matches = constraint
541 .arg_patterns
542 .iter()
543 .any(|pattern| glob_match(pattern, first_arg));
544 if !matches {
545 return reject_policy(format!(
546 "tool '{tool_name}' argument '{first_arg}' does not match allowed patterns: {:?}",
547 constraint.arg_patterns
548 ));
549 }
550 }
551 Ok(())
552}
553
/// Map an artifact kind string to its canonical form: known canonical
/// kinds pass through, a few legacy aliases are rewritten, blank input
/// becomes "artifact", and any other non-blank kind is kept verbatim.
fn normalize_artifact_kind(kind: &str) -> String {
    const CANONICAL: &[&str] = &[
        "resource",
        "workspace_file",
        "editor_selection",
        "workspace_snapshot",
        "transcript_summary",
        "summary",
        "plan",
        "diff",
        "git_diff",
        "patch",
        "patch_set",
        "patch_proposal",
        "diff_review",
        "review_decision",
        "verification_bundle",
        "apply_intent",
        "verification_result",
        "test_result",
        "command_result",
        "provider_payload",
        "worker_result",
        "worker_notification",
        "artifact",
    ];
    if CANONICAL.contains(&kind) {
        return kind.to_string();
    }
    // Legacy aliases from older records.
    match kind {
        "file" => "workspace_file".to_string(),
        "transcript" => "transcript_summary".to_string(),
        "verification" => "verification_result".to_string(),
        "test" => "test_result".to_string(),
        other if other.trim().is_empty() => "artifact".to_string(),
        other => other.to_string(),
    }
}
587
/// Default selection priority for an artifact kind (higher = more
/// important): verification/test results rank highest, then review/patch
/// material, plans, workspace context, summaries, command output, and
/// finally 40 for everything else.
fn default_artifact_priority(kind: &str) -> i64 {
    if matches!(kind, "verification_result" | "test_result") {
        100
    } else if kind == "verification_bundle" {
        95
    } else if matches!(
        kind,
        "diff"
            | "git_diff"
            | "patch"
            | "patch_set"
            | "patch_proposal"
            | "diff_review"
            | "review_decision"
            | "apply_intent"
    ) {
        90
    } else if kind == "plan" {
        80
    } else if matches!(
        kind,
        "workspace_file" | "workspace_snapshot" | "editor_selection" | "resource"
    ) {
        70
    } else if matches!(kind, "summary" | "transcript_summary") {
        60
    } else if kind == "command_result" {
        50
    } else {
        40
    }
}
601
/// Numeric rank for an artifact freshness label: "fresh"/"live" = 3,
/// "recent" = 2, "stale" = 0; anything else (including `None`) = 1.
fn freshness_rank(value: Option<&str>) -> i64 {
    let label = value.unwrap_or("");
    if label == "fresh" || label == "live" {
        3
    } else if label == "recent" {
        2
    } else if label == "stale" {
        0
    } else {
        1
    }
}
610
/// Capability ceiling/request for an execution: allowed tools, capability
/// operations, and workspace roots, plus side-effect and recursion
/// limits. Empty collections mean "unconstrained" (see `intersect`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CapabilityPolicy {
    pub tools: Vec<String>,
    /// Capability name -> allowed operation names.
    pub capabilities: BTreeMap<String, Vec<String>>,
    pub workspace_roots: Vec<String>,
    /// One of "none" < "read_only" < "workspace_write" < "process_exec"
    /// < "network"; lower rank is more restrictive (see `min_side_effect`).
    pub side_effect_level: Option<String>,
    pub recursion_limit: Option<usize>,
    #[serde(default)]
    pub tool_arg_constraints: Vec<ToolArgConstraint>,
}
623
impl CapabilityPolicy {
    /// Intersect this policy (treated as the host ceiling) with a
    /// `requested` policy, yielding the effective policy or an error when
    /// the request exceeds the ceiling.
    ///
    /// Per-field semantics, as implemented below:
    /// - `tools`/`capabilities`: an empty set on either side means
    ///   "unconstrained" and adopts the other side's set; requesting a
    ///   tool or capability operation outside a non-empty ceiling is a
    ///   hard error, not a silent narrowing;
    /// - `workspace_roots`: intersected, but requested roots outside the
    ///   ceiling are silently dropped rather than rejected;
    /// - `side_effect_level` / `recursion_limit`: the more restrictive of
    ///   the two values wins;
    /// - `tool_arg_constraints`: additive — both sides' constraints apply.
    pub fn intersect(&self, requested: &CapabilityPolicy) -> Result<CapabilityPolicy, String> {
        // Lower-ranked level is more restrictive (see `min_side_effect`).
        let side_effect_level = match (&self.side_effect_level, &requested.side_effect_level) {
            (Some(a), Some(b)) => Some(min_side_effect(a, b).to_string()),
            (Some(a), None) => Some(a.clone()),
            (None, Some(b)) => Some(b.clone()),
            (None, None) => None,
        };

        // Reject any requested tool outside a non-empty ceiling.
        if !self.tools.is_empty() {
            let denied: Vec<String> = requested
                .tools
                .iter()
                .filter(|tool| !self.tools.contains(*tool))
                .cloned()
                .collect();
            if !denied.is_empty() {
                return Err(format!(
                    "requested tools exceed host ceiling: {}",
                    denied.join(", ")
                ));
            }
        }

        // Reject requested capability operations (or whole capabilities)
        // outside a non-empty ceiling.
        for (capability, requested_ops) in &requested.capabilities {
            if let Some(allowed_ops) = self.capabilities.get(capability) {
                let denied: Vec<String> = requested_ops
                    .iter()
                    .filter(|op| !allowed_ops.contains(*op))
                    .cloned()
                    .collect();
                if !denied.is_empty() {
                    return Err(format!(
                        "requested capability operations exceed host ceiling: {}.{}",
                        capability,
                        denied.join(",")
                    ));
                }
            } else if !self.capabilities.is_empty() {
                return Err(format!(
                    "requested capability exceeds host ceiling: {capability}"
                ));
            }
        }

        // Empty on either side = unconstrained; otherwise set intersection.
        let tools = if self.tools.is_empty() {
            requested.tools.clone()
        } else if requested.tools.is_empty() {
            self.tools.clone()
        } else {
            requested
                .tools
                .iter()
                .filter(|tool| self.tools.contains(*tool))
                .cloned()
                .collect()
        };

        // Same empty-means-unconstrained rule, applied per capability and
        // per operation list.
        let capabilities = if self.capabilities.is_empty() {
            requested.capabilities.clone()
        } else if requested.capabilities.is_empty() {
            self.capabilities.clone()
        } else {
            requested
                .capabilities
                .iter()
                .filter_map(|(capability, requested_ops)| {
                    self.capabilities.get(capability).map(|allowed_ops| {
                        (
                            capability.clone(),
                            requested_ops
                                .iter()
                                .filter(|op| allowed_ops.contains(*op))
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .collect()
        };

        // NOTE: unlike tools/capabilities, out-of-ceiling roots are
        // filtered out silently rather than rejected with an error.
        let workspace_roots = if self.workspace_roots.is_empty() {
            requested.workspace_roots.clone()
        } else if requested.workspace_roots.is_empty() {
            self.workspace_roots.clone()
        } else {
            requested
                .workspace_roots
                .iter()
                .filter(|root| self.workspace_roots.contains(*root))
                .cloned()
                .collect()
        };

        // Smaller limit wins when both sides set one.
        let recursion_limit = match (self.recursion_limit, requested.recursion_limit) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        // Constraints accumulate: both the ceiling's and the request's
        // constraints are enforced on the effective policy.
        let mut tool_arg_constraints = self.tool_arg_constraints.clone();
        tool_arg_constraints.extend(requested.tool_arg_constraints.clone());

        Ok(CapabilityPolicy {
            tools,
            capabilities,
            workspace_roots,
            side_effect_level,
            recursion_limit,
            tool_arg_constraints,
        })
    }
}
739
/// Return whichever side-effect level is more restrictive, ranking
/// "none" < "read_only" < "workspace_write" < "process_exec" < "network";
/// unknown labels rank above all known ones. Ties return `a`.
fn min_side_effect<'a>(a: &'a str, b: &'a str) -> &'a str {
    fn rank(v: &str) -> usize {
        match v {
            "none" => 0,
            "read_only" => 1,
            "workspace_write" => 2,
            "process_exec" => 3,
            "network" => 4,
            _ => 5,
        }
    }
    match rank(a) <= rank(b) {
        true => a,
        false => b,
    }
}
757
/// Model-selection overrides for a workflow node; unset fields
/// presumably inherit host defaults — confirm at the consuming call site.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ModelPolicy {
    pub provider: Option<String>,
    pub model: Option<String>,
    pub model_tier: Option<String>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<i64>,
}

/// Controls how a node's conversation transcript is recorded,
/// summarized, and compacted.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptPolicy {
    pub mode: Option<String>,
    pub visibility: Option<String>,
    pub summarize: bool,
    pub compact: bool,
    /// Most-recent messages to keep verbatim when compacting.
    pub keep_last: Option<usize>,
}

/// Governs which artifacts are selected into a node's context
/// (consumed by `select_artifacts_adaptive` in this module).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ContextPolicy {
    pub max_artifacts: Option<usize>,
    /// Token budget; drives per-artifact micro-compaction.
    pub max_tokens: Option<usize>,
    pub reserve_tokens: Option<usize>,
    pub include_kinds: Vec<String>,
    pub exclude_kinds: Vec<String>,
    pub prioritize_kinds: Vec<String>,
    pub pinned_ids: Vec<String>,
    pub include_stages: Vec<String>,
    pub prefer_recent: bool,
    pub prefer_fresh: bool,
    pub render: Option<String>,
}

/// Retry behavior for a workflow stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub verify: bool,
    pub repair: bool,
}
801
/// Input/output contract for a workflow stage: accepted/produced
/// artifact kinds, input cardinality bounds, and an optional JSON schema
/// for outputs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StageContract {
    pub input_kinds: Vec<String>,
    pub output_kinds: Vec<String>,
    pub min_inputs: Option<usize>,
    pub max_inputs: Option<usize>,
    pub require_transcript: bool,
    pub schema: Option<serde_json::Value>,
}

/// Maps stage outcomes to named branch labels; each field, when set,
/// names the edge branch taken for that outcome.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BranchSemantics {
    pub success: Option<String>,
    pub failure: Option<String>,
    pub verify_pass: Option<String>,
    pub verify_fail: Option<String>,
    pub condition_true: Option<String>,
    pub condition_false: Option<String>,
    pub loop_continue: Option<String>,
    pub loop_exit: Option<String>,
    pub escalation: Option<String>,
}

/// Fan-out policy for map-style nodes: the items to iterate and how
/// item/output artifacts are kinded.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct MapPolicy {
    pub items: Vec<serde_json::Value>,
    pub item_artifact_kind: Option<String>,
    pub output_kind: Option<String>,
    pub max_items: Option<usize>,
}

/// Fan-in policy for join nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct JoinPolicy {
    pub strategy: String,
    pub require_all_inputs: bool,
    pub min_completed: Option<usize>,
}

/// How joined results are folded into a single output.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReducePolicy {
    pub strategy: String,
    pub separator: Option<String>,
    pub output_kind: Option<String>,
}

/// Where and why a stage escalates to a human or higher authority.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EscalationPolicy {
    pub level: Option<String>,
    pub queue: Option<String>,
    pub reason: Option<String>,
}
859
/// A unit of context produced or consumed by workflow stages.
/// `normalize` backfills id, timestamp, canonical kind, token estimate,
/// and priority.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ArtifactRecord {
    /// Serialized type tag; defaults to "artifact".
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Canonical kind (see `normalize_artifact_kind`).
    pub kind: String,
    pub title: Option<String>,
    pub text: Option<String>,
    pub data: Option<serde_json::Value>,
    pub source: Option<String>,
    pub created_at: String,
    /// Freshness label ("fresh"/"live"/"recent"/"stale"; see `freshness_rank`).
    pub freshness: Option<String>,
    /// Selection priority; defaulted by kind (see `default_artifact_priority`).
    pub priority: Option<i64>,
    pub lineage: Vec<String>,
    pub relevance: Option<f64>,
    /// Estimated size in tokens (~ text bytes / 4).
    pub estimated_tokens: Option<usize>,
    pub stage: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
880
881impl ArtifactRecord {
882 pub fn normalize(mut self) -> Self {
883 if self.type_name.is_empty() {
884 self.type_name = "artifact".to_string();
885 }
886 if self.id.is_empty() {
887 self.id = new_id("artifact");
888 }
889 if self.created_at.is_empty() {
890 self.created_at = now_rfc3339();
891 }
892 if self.kind.is_empty() {
893 self.kind = "artifact".to_string();
894 }
895 self.kind = normalize_artifact_kind(&self.kind);
896 if self.estimated_tokens.is_none() {
897 self.estimated_tokens = self
898 .text
899 .as_ref()
900 .map(|text| ((text.len() as f64) / 4.0).ceil() as usize);
901 }
902 if self.priority.is_none() {
903 self.priority = Some(default_artifact_priority(&self.kind));
904 }
905 self
906 }
907}
908
/// A single node in a workflow graph: its kind, prompts, and the full
/// set of policies governing model choice, transcript handling, context
/// selection, retries, capabilities, contracts, branching, fan-out/in,
/// and escalation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowNode {
    pub id: Option<String>,
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    pub done_sentinel: Option<String>,
    /// Tool definitions in flexible JSON shape (see `workflow_tool_names`
    /// for the accepted forms).
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    pub verify: Option<serde_json::Value>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
935
936pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
937 match value {
938 serde_json::Value::Null => Vec::new(),
939 serde_json::Value::Array(items) => items
940 .iter()
941 .filter_map(|item| match item {
942 serde_json::Value::Object(map) => map
943 .get("name")
944 .and_then(|value| value.as_str())
945 .filter(|name| !name.is_empty())
946 .map(|name| name.to_string()),
947 _ => None,
948 })
949 .collect(),
950 serde_json::Value::Object(map) => {
951 if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
952 return map
953 .get("tools")
954 .map(workflow_tool_names)
955 .unwrap_or_default();
956 }
957 map.get("name")
958 .and_then(|value| value.as_str())
959 .filter(|name| !name.is_empty())
960 .map(|name| vec![name.to_string()])
961 .unwrap_or_default()
962 }
963 _ => Vec::new(),
964 }
965}
966
/// A directed edge between workflow nodes, optionally labeled with the
/// branch name that activates it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    pub from: String,
    pub to: String,
    pub branch: Option<String>,
    pub label: Option<String>,
}

/// A complete workflow: nodes keyed by id, edges, the entry node, a
/// graph-level capability ceiling, and an audit trail of mutations.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    pub version: usize,
    /// Id of the node where execution starts.
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    pub capability_policy: CapabilityPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}

/// One entry in a workflow's audit log describing a graph mutation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    pub op: String,
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
1002
/// Aggregated LLM usage for a stage or run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    pub models: Vec<String>,
}

/// Execution record for one workflow stage: status/outcome, timing,
/// outputs, transcript, verification, usage, artifact flow, and the
/// per-attempt history.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch taken out of this stage, when branching applied.
    pub branch: Option<String>,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}

/// One attempt within a stage (retries produce multiple attempts).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}

/// A recorded transition between nodes, including the artifacts that
/// crossed the edge.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}

/// A persisted resume point: which nodes were ready/completed when the
/// run state was checkpointed, and why.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    pub reason: String,
}
1073
/// Expectations captured from a source run, replayable against later
/// runs of the same workflow.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    pub stage_assertions: Vec<ReplayStageAssertion>,
}

/// Per-stage expectations within a replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    pub required_artifact_kinds: Vec<String>,
    /// Substring that must appear in the stage's visible text.
    pub visible_text_contains: Option<String>,
}

/// Result of evaluating one run against a fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
}

/// Per-case result within a replay evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    pub source_path: Option<String>,
    /// Run-vs-run diff, when the case specified a comparison target.
    pub comparison: Option<RunDiffReport>,
}

/// Aggregate result over every case in a replay evaluation suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
1129
/// How a single stage differs between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change (e.g. added/removed/modified — see the diff producer).
    pub change: String,
    pub details: Vec<String>,
}

/// Structured comparison of two runs: status change, per-stage diffs,
/// and count deltas (right minus left, presumably — confirm against the
/// diff producer).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}

/// On-disk manifest describing an evaluation suite of recorded runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Base directory that relative case paths resolve against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}

/// One case in an evaluation suite: the run to check, an optional
/// fixture to assert against, and an optional run to diff with.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    pub run_path: String,
    pub fixture_path: Option<String>,
    pub compare_to: Option<String>,
}
1172
/// Persistable record of a single workflow run.
///
/// Only `PartialEq` (not `Eq`) because `serde_json::Value` payloads may
/// contain floats.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Serialized as `_type`; set to "run_record" by `normalize_run_record`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    /// Defaults to this run's own id when not set (see normalize_run_record).
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Auto-derived from stages when absent (see replay_fixture_from_run).
    pub replay_fixture: Option<ReplayFixture>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub persisted_path: Option<String>,
}
1202
/// Record of a child run spawned by a worker from a parent run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_mode: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
1222
/// Execution-environment details captured for a run (working directory,
/// environment variables, and git worktree information when applicable).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    pub cleanup: Option<String>,
}
1236
/// Result of `validate_workflow`: `valid` is true when `errors` is empty;
/// `warnings` do not affect validity.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    pub reachable_nodes: Vec<String>,
}
1245
1246fn parse_json_payload<T: for<'de> Deserialize<'de>>(
1247 json: serde_json::Value,
1248 label: &str,
1249) -> Result<T, VmError> {
1250 let payload = json.to_string();
1251 let mut deserializer = serde_json::Deserializer::from_str(&payload);
1252 let mut tracker = serde_path_to_error::Track::new();
1253 let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1254 T::deserialize(path_deserializer).map_err(|error| {
1255 let snippet = if payload.len() > 600 {
1256 format!("{}...", &payload[..600])
1257 } else {
1258 payload.clone()
1259 };
1260 VmError::Runtime(format!(
1261 "{label} parse error at {}: {} | payload={}",
1262 tracker.path(),
1263 error,
1264 snippet
1265 ))
1266 })
1267}
1268
1269fn parse_json_value<T: for<'de> Deserialize<'de>>(value: &VmValue) -> Result<T, VmError> {
1270 parse_json_payload(vm_value_to_json(value), "orchestration")
1271}
1272
1273pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
1274 parse_json_payload(vm_value_to_json(value), label)
1275}
1276
1277pub fn parse_workflow_node_json(
1278 json: serde_json::Value,
1279 label: &str,
1280) -> Result<WorkflowNode, VmError> {
1281 parse_json_payload(json, label)
1282}
1283
1284pub fn parse_workflow_edge_json(
1285 json: serde_json::Value,
1286 label: &str,
1287) -> Result<WorkflowEdge, VmError> {
1288 parse_json_payload(json, label)
1289}
1290
/// Parses a `VmValue` into a normalized `WorkflowGraph`.
///
/// Accepts either an explicit graph (with `nodes`/`edges`/`entry`) or a
/// shorthand dict whose top-level `act`/`verify`/`repair` keys describe the
/// nodes. In the shorthand case default edges act -> verify, verify -(failed)->
/// repair, repair -(retry)-> verify are synthesized. Missing type name, id,
/// version, entry, and per-node defaults are filled in before returning.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Shorthand form: no explicit node map, so lift act/verify/repair
    // entries from the top-level dict into graph nodes.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node: WorkflowNode = parse_json_value(node_value)?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Model settings the node does not set itself fall back to
                // top-level dict keys, so the shorthand form can specify
                // provider/model/etc. once for all nodes.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    // "tier" is accepted as an alias for "model_tier".
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept both float and integer temperatures.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.done_sentinel.is_none() {
                    node.done_sentinel = as_dict
                        .get("done_sentinel")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // For the verify node, synthesize a verify spec from loose
                // keys on the node dict when none was given explicitly.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Wire up the default act/verify/repair loop when no edges exist.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node in key order (BTreeMap iteration).
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults: id, kind, join/reduce strategies, output kinds,
    // and retry attempts.
    for (node_id, node) in &mut graph.nodes {
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        if node.output_contract.output_kinds.is_empty() {
            // Default output kind depends on the node kind.
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
1451
/// Structurally validates a workflow graph and, optionally, checks its
/// capability policies against a `ceiling` policy.
///
/// Errors make the report invalid; warnings (unreachable nodes, weak join
/// fan-in, missing reduce input kinds) do not.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    // The entry node must exist.
    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    // Every edge endpoint must reference a declared node.
    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Unreachable nodes are only a warning: they may be intentional stubs.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    // Per-node structural checks keyed on the node kind.
    for (node_id, node) in &graph.nodes {
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        match node.kind.as_str() {
            // Condition nodes need both branches wired.
            "condition" => {
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            // A fork that doesn't fan out is an error.
            "fork" => {
                if outgoing.len() < 2 {
                    errors.push(format!(
                        "node {node_id}: fork nodes require at least two outgoing edges"
                    ));
                }
            }
            // A join with a single input is suspicious but still runnable.
            "join" => {
                if incoming < 2 {
                    warnings.push(format!(
                        "node {node_id}: join node has fewer than two incoming edges"
                    ));
                }
            }
            // Map nodes need some item source.
            "map" => {
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty()
                {
                    errors.push(format!(
                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                    ));
                }
            }
            "reduce" => {
                if node.input_contract.input_kinds.is_empty() {
                    warnings.push(format!(
                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                    ));
                }
            }
            _ => {}
        }
    }

    // Graph- and node-level capability policies must fit under the ceiling.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
1567
1568fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1569 let mut seen = BTreeSet::new();
1570 let mut stack = vec![graph.entry.clone()];
1571 while let Some(node_id) = stack.pop() {
1572 if !seen.insert(node_id.clone()) {
1573 continue;
1574 }
1575 for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1576 stack.push(edge.to.clone());
1577 }
1578 }
1579 seen
1580}
1581
/// Filters, ranks, and budget-selects artifacts for inclusion in a context.
///
/// Ranking precedence (highest first): pinned ids, prioritized kinds,
/// explicit priority (descending), freshness rank (if `prefer_fresh`),
/// recency (if `prefer_recent`), relevance (descending), then smaller
/// estimated token cost as the final tiebreak.
pub fn select_artifacts(
    mut artifacts: Vec<ArtifactRecord>,
    policy: &ContextPolicy,
) -> Vec<ArtifactRecord> {
    // Drop artifacts excluded by kind or stage filters. Empty include lists
    // mean "include everything".
    artifacts.retain(|artifact| {
        (policy.include_kinds.is_empty() || policy.include_kinds.contains(&artifact.kind))
            && !policy.exclude_kinds.contains(&artifact.kind)
            && (policy.include_stages.is_empty()
                || artifact
                    .stage
                    .as_ref()
                    .is_some_and(|stage| policy.include_stages.contains(stage)))
    });
    // b-vs-a comparisons throughout give descending order for each key.
    artifacts.sort_by(|a, b| {
        let b_pinned = policy.pinned_ids.contains(&b.id);
        let a_pinned = policy.pinned_ids.contains(&a.id);
        b_pinned
            .cmp(&a_pinned)
            .then_with(|| {
                let b_prio_kind = policy.prioritize_kinds.contains(&b.kind);
                let a_prio_kind = policy.prioritize_kinds.contains(&a.kind);
                b_prio_kind.cmp(&a_prio_kind)
            })
            .then_with(|| {
                b.priority
                    .unwrap_or_default()
                    .cmp(&a.priority.unwrap_or_default())
            })
            .then_with(|| {
                if policy.prefer_fresh {
                    freshness_rank(b.freshness.as_deref())
                        .cmp(&freshness_rank(a.freshness.as_deref()))
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                if policy.prefer_recent {
                    b.created_at.cmp(&a.created_at)
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                // Floats: NaN relevance compares as Equal rather than panicking.
                b.relevance
                    .partial_cmp(&a.relevance)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| {
                // Cheaper artifacts win ties; unknown cost sorts last.
                a.estimated_tokens
                    .unwrap_or(usize::MAX)
                    .cmp(&b.estimated_tokens.unwrap_or(usize::MAX))
            })
    });

    // Greedy selection under the (reserve-adjusted) token budget.
    let mut selected = Vec::new();
    let mut used_tokens = 0usize;
    let reserve_tokens = policy.reserve_tokens.unwrap_or(0);
    let effective_max_tokens = policy
        .max_tokens
        .map(|max| max.saturating_sub(reserve_tokens));
    for artifact in artifacts {
        if let Some(max_artifacts) = policy.max_artifacts {
            if selected.len() >= max_artifacts {
                break;
            }
        }
        let next_tokens = artifact.estimated_tokens.unwrap_or(0);
        if let Some(max_tokens) = effective_max_tokens {
            // `continue` (not `break`): a later, cheaper artifact may still fit.
            if used_tokens + next_tokens > max_tokens {
                continue;
            }
        }
        used_tokens += next_tokens;
        selected.push(artifact);
    }
    selected
}
1660
1661pub fn render_artifacts_context(artifacts: &[ArtifactRecord], policy: &ContextPolicy) -> String {
1662 let mut parts = Vec::new();
1663 for artifact in artifacts {
1664 let title = artifact
1665 .title
1666 .clone()
1667 .unwrap_or_else(|| format!("{} {}", artifact.kind, artifact.id));
1668 let body = artifact
1669 .text
1670 .clone()
1671 .or_else(|| artifact.data.as_ref().map(|v| v.to_string()))
1672 .unwrap_or_default();
1673 match policy.render.as_deref() {
1674 Some("json") => {
1675 parts.push(
1676 serde_json::json!({
1677 "id": artifact.id,
1678 "kind": artifact.kind,
1679 "title": title,
1680 "source": artifact.source,
1681 "freshness": artifact.freshness,
1682 "priority": artifact.priority,
1683 "text": body,
1684 })
1685 .to_string(),
1686 );
1687 }
1688 _ => parts.push(format!(
1689 "[{title}] kind={} source={} freshness={} priority={}\n{}",
1690 artifact.kind,
1691 artifact
1692 .source
1693 .clone()
1694 .unwrap_or_else(|| "unknown".to_string()),
1695 artifact
1696 .freshness
1697 .clone()
1698 .unwrap_or_else(|| "normal".to_string()),
1699 artifact.priority.unwrap_or_default(),
1700 body
1701 )),
1702 }
1703 }
1704 parts.join("\n\n")
1705}
1706
1707pub fn normalize_artifact(value: &VmValue) -> Result<ArtifactRecord, VmError> {
1708 let artifact: ArtifactRecord = parse_json_value(value)?;
1709 Ok(artifact.normalize())
1710}
1711
1712pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1713 let json = vm_value_to_json(value);
1714 let payload = json.to_string();
1715 let mut deserializer = serde_json::Deserializer::from_str(&payload);
1716 let mut tracker = serde_path_to_error::Track::new();
1717 let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1718 let mut run: RunRecord = RunRecord::deserialize(path_deserializer).map_err(|error| {
1719 let snippet = if payload.len() > 600 {
1720 format!("{}...", &payload[..600])
1721 } else {
1722 payload.clone()
1723 };
1724 VmError::Runtime(format!(
1725 "orchestration parse error at {}: {} | payload={}",
1726 tracker.path(),
1727 error,
1728 snippet
1729 ))
1730 })?;
1731 if run.type_name.is_empty() {
1732 run.type_name = "run_record".to_string();
1733 }
1734 if run.id.is_empty() {
1735 run.id = new_id("run");
1736 }
1737 if run.started_at.is_empty() {
1738 run.started_at = now_rfc3339();
1739 }
1740 if run.status.is_empty() {
1741 run.status = "running".to_string();
1742 }
1743 if run.root_run_id.is_none() {
1744 run.root_run_id = Some(run.id.clone());
1745 }
1746 if run.replay_fixture.is_none() {
1747 run.replay_fixture = Some(replay_fixture_from_run(&run));
1748 }
1749 Ok(run)
1750}
1751
1752pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1753 let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1754 if manifest.type_name.is_empty() {
1755 manifest.type_name = "eval_suite_manifest".to_string();
1756 }
1757 if manifest.id.is_empty() {
1758 manifest.id = new_id("eval_suite");
1759 }
1760 Ok(manifest)
1761}
1762
1763fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1764 let content = std::fs::read_to_string(path)
1765 .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1766 serde_json::from_str(&content)
1767 .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1768}
1769
/// Resolves a manifest-relative path: absolute paths pass through unchanged;
/// relative paths are joined onto `base_dir` when one is provided.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
1780
/// Loads and evaluates every case in an eval suite manifest.
///
/// For each case: loads the run, picks a fixture (explicit path, the run's
/// embedded fixture, or one derived from the run itself), evaluates the run
/// against it, and optionally diffs against a baseline run — a non-identical
/// diff fails the case. Returns `Err` only on load/parse failures; assertion
/// mismatches are reported as failed cases.
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    let base_dir = manifest.base_dir.as_deref().map(Path::new);
    let mut reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(base_dir, &case.run_path);
        let run = load_run_record(&run_path)?;
        // Fixture precedence: explicit path > embedded > derived from run.
        let fixture = match &case.fixture_path {
            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
            None => run
                .replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run)),
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;
        // Optional baseline comparison; any structural difference fails the case.
        let comparison = match &case.compare_to {
            Some(path) => {
                let baseline_path = resolve_manifest_path(base_dir, path);
                let baseline = load_run_record(&baseline_path)?;
                let diff = diff_run_records(&baseline, &run);
                if !diff.identical {
                    pass = false;
                    failures.push(format!(
                        "run differs from baseline {} with {} stage changes",
                        baseline_path.display(),
                        diff.stage_diffs.len()
                    ));
                }
                Some(diff)
            }
            None => None,
        };
        reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }
    let total = reports.len();
    let passed = reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: reports,
    })
}
1838
/// Renders a unified-style diff of `before` vs `after` using an LCS table.
///
/// Emits `--- a/<path>` / `+++ b/<path>` headers (defaulting the label to
/// "artifact"), then ` `/`-`/`+` prefixed lines with no hunk headers. On
/// ties the old line is removed before the new line is added.
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    let old_lines: Vec<&str> = before.lines().collect();
    let new_lines: Vec<&str> = after.lines().collect();
    // lcs[o][n] = length of the longest common subsequence of
    // old_lines[o..] and new_lines[n..], filled back-to-front.
    let mut lcs = vec![vec![0usize; new_lines.len() + 1]; old_lines.len() + 1];
    for oi in (0..old_lines.len()).rev() {
        for ni in (0..new_lines.len()).rev() {
            lcs[oi][ni] = if old_lines[oi] == new_lines[ni] {
                lcs[oi + 1][ni + 1] + 1
            } else {
                lcs[oi + 1][ni].max(lcs[oi][ni + 1])
            };
        }
    }

    let label = path.unwrap_or("artifact");
    let mut out = format!("--- a/{label}\n+++ b/{label}\n");
    let mut oi = 0;
    let mut ni = 0;
    while oi < old_lines.len() && ni < new_lines.len() {
        if old_lines[oi] == new_lines[ni] {
            out.push(' ');
            out.push_str(old_lines[oi]);
            out.push('\n');
            oi += 1;
            ni += 1;
        } else if lcs[oi + 1][ni] >= lcs[oi][ni + 1] {
            // `>=` keeps the original tie-break: delete before insert.
            out.push('-');
            out.push_str(old_lines[oi]);
            out.push('\n');
            oi += 1;
        } else {
            out.push('+');
            out.push_str(new_lines[ni]);
            out.push('\n');
            ni += 1;
        }
    }
    // Flush whatever remains on either side.
    for line in &old_lines[oi..] {
        out.push('-');
        out.push_str(line);
        out.push('\n');
    }
    for line in &new_lines[ni..] {
        out.push('+');
        out.push_str(line);
        out.push('\n');
    }
    out
}
1881
1882pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1883 let path = path
1884 .map(PathBuf::from)
1885 .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1886 if let Some(parent) = path.parent() {
1887 std::fs::create_dir_all(parent)
1888 .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1889 }
1890 let json = serde_json::to_string_pretty(run)
1891 .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1892 std::fs::write(&path, json)
1893 .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1894 Ok(path.to_string_lossy().to_string())
1895}
1896
1897pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1898 let content = std::fs::read_to_string(path)
1899 .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1900 serde_json::from_str(&content)
1901 .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
1902}
1903
1904pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1905 ReplayFixture {
1906 type_name: "replay_fixture".to_string(),
1907 id: new_id("fixture"),
1908 source_run_id: run.id.clone(),
1909 workflow_id: run.workflow_id.clone(),
1910 workflow_name: run.workflow_name.clone(),
1911 created_at: now_rfc3339(),
1912 expected_status: run.status.clone(),
1913 stage_assertions: run
1914 .stages
1915 .iter()
1916 .map(|stage| ReplayStageAssertion {
1917 node_id: stage.node_id.clone(),
1918 expected_status: stage.status.clone(),
1919 expected_outcome: stage.outcome.clone(),
1920 expected_branch: stage.branch.clone(),
1921 required_artifact_kinds: stage
1922 .artifacts
1923 .iter()
1924 .map(|artifact| artifact.kind.clone())
1925 .collect(),
1926 visible_text_contains: stage
1927 .visible_text
1928 .as_ref()
1929 .filter(|text| !text.is_empty())
1930 .map(|text| text.chars().take(80).collect()),
1931 })
1932 .collect(),
1933 }
1934}
1935
/// Checks a run against a replay fixture, collecting every mismatch.
///
/// Verifies the run status, then each stage assertion: presence, status,
/// outcome, branch, required artifact kinds, and (when recorded) a visible
/// text snippet. Extra stages in the run that the fixture does not mention
/// are not flagged.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    for assertion in &fixture.stage_assertions {
        // Missing stage: record the failure and move on to the next assertion.
        let Some(stage) = run
            .stages
            .iter()
            .find(|stage| stage.node_id == assertion.node_id)
        else {
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Each required artifact kind must appear at least once in the stage.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Snippet check is substring containment, not equality.
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
2000
2001pub fn evaluate_run_suite(
2002 cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
2003) -> ReplayEvalSuiteReport {
2004 let mut reports = Vec::new();
2005 for (run, fixture, source_path) in cases {
2006 let report = evaluate_run_against_fixture(&run, &fixture);
2007 reports.push(ReplayEvalCaseReport {
2008 run_id: run.id.clone(),
2009 workflow_id: run.workflow_id.clone(),
2010 label: None,
2011 pass: report.pass,
2012 failures: report.failures,
2013 stage_count: report.stage_count,
2014 source_path,
2015 comparison: None,
2016 });
2017 }
2018 let total = reports.len();
2019 let passed = reports.iter().filter(|report| report.pass).count();
2020 let failed = total.saturating_sub(passed);
2021 ReplayEvalSuiteReport {
2022 pass: failed == 0,
2023 total,
2024 passed,
2025 failed,
2026 cases: reports,
2027 }
2028}
2029
/// Computes a structural diff between two run records.
///
/// Stages are matched by node id across both runs: stages present on only
/// one side are reported as "removed"/"added"; stages on both sides are
/// compared on status, outcome, branch, and artifact counts. `identical`
/// also requires matching run status and equal transition/artifact/
/// checkpoint counts.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    let mut stage_diffs = Vec::new();
    // Union of node ids from both runs, in sorted order.
    let mut all_node_ids = BTreeSet::new();
    all_node_ids.extend(left.stages.iter().map(|stage| stage.node_id.clone()));
    all_node_ids.extend(right.stages.iter().map(|stage| stage.node_id.clone()));

    for node_id in all_node_ids {
        let left_stage = left.stages.iter().find(|stage| stage.node_id == node_id);
        let right_stage = right.stages.iter().find(|stage| stage.node_id == node_id);
        match (left_stage, right_stage) {
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id,
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Field-by-field comparison; only emit a record when
                // something actually differs.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id,
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            // Unreachable: node_id came from the union of both stage lists.
            (None, None) => {}
        }
    }

    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}
2117
2118pub fn push_execution_policy(policy: CapabilityPolicy) {
2119 EXECUTION_POLICY_STACK.with(|stack| stack.borrow_mut().push(policy));
2120}
2121
2122pub fn pop_execution_policy() {
2123 EXECUTION_POLICY_STACK.with(|stack| {
2124 stack.borrow_mut().pop();
2125 });
2126}
2127
2128pub fn current_execution_policy() -> Option<CapabilityPolicy> {
2129 EXECUTION_POLICY_STACK.with(|stack| stack.borrow().last().cloned())
2130}
2131
2132fn policy_allows_tool(policy: &CapabilityPolicy, tool: &str) -> bool {
2133 policy.tools.is_empty() || policy.tools.iter().any(|allowed| allowed == tool)
2134}
2135
2136fn policy_allows_capability(policy: &CapabilityPolicy, capability: &str, op: &str) -> bool {
2137 policy.capabilities.is_empty()
2138 || policy
2139 .capabilities
2140 .get(capability)
2141 .is_some_and(|ops| ops.is_empty() || ops.iter().any(|allowed| allowed == op))
2142}
2143
/// True when the policy's allowed side-effect level is at least as
/// permissive as `requested`. A policy with no level set allows everything.
fn policy_allows_side_effect(policy: &CapabilityPolicy, requested: &str) -> bool {
    // Severity ladder from least to most impactful.
    fn rank(v: &str) -> usize {
        match v {
            "none" => 0,
            "read_only" => 1,
            "workspace_write" => 2,
            "process_exec" => 3,
            "network" => 4,
            // NOTE(review): unrecognized levels rank above "network", so an
            // unknown *allowed* level permits every request, while an unknown
            // *requested* level is permitted only by another unknown level.
            // Confirm this fail-open behavior for the allowed side is intended.
            _ => 5,
        }
    }
    policy
        .side_effect_level
        .as_ref()
        .map(|allowed| rank(allowed) >= rank(requested))
        .unwrap_or(true)
}
2161
2162fn reject_policy(reason: String) -> Result<(), VmError> {
2163 Err(VmError::CategorizedError {
2164 message: reason,
2165 category: crate::value::ErrorCategory::ToolRejected,
2166 })
2167}
2168
/// Validates a builtin invocation against the innermost execution policy,
/// if one is active.
///
/// Each recognized builtin is mapped to the tool name, capability operation,
/// and side-effect level it requires; exceeding any ceiling produces a
/// `ToolRejected` error via `reject_policy`. Builtins not matched below pass
/// unchecked.
pub fn enforce_current_policy_for_builtin(name: &str, args: &[VmValue]) -> Result<(), VmError> {
    let Some(policy) = current_execution_policy() else {
        return Ok(());
    };
    match name {
        // Read-style builtins: the tool itself plus workspace.read_text.
        "read" | "read_file" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "read_text")
            {
                return reject_policy(format!(
                    "builtin '{name}' exceeds workspace.read_text ceiling"
                ));
            }
        }
        "search" | "list_dir" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "list")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace.list ceiling"));
            }
        }
        // Existence probes are metadata-only: capability check, no tool check.
        "file_exists" | "stat" => {
            if !policy_allows_capability(&policy, "workspace", "exists") {
                return reject_policy(format!("builtin '{name}' exceeds workspace.exists ceiling"));
            }
        }
        // All workspace mutations funnel through the canonical 'edit' tool
        // and additionally require the workspace_write side-effect level.
        "edit" | "write_file" | "append_file" | "mkdir" | "copy_file" => {
            if !policy_allows_tool(&policy, "edit")
                || !policy_allows_capability(&policy, "workspace", "write_text")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace write ceiling"));
            }
        }
        "delete_file" => {
            if !policy_allows_capability(&policy, "workspace", "delete")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'delete_file' exceeds workspace.delete ceiling".to_string(),
                );
            }
        }
        "apply_edit" => {
            if !policy_allows_capability(&policy, "workspace", "apply_edit")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'apply_edit' exceeds workspace.apply_edit ceiling".to_string(),
                );
            }
        }
        // Process execution funnels through the canonical 'run' tool.
        "exec" | "exec_at" | "shell" | "shell_at" | "run_command" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // HTTP builtins are gated solely on the network side-effect ceiling.
        "http_get" | "http_post" | "http_put" | "http_patch" | "http_delete" | "http_request" => {
            if !policy_allows_side_effect(&policy, "network") {
                return reject_policy(format!("builtin '{name}' exceeds network ceiling"));
            }
        }
        // MCP builtins are gated like process exec (same checks as 'run').
        "mcp_connect"
        | "mcp_call"
        | "mcp_list_tools"
        | "mcp_list_resources"
        | "mcp_list_resource_templates"
        | "mcp_read_resource"
        | "mcp_list_prompts"
        | "mcp_get_prompt"
        | "mcp_server_info"
        | "mcp_disconnect" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // host_invoke carries its capability/op dynamically in args[0]/args[1];
        // check the capability map first, then map the pair onto a side-effect
        // level (anything unrecognized is treated as read_only).
        "host_invoke" => {
            let capability = args.first().map(|v| v.display()).unwrap_or_default();
            let op = args.get(1).map(|v| v.display()).unwrap_or_default();
            if !policy_allows_capability(&policy, &capability, &op) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds capability ceiling"
                ));
            }
            let requested_side_effect = match (capability.as_str(), op.as_str()) {
                ("workspace", "write_text" | "apply_edit" | "delete") => "workspace_write",
                ("process", "exec") => "process_exec",
                _ => "read_only",
            };
            if !policy_allows_side_effect(&policy, requested_side_effect) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds side-effect ceiling"
                ));
            }
        }
        _ => {}
    }
    Ok(())
}
2274
2275pub fn enforce_current_policy_for_bridge_builtin(name: &str) -> Result<(), VmError> {
2276 if current_execution_policy().is_some() {
2277 return reject_policy(format!(
2278 "bridged builtin '{name}' exceeds execution policy; declare an explicit capability/tool surface instead"
2279 ));
2280 }
2281 Ok(())
2282}
2283
2284pub fn enforce_current_policy_for_tool(tool_name: &str) -> Result<(), VmError> {
2285 let Some(policy) = current_execution_policy() else {
2286 return Ok(());
2287 };
2288 if !policy_allows_tool(&policy, tool_name) {
2289 return reject_policy(format!("tool '{tool_name}' exceeds tool ceiling"));
2290 }
2291 Ok(())
2292}
2293
2294fn compact_transcript(transcript: &VmValue, keep_last: usize) -> Option<VmValue> {
2295 let dict = transcript.as_dict()?;
2296 let messages = match dict.get("messages") {
2297 Some(VmValue::List(list)) => list.iter().cloned().collect::<Vec<_>>(),
2298 _ => Vec::new(),
2299 };
2300 let retained = messages
2301 .into_iter()
2302 .rev()
2303 .take(keep_last)
2304 .collect::<Vec<_>>()
2305 .into_iter()
2306 .rev()
2307 .collect::<Vec<_>>();
2308 let mut compacted = dict.clone();
2309 compacted.insert(
2310 "messages".to_string(),
2311 VmValue::List(Rc::new(retained.clone())),
2312 );
2313 compacted.insert(
2314 "events".to_string(),
2315 VmValue::List(Rc::new(
2316 crate::llm::helpers::transcript_events_from_messages(&retained),
2317 )),
2318 );
2319 Some(VmValue::Dict(Rc::new(compacted)))
2320}
2321
2322fn redact_transcript_visibility(transcript: &VmValue, visibility: Option<&str>) -> Option<VmValue> {
2323 let Some(visibility) = visibility else {
2324 return Some(transcript.clone());
2325 };
2326 if visibility != "public" && visibility != "public_only" {
2327 return Some(transcript.clone());
2328 }
2329 let dict = transcript.as_dict()?;
2330 let public_messages = match dict.get("messages") {
2331 Some(VmValue::List(list)) => list
2332 .iter()
2333 .filter(|message| {
2334 message
2335 .as_dict()
2336 .and_then(|d| d.get("role"))
2337 .map(|v| v.display())
2338 .map(|role| role != "tool_result")
2339 .unwrap_or(true)
2340 })
2341 .cloned()
2342 .collect::<Vec<_>>(),
2343 _ => Vec::new(),
2344 };
2345 let public_events = match dict.get("events") {
2346 Some(VmValue::List(list)) => list
2347 .iter()
2348 .filter(|event| {
2349 event
2350 .as_dict()
2351 .and_then(|d| d.get("visibility"))
2352 .map(|v| v.display())
2353 .map(|value| value == "public")
2354 .unwrap_or(true)
2355 })
2356 .cloned()
2357 .collect::<Vec<_>>(),
2358 _ => Vec::new(),
2359 };
2360 let mut redacted = dict.clone();
2361 redacted.insert(
2362 "messages".to_string(),
2363 VmValue::List(Rc::new(public_messages)),
2364 );
2365 redacted.insert("events".to_string(), VmValue::List(Rc::new(public_events)));
2366 Some(VmValue::Dict(Rc::new(redacted)))
2367}
2368
2369pub(crate) fn apply_input_transcript_policy(
2370 transcript: Option<VmValue>,
2371 policy: &TranscriptPolicy,
2372) -> Option<VmValue> {
2373 let mut transcript = transcript;
2374 match policy.mode.as_deref() {
2375 Some("reset") => return None,
2376 Some("fork") => {
2377 if let Some(VmValue::Dict(dict)) = transcript.as_ref() {
2378 let mut forked = dict.as_ref().clone();
2379 forked.insert(
2380 "id".to_string(),
2381 VmValue::String(Rc::from(new_id("transcript"))),
2382 );
2383 transcript = Some(VmValue::Dict(Rc::new(forked)));
2384 }
2385 }
2386 _ => {}
2387 }
2388 if policy.compact {
2389 let keep_last = policy.keep_last.unwrap_or(6);
2390 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2391 }
2392 transcript
2393}
2394
2395fn apply_output_transcript_policy(
2396 transcript: Option<VmValue>,
2397 policy: &TranscriptPolicy,
2398) -> Option<VmValue> {
2399 let mut transcript = transcript;
2400 if policy.compact {
2401 let keep_last = policy.keep_last.unwrap_or(6);
2402 transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2403 }
2404 transcript.and_then(|value| redact_transcript_visibility(&value, policy.visibility.as_deref()))
2405}
2406
/// Runs a single workflow stage: selects input artifacts, builds the prompt,
/// invokes the LLM (as a tool-using agent loop when the node asks for it),
/// and wraps the visible output in a new `ArtifactRecord`.
///
/// Returns the raw LLM result JSON, the artifacts produced by the stage, and
/// the post-policy transcript (if any). Errors when the node's input
/// contract (transcript requirement, min/max artifact counts) is not met,
/// or when the underlying LLM call fails.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // Default the artifact selection filter to the node's declared input
    // kinds when the context policy does not name any kinds itself.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    let rendered_context = render_artifacts_context(&selected, &node.context_policy);
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    // Enforce the node's input contract before spending any LLM budget.
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Prefix the task with the rendered artifact context when there is any.
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Translate the node's model policy into LLM call options; absent
    // fields are simply omitted so provider defaults apply.
    let mut options = BTreeMap::new();
    if let Some(provider) = &node.model_policy.provider {
        options.insert(
            "provider".to_string(),
            VmValue::String(Rc::from(provider.clone())),
        );
    }
    if let Some(model) = &node.model_policy.model {
        options.insert(
            "model".to_string(),
            VmValue::String(Rc::from(model.clone())),
        );
    }
    if let Some(model_tier) = &node.model_policy.model_tier {
        options.insert(
            "model_tier".to_string(),
            VmValue::String(Rc::from(model_tier.clone())),
        );
    }
    if let Some(temperature) = node.model_policy.temperature {
        options.insert("temperature".to_string(), VmValue::Float(temperature));
    }
    if let Some(max_tokens) = node.model_policy.max_tokens {
        options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
    }
    let tool_names = workflow_tool_names(&node.tools);
    if !matches!(node.tools, serde_json::Value::Null) && !tool_names.is_empty() {
        options.insert(
            "tools".to_string(),
            crate::stdlib::json_to_vm_value(&node.tools),
        );
    }
    if let Some(transcript) = transcript.clone() {
        options.insert("transcript".to_string(), transcript);
    }

    // Pack (prompt, system, options) into the positional arg shape that
    // extract_llm_options expects.
    let args = vec![
        VmValue::String(Rc::from(prompt)),
        node.system
            .clone()
            .map(|s| VmValue::String(Rc::from(s)))
            .unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ];
    let mut opts = extract_llm_options(&args)?;

    // Agent mode (explicit, or implied by having tools) runs the iterative
    // tool loop; otherwise a single one-shot LLM call is made.
    let llm_result = if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
        let tool_format = if !tool_names.is_empty() && opts.native_tools.is_some() {
            "native".to_string()
        } else {
            "text".to_string()
        };
        crate::llm::run_agent_loop_internal(
            &mut opts,
            crate::llm::AgentLoopConfig {
                persistent: true,
                max_iterations: 12,
                max_nudges: 3,
                nudge: None,
                done_sentinel: node.done_sentinel.clone(),
                tool_retries: 0,
                tool_backoff_ms: 1000,
                tool_format,
                auto_compact: None,
                policy: None,
                daemon: false,
            },
        )
        .await?
    } else {
        let result = vm_call_llm_full(&opts).await?;
        crate::llm::agent_loop_result_from_llm(&result, opts)
    };

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // The produced artifact's kind comes from the output contract when
    // declared; otherwise verify nodes emit verification_result.
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    // Wrap the stage output in an artifact, recording lineage back to the
    // consumed inputs.
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
2576
2577pub fn next_nodes_for(
2578 graph: &WorkflowGraph,
2579 current: &str,
2580 branch: Option<&str>,
2581) -> Vec<WorkflowEdge> {
2582 let mut matching: Vec<WorkflowEdge> = graph
2583 .edges
2584 .iter()
2585 .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
2586 .cloned()
2587 .collect();
2588 if matching.is_empty() {
2589 matching = graph
2590 .edges
2591 .iter()
2592 .filter(|edge| edge.from == current && edge.branch.is_none())
2593 .cloned()
2594 .collect();
2595 }
2596 matching
2597}
2598
2599pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
2600 next_nodes_for(graph, current, Some(branch))
2601 .into_iter()
2602 .next()
2603 .map(|edge| edge.to)
2604}
2605
2606pub fn append_audit_entry(
2607 graph: &mut WorkflowGraph,
2608 op: &str,
2609 node_id: Option<String>,
2610 reason: Option<String>,
2611 metadata: BTreeMap<String, serde_json::Value>,
2612) {
2613 graph.audit_log.push(WorkflowAuditEntry {
2614 id: new_id("audit"),
2615 op: op.to_string(),
2616 node_id,
2617 timestamp: now_rfc3339(),
2618 reason,
2619 metadata,
2620 });
2621}
2622
2623pub fn builtin_ceiling() -> CapabilityPolicy {
2624 CapabilityPolicy {
2625 tools: vec![
2626 "read".to_string(),
2627 "read_file".to_string(),
2628 "search".to_string(),
2629 "edit".to_string(),
2630 "run".to_string(),
2631 "exec".to_string(),
2632 "outline".to_string(),
2633 "list_directory".to_string(),
2634 "lsp_hover".to_string(),
2635 "lsp_definition".to_string(),
2636 "lsp_references".to_string(),
2637 "web_search".to_string(),
2638 "web_fetch".to_string(),
2639 ],
2640 capabilities: BTreeMap::from([
2641 (
2642 "workspace".to_string(),
2643 vec![
2644 "read_text".to_string(),
2645 "write_text".to_string(),
2646 "apply_edit".to_string(),
2647 "delete".to_string(),
2648 "exists".to_string(),
2649 "list".to_string(),
2650 ],
2651 ),
2652 ("process".to_string(), vec!["exec".to_string()]),
2653 ]),
2654 workspace_roots: Vec::new(),
2655 side_effect_level: Some("network".to_string()),
2656 recursion_limit: Some(8),
2657 tool_arg_constraints: Vec::new(),
2658 }
2659}
2660
2661#[cfg(test)]
2662mod tests {
2663 use super::*;
2664
    // Requesting a tool ("edit") outside the ceiling's tool list must fail
    // the intersection with a "host ceiling" error.
    #[test]
    fn capability_intersection_rejects_privilege_expansion() {
        let ceiling = CapabilityPolicy {
            tools: vec!["read".to_string()],
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(2),
            ..Default::default()
        };
        let requested = CapabilityPolicy {
            tools: vec!["read".to_string(), "edit".to_string()],
            ..Default::default()
        };
        let error = ceiling.intersect(&requested).unwrap_err();
        assert!(error.contains("host ceiling"));
    }
2680
    // A default record gains a generated session id and the documented
    // default scope/approval values.
    #[test]
    fn mutation_session_normalize_fills_defaults() {
        let normalized = MutationSessionRecord::default().normalize();
        assert!(normalized.session_id.starts_with("session_"));
        assert_eq!(normalized.mutation_scope, "read_only");
        assert_eq!(normalized.approval_mode, "host_enforced");
    }
2688
    // Installing a session makes it readable via current_mutation_session,
    // and installing None clears it again.
    #[test]
    fn install_current_mutation_session_round_trips() {
        install_current_mutation_session(Some(MutationSessionRecord {
            session_id: "session_test".to_string(),
            mutation_scope: "apply_workspace".to_string(),
            approval_mode: "explicit".to_string(),
            ..Default::default()
        }));
        let current = current_mutation_session().expect("session installed");
        assert_eq!(current.session_id, "session_test");
        assert_eq!(current.mutation_scope, "apply_workspace");
        assert_eq!(current.approval_mode, "explicit");

        install_current_mutation_session(None);
        assert!(current_mutation_session().is_none());
    }
2705
    // Any active policy must reject bridged builtins outright, with the
    // ToolRejected category.
    #[test]
    fn active_execution_policy_rejects_unknown_bridge_builtin() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_bridge_builtin("custom_host_builtin").unwrap_err();
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2728
    // MCP builtins are gated like process exec; a read-only policy without
    // the 'run' tool must reject mcp_connect.
    #[test]
    fn active_execution_policy_rejects_mcp_escape_hatch() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_builtin("mcp_connect", &[]).unwrap_err();
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2751
    // The legacy {act, verify, repair} shape is upgraded into a graph with
    // those three nodes and 'act' as the entry point.
    #[test]
    fn workflow_normalization_upgrades_legacy_act_verify_repair_shape() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "legacy",
            "act": {"mode": "llm"},
            "verify": {"kind": "verify"},
            "repair": {"mode": "agent"},
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        assert_eq!(graph.type_name, "workflow_graph");
        assert!(graph.nodes.contains_key("act"));
        assert!(graph.nodes.contains_key("verify"));
        assert!(graph.nodes.contains_key("repair"));
        assert_eq!(graph.entry, "act");
    }
2767
    // A node whose tools field is a tool_registry object still yields its
    // tool names via workflow_tool_names.
    #[test]
    fn workflow_normalization_accepts_tool_registry_nodes() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "registry_tools",
            "entry": "implement",
            "nodes": {
                "implement": {
                    "kind": "stage",
                    "mode": "agent",
                    "tools": {
                        "_type": "tool_registry",
                        "tools": [
                            {"name": "read", "description": "Read files"},
                            {"name": "run", "description": "Run commands"}
                        ]
                    }
                }
            },
            "edges": []
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        let node = graph.nodes.get("implement").unwrap();
        assert_eq!(workflow_tool_names(&node.tools), vec!["read", "run"]);
    }
2792
    // With max_artifacts=2 and a 30-token budget, selection keeps two of the
    // three summaries and drops the rest.
    #[test]
    fn artifact_selection_honors_budget_and_priority() {
        let policy = ContextPolicy {
            max_artifacts: Some(2),
            max_tokens: Some(30),
            prefer_recent: true,
            prefer_fresh: true,
            prioritize_kinds: vec!["verification_result".to_string()],
            ..Default::default()
        };
        let artifacts = vec![
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "a".to_string(),
                kind: "summary".to_string(),
                text: Some("short".to_string()),
                relevance: Some(0.9),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "b".to_string(),
                kind: "summary".to_string(),
                text: Some("this is a much larger artifact body".to_string()),
                relevance: Some(1.0),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "c".to_string(),
                kind: "summary".to_string(),
                text: Some("tiny".to_string()),
                relevance: Some(0.5),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
        ];
        let selected = select_artifacts(artifacts, &policy);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().all(|artifact| artifact.kind == "summary"));
    }
2839
    // A condition node with only a "true" branch edge must fail validation
    // with an error mentioning both required branches.
    #[test]
    fn workflow_validation_rejects_condition_without_true_false_edges() {
        let graph = WorkflowGraph {
            entry: "gate".to_string(),
            nodes: BTreeMap::from([(
                "gate".to_string(),
                WorkflowNode {
                    id: Some("gate".to_string()),
                    kind: "condition".to_string(),
                    ..Default::default()
                },
            )]),
            edges: vec![WorkflowEdge {
                from: "gate".to_string(),
                to: "next".to_string(),
                branch: Some("true".to_string()),
                label: None,
            }],
            ..Default::default()
        };
        let report = validate_workflow(&graph, None);
        assert!(!report.valid);
        assert!(report
            .errors
            .iter()
            .any(|error| error.contains("true") && error.contains("false")));
    }
2867
    // A fixture derived from a run must evaluate cleanly against that same
    // run (no failures).
    #[test]
    fn replay_fixture_round_trip_passes() {
        let run = RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("demo".to_string()),
            task: "demo".to_string(),
            status: "completed".to_string(),
            started_at: "1".to_string(),
            finished_at: Some("2".to_string()),
            parent_run_id: None,
            root_run_id: Some("run_1".to_string()),
            stages: vec![RunStageRecord {
                id: "stage_1".to_string(),
                node_id: "act".to_string(),
                kind: "stage".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                branch: Some("success".to_string()),
                started_at: "1".to_string(),
                finished_at: Some("2".to_string()),
                visible_text: Some("done".to_string()),
                private_reasoning: None,
                transcript: None,
                verification: None,
                usage: None,
                artifacts: vec![ArtifactRecord {
                    type_name: "artifact".to_string(),
                    id: "a1".to_string(),
                    kind: "summary".to_string(),
                    text: Some("done".to_string()),
                    created_at: "1".to_string(),
                    ..Default::default()
                }
                .normalize()],
                consumed_artifact_ids: vec![],
                produced_artifact_ids: vec!["a1".to_string()],
                attempts: vec![],
                metadata: BTreeMap::new(),
            }],
            transitions: vec![],
            checkpoints: vec![],
            pending_nodes: vec![],
            completed_nodes: vec!["act".to_string()],
            child_runs: vec![],
            artifacts: vec![],
            policy: CapabilityPolicy::default(),
            execution: None,
            transcript: None,
            usage: None,
            replay_fixture: None,
            metadata: BTreeMap::new(),
            persisted_path: None,
        };
        let fixture = replay_fixture_from_run(&run);
        let report = evaluate_run_against_fixture(&run, &fixture);
        assert!(report.pass);
        assert!(report.failures.is_empty());
    }
2928
    // Evaluating a failed run against a passing run's fixture must surface
    // exactly one failed case in the suite summary.
    #[test]
    fn replay_eval_suite_reports_failed_case() {
        let good = RunRecord {
            id: "run_good".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bad = RunRecord {
            id: "run_bad".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite(vec![
            (
                good.clone(),
                replay_fixture_from_run(&good),
                Some("good.json".to_string()),
            ),
            (
                bad.clone(),
                replay_fixture_from_run(&good),
                Some("bad.json".to_string()),
            ),
        ]);
        assert!(!suite.pass);
        assert_eq!(suite.total, 2);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases.iter().any(|case| !case.pass));
    }
2972
    // Diffing a completed run against a failed one reports a status change
    // and one differing stage.
    #[test]
    fn run_diff_reports_changed_stage() {
        let left = RunRecord {
            id: "left".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let right = RunRecord {
            id: "right".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let diff = diff_run_records(&left, &right);
        assert!(diff.status_changed);
        assert!(!diff.identical);
        assert_eq!(diff.stage_diffs.len(), 1);
    }
3004
    // A manifest case comparing a failed candidate run to a passing baseline
    // must fail with a baseline-related failure message.
    #[test]
    fn eval_suite_manifest_can_fail_on_baseline_diff() {
        let temp_dir =
            std::env::temp_dir().join(format!("harn-eval-suite-{}", uuid::Uuid::now_v7()));
        std::fs::create_dir_all(&temp_dir).unwrap();
        let baseline_path = temp_dir.join("baseline.json");
        let candidate_path = temp_dir.join("candidate.json");

        let baseline = RunRecord {
            id: "baseline".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let candidate = RunRecord {
            id: "candidate".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };

        save_run_record(&baseline, Some(baseline_path.to_str().unwrap())).unwrap();
        save_run_record(&candidate, Some(candidate_path.to_str().unwrap())).unwrap();

        let manifest = EvalSuiteManifest {
            base_dir: Some(temp_dir.display().to_string()),
            cases: vec![EvalSuiteCase {
                label: Some("candidate".to_string()),
                run_path: "candidate.json".to_string(),
                fixture_path: None,
                compare_to: Some("baseline.json".to_string()),
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite_manifest(&manifest).unwrap();
        assert!(!suite.pass);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases[0].comparison.is_some());
        assert!(suite.cases[0]
            .failures
            .iter()
            .any(|failure| failure.contains("baseline")));
    }
3060
3061 #[test]
3062 fn render_unified_diff_marks_removed_and_added_lines() {
3063 let diff = render_unified_diff(Some("src/main.rs"), "old\nsame", "new\nsame");
3064 assert!(diff.contains("--- a/src/main.rs"));
3065 assert!(diff.contains("+++ b/src/main.rs"));
3066 assert!(diff.contains("-old"));
3067 assert!(diff.contains("+new"));
3068 assert!(diff.contains(" same"));
3069 }
3070
    // process.exec capability alone is insufficient: the read_only
    // side-effect ceiling still blocks the exec builtin.
    #[test]
    fn execution_policy_rejects_process_exec_when_read_only() {
        push_execution_policy(CapabilityPolicy {
            side_effect_level: Some("read_only".to_string()),
            capabilities: BTreeMap::from([("process".to_string(), vec!["exec".to_string()])]),
            ..Default::default()
        });
        let result = enforce_current_policy_for_builtin("exec", &[]);
        pop_execution_policy();
        assert!(result.is_err());
    }
3082
3083 #[test]
3084 fn execution_policy_rejects_unlisted_tool() {
3085 push_execution_policy(CapabilityPolicy {
3086 tools: vec!["read".to_string()],
3087 ..Default::default()
3088 });
3089 let result = enforce_current_policy_for_tool("edit");
3090 pop_execution_policy();
3091 assert!(result.is_err());
3092 }
3093
    // A pre-hook returning Deny for a matching pattern must surface as a
    // Deny action from run_pre_tool_hooks.
    #[test]
    fn pre_tool_hook_deny_blocks_execution() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "dangerous_*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Deny("blocked by policy".to_string())
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("dangerous_delete", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Deny(_)));
    }
3110
    // A pre-hook that explicitly allows a matching tool leaves the call
    // untouched.
    #[test]
    fn pre_tool_hook_allow_passes_through() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "safe_*".to_string(),
            pre: Some(Rc::new(|_name, _args| PreToolAction::Allow)),
            post: None,
        });
        let result = run_pre_tool_hooks("safe_read", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Allow));
    }
3123
    // A pre-hook returning Modify replaces the tool's arguments with the
    // sanitized payload.
    #[test]
    fn pre_tool_hook_modify_rewrites_args() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Modify(serde_json::json!({"path": "/sanitized"}))
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("read_file", &serde_json::json!({"path": "/etc/passwd"}));
        clear_tool_hooks();
        match result {
            PreToolAction::Modify(args) => assert_eq!(args["path"], "/sanitized"),
            _ => panic!("expected Modify"),
        }
    }
3141
    // A post-hook can rewrite the tool's output (redaction) while leaving
    // non-matching output unchanged.
    #[test]
    fn post_tool_hook_modifies_result() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "exec".to_string(),
            pre: None,
            post: Some(Rc::new(|_name, result| {
                if result.contains("SECRET") {
                    PostToolAction::Modify("[REDACTED]".to_string())
                } else {
                    PostToolAction::Pass
                }
            })),
        });
        let result = run_post_tool_hooks("exec", "output with SECRET data");
        let clean = run_post_tool_hooks("exec", "clean output");
        clear_tool_hooks();
        assert_eq!(result, "[REDACTED]");
        assert_eq!(clean, "clean output");
    }
3162
    // A hook whose pattern does not match the tool name never fires; the
    // default action is Allow.
    #[test]
    fn unmatched_hook_pattern_does_not_fire() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "exec".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Deny("should not match".to_string())
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("read_file", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Allow));
    }
3177
    // Covers wildcard-everything, prefix, suffix, and exact-match patterns.
    #[test]
    fn glob_match_patterns() {
        assert!(glob_match("*", "anything"));
        assert!(glob_match("exec*", "exec_at"));
        assert!(glob_match("*_file", "read_file"));
        assert!(!glob_match("exec*", "read_file"));
        assert!(glob_match("read_file", "read_file"));
        assert!(!glob_match("read_file", "write_file"));
    }
3187
    // Output far beyond the limit is trimmed near the limit and marked with
    // a "snipped" notice.
    #[test]
    fn microcompact_snips_large_output() {
        let large = "x".repeat(50_000);
        let result = microcompact_tool_output(&large, 10_000);
        assert!(result.len() < 15_000);
        assert!(result.contains("snipped"));
    }
3197
3198 #[test]
3199 fn microcompact_preserves_small_output() {
3200 let small = "hello world";
3201 let result = microcompact_tool_output(small, 10_000);
3202 assert_eq!(result, small);
3203 }
3204
3205 #[test]
3206 fn auto_compact_messages_reduces_count() {
3207 let mut messages: Vec<serde_json::Value> = (0..20)
3208 .map(|i| serde_json::json!({"role": "user", "content": format!("message {i}")}))
3209 .collect();
3210 let runtime = tokio::runtime::Builder::new_current_thread()
3211 .enable_all()
3212 .build()
3213 .unwrap();
3214 let compacted = runtime.block_on(auto_compact_messages(
3215 &mut messages,
3216 &AutoCompactConfig {
3217 compact_strategy: CompactStrategy::Truncate,
3218 keep_last: 6,
3219 ..Default::default()
3220 },
3221 None,
3222 ));
3223 assert!(compacted.unwrap());
3224 assert!(messages.len() <= 7); assert!(messages[0]["content"]
3226 .as_str()
3227 .unwrap()
3228 .contains("auto-compacted"));
3229 }
3230
3231 #[test]
3232 fn auto_compact_noop_when_under_threshold() {
3233 let mut messages: Vec<serde_json::Value> = (0..4)
3234 .map(|i| serde_json::json!({"role": "user", "content": format!("msg {i}")}))
3235 .collect();
3236 let runtime = tokio::runtime::Builder::new_current_thread()
3237 .enable_all()
3238 .build()
3239 .unwrap();
3240 let compacted = runtime.block_on(auto_compact_messages(
3241 &mut messages,
3242 &AutoCompactConfig {
3243 compact_strategy: CompactStrategy::Truncate,
3244 keep_last: 6,
3245 ..Default::default()
3246 },
3247 None,
3248 ));
3249 assert!(!compacted.unwrap());
3250 assert_eq!(messages.len(), 4);
3251 }
3252
3253 #[test]
3254 fn estimate_message_tokens_basic() {
3255 let messages = vec![
3256 serde_json::json!({"role": "user", "content": "a".repeat(400)}),
3257 serde_json::json!({"role": "assistant", "content": "b".repeat(400)}),
3258 ];
3259 let tokens = estimate_message_tokens(&messages);
3260 assert_eq!(tokens, 200); }
3262
3263 #[test]
3266 fn dedup_artifacts_removes_duplicates() {
3267 let mut artifacts = vec![
3268 ArtifactRecord {
3269 id: "a1".to_string(),
3270 kind: "test".to_string(),
3271 text: Some("duplicate content".to_string()),
3272 ..Default::default()
3273 },
3274 ArtifactRecord {
3275 id: "a2".to_string(),
3276 kind: "test".to_string(),
3277 text: Some("duplicate content".to_string()),
3278 ..Default::default()
3279 },
3280 ArtifactRecord {
3281 id: "a3".to_string(),
3282 kind: "test".to_string(),
3283 text: Some("unique content".to_string()),
3284 ..Default::default()
3285 },
3286 ];
3287 dedup_artifacts(&mut artifacts);
3288 assert_eq!(artifacts.len(), 2);
3289 }
3290
3291 #[test]
3292 fn microcompact_artifact_snips_oversized() {
3293 let mut artifact = ArtifactRecord {
3294 id: "a1".to_string(),
3295 kind: "test".to_string(),
3296 text: Some("x".repeat(10_000)),
3297 estimated_tokens: Some(2_500),
3298 ..Default::default()
3299 };
3300 microcompact_artifact(&mut artifact, 500);
3301 assert!(artifact.text.as_ref().unwrap().len() < 5_000);
3302 assert_eq!(artifact.estimated_tokens, Some(500));
3303 }
3304
3305 #[test]
3308 fn arg_constraint_allows_matching_pattern() {
3309 let policy = CapabilityPolicy {
3310 tool_arg_constraints: vec![ToolArgConstraint {
3311 tool: "exec".to_string(),
3312 arg_patterns: vec!["cargo *".to_string()],
3313 }],
3314 ..Default::default()
3315 };
3316 let result = enforce_tool_arg_constraints(
3317 &policy,
3318 "exec",
3319 &serde_json::json!({"command": "cargo test"}),
3320 );
3321 assert!(result.is_ok());
3322 }
3323
3324 #[test]
3325 fn arg_constraint_rejects_non_matching_pattern() {
3326 let policy = CapabilityPolicy {
3327 tool_arg_constraints: vec![ToolArgConstraint {
3328 tool: "exec".to_string(),
3329 arg_patterns: vec!["cargo *".to_string()],
3330 }],
3331 ..Default::default()
3332 };
3333 let result = enforce_tool_arg_constraints(
3334 &policy,
3335 "exec",
3336 &serde_json::json!({"command": "rm -rf /"}),
3337 );
3338 assert!(result.is_err());
3339 }
3340
3341 #[test]
3342 fn arg_constraint_ignores_unmatched_tool() {
3343 let policy = CapabilityPolicy {
3344 tool_arg_constraints: vec![ToolArgConstraint {
3345 tool: "exec".to_string(),
3346 arg_patterns: vec!["cargo *".to_string()],
3347 }],
3348 ..Default::default()
3349 };
3350 let result = enforce_tool_arg_constraints(
3351 &policy,
3352 "read_file",
3353 &serde_json::json!({"path": "/etc/passwd"}),
3354 );
3355 assert!(result.is_ok());
3356 }
3357}