//! harn_vm/orchestration.rs — orchestration policies, tool lifecycle hooks,
//! and transcript auto-compaction for the Harn VM.

1use std::collections::{BTreeMap, BTreeSet};
2use std::path::{Path, PathBuf};
3use std::rc::Rc;
4use std::{cell::RefCell, thread_local};
5
6use serde::{Deserialize, Serialize};
7
8use crate::llm::{extract_llm_options, vm_call_llm_full, vm_value_to_json};
9use crate::value::{VmError, VmValue};
10
/// Current UTC time as an RFC 3339 timestamp, e.g. `2024-01-02T03:04:05Z`.
///
/// The previous implementation returned bare Unix seconds as a string, which
/// contradicted the function name and produced non-RFC 3339 `created_at`
/// values on artifacts.
fn now_rfc3339() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};
    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    format_rfc3339_utc(ts)
}

/// Format Unix seconds as `YYYY-MM-DDTHH:MM:SSZ` (UTC) using the
/// civil-from-days algorithm (Howard Hinnant), std-only.
fn format_rfc3339_utc(unix_secs: u64) -> String {
    let secs_of_day = unix_secs % 86_400;
    let days = (unix_secs / 86_400) as i64;
    // civil_from_days: days since 1970-01-01 -> (year, month, day).
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day of 400-year era [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365; // year of era
    let y = yoe + era * 400;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day of year, March-based
    let mp = (5 * doy + 2) / 153; // month index starting from March
    let d = doy - (153 * mp + 2) / 5 + 1;
    let m = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = if m <= 2 { y + 1 } else { y };
    format!(
        "{year:04}-{m:02}-{d:02}T{:02}:{:02}:{:02}Z",
        secs_of_day / 3600,
        (secs_of_day % 3600) / 60,
        secs_of_day % 60
    )
}
19
20fn new_id(prefix: &str) -> String {
21    format!("{prefix}_{}", uuid::Uuid::now_v7())
22}
23
/// Directory run records are written to: `$HARN_RUN_DIR` when set, otherwise
/// `.harn-runs` relative to the working directory.
fn default_run_dir() -> PathBuf {
    match std::env::var("HARN_RUN_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(".harn-runs"),
    }
}
29
thread_local! {
    // Stack of capability policies for nested executions.
    // NOTE(review): push/pop sites are outside this chunk — confirm usage there.
    static EXECUTION_POLICY_STACK: RefCell<Vec<CapabilityPolicy>> = const { RefCell::new(Vec::new()) };
    // Tool lifecycle hooks; run in registration order by run_pre/post_tool_hooks.
    static TOOL_HOOKS: RefCell<Vec<ToolHook>> = const { RefCell::new(Vec::new()) };
    // The mutation session installed for the current thread, if any.
    static CURRENT_MUTATION_SESSION: RefCell<Option<MutationSessionRecord>> = const { RefCell::new(None) };
}
35
/// Per-thread record describing the active mutation session: which run/worker
/// owns it and what mutation scope / approval mode applies. All fields fall
/// back to serde defaults; `normalize` fills in blanks.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct MutationSessionRecord {
    /// Unique id; `normalize` generates one when empty.
    pub session_id: String,
    /// Id of the spawning session, if any.
    pub parent_session_id: Option<String>,
    /// Owning run id, if known.
    pub run_id: Option<String>,
    /// Owning worker id, if known.
    pub worker_id: Option<String>,
    /// Free-form execution kind label, if known.
    pub execution_kind: Option<String>,
    /// `normalize` defaults this to "read_only" when empty.
    pub mutation_scope: String,
    /// `normalize` defaults this to "host_enforced" when empty.
    pub approval_mode: String,
}
47
48impl MutationSessionRecord {
49    pub fn normalize(mut self) -> Self {
50        if self.session_id.is_empty() {
51            self.session_id = new_id("session");
52        }
53        if self.mutation_scope.is_empty() {
54            self.mutation_scope = "read_only".to_string();
55        }
56        if self.approval_mode.is_empty() {
57            self.approval_mode = "host_enforced".to_string();
58        }
59        self
60    }
61}
62
63pub fn install_current_mutation_session(session: Option<MutationSessionRecord>) {
64    CURRENT_MUTATION_SESSION.with(|slot| {
65        *slot.borrow_mut() = session.map(MutationSessionRecord::normalize);
66    });
67}
68
69pub fn current_mutation_session() -> Option<MutationSessionRecord> {
70    CURRENT_MUTATION_SESSION.with(|slot| slot.borrow().clone())
71}
72
73// ── Tool lifecycle hooks ──────────────────────────────────────────────
74
/// Action returned by a PreToolUse hook.
///
/// Combined by `run_pre_tool_hooks`: the first `Deny` short-circuits, and
/// `Modify` rewrites thread through to later hooks.
#[derive(Clone, Debug)]
pub enum PreToolAction {
    /// Allow the tool call to proceed unchanged.
    Allow,
    /// Deny the tool call with an explanation.
    Deny(String),
    /// Allow but replace the arguments.
    Modify(serde_json::Value),
}
85
/// Action returned by a PostToolUse hook.
///
/// Applied in sequence by `run_post_tool_hooks`; each `Modify` feeds the
/// replacement text to the next matching hook.
#[derive(Clone, Debug)]
pub enum PostToolAction {
    /// Pass the result through unchanged.
    Pass,
    /// Replace the result text.
    Modify(String),
}
94
/// Callback types for tool lifecycle hooks. Pre hooks receive
/// `(tool_name, args)`; post hooks receive `(tool_name, result_text)`.
/// `Rc` keeps hooks cheaply cloneable; storage is thread-local (`TOOL_HOOKS`).
pub type PreToolHookFn = Rc<dyn Fn(&str, &serde_json::Value) -> PreToolAction>;
pub type PostToolHookFn = Rc<dyn Fn(&str, &str) -> PostToolAction>;
98
/// A registered tool hook with a name pattern and callbacks.
///
/// Pattern matching is done by `glob_match`; hooks run in registration order.
#[derive(Clone)]
pub struct ToolHook {
    /// Glob-style pattern matched against tool names (e.g. `"*"`, `"exec*"`, `"read_file"`).
    pub pattern: String,
    /// Called before tool execution. Return `Deny` to reject, `Modify` to rewrite args.
    pub pre: Option<PreToolHookFn>,
    /// Called after tool execution with the result text. Return `Modify` to rewrite.
    pub post: Option<PostToolHookFn>,
}
109
110impl std::fmt::Debug for ToolHook {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("ToolHook")
113            .field("pattern", &self.pattern)
114            .field("has_pre", &self.pre.is_some())
115            .field("has_post", &self.post.is_some())
116            .finish()
117    }
118}
119
/// Glob-style match supporting `*` wildcards anywhere in the pattern
/// (`"*"`, `"exec*"`, `"*_file"`, `"*diff*"`, `"a*b"`).
///
/// The previous version honored only a single leading OR trailing `*`, so a
/// pattern like `"*diff*"` fell into the suffix branch and required a literal
/// trailing `*` in the name. Existing prefix/suffix/exact patterns behave
/// identically.
fn glob_match(pattern: &str, name: &str) -> bool {
    if !pattern.contains('*') {
        return pattern == name;
    }
    let segments: Vec<&str> = pattern.split('*').collect();
    // First segment anchors as a prefix, last as a suffix; middle segments
    // must appear in order in what remains between them.
    let Some(rest) = name.strip_prefix(segments[0]) else {
        return false;
    };
    let Some(mut middle) = rest.strip_suffix(segments[segments.len() - 1]) else {
        return false;
    };
    for segment in &segments[1..segments.len() - 1] {
        if segment.is_empty() {
            continue; // consecutive '*' match anything
        }
        match middle.find(segment) {
            Some(pos) => middle = &middle[pos + segment.len()..],
            None => return false,
        }
    }
    true
}
132
133pub fn register_tool_hook(hook: ToolHook) {
134    TOOL_HOOKS.with(|hooks| hooks.borrow_mut().push(hook));
135}
136
137pub fn clear_tool_hooks() {
138    TOOL_HOOKS.with(|hooks| hooks.borrow_mut().clear());
139}
140
141/// Run all matching PreToolUse hooks. Returns the final action.
142pub fn run_pre_tool_hooks(tool_name: &str, args: &serde_json::Value) -> PreToolAction {
143    TOOL_HOOKS.with(|hooks| {
144        let hooks = hooks.borrow();
145        let mut current_args = args.clone();
146        for hook in hooks.iter() {
147            if !glob_match(&hook.pattern, tool_name) {
148                continue;
149            }
150            if let Some(ref pre) = hook.pre {
151                match pre(tool_name, &current_args) {
152                    PreToolAction::Allow => {}
153                    PreToolAction::Deny(reason) => return PreToolAction::Deny(reason),
154                    PreToolAction::Modify(new_args) => {
155                        current_args = new_args;
156                    }
157                }
158            }
159        }
160        if current_args != *args {
161            PreToolAction::Modify(current_args)
162        } else {
163            PreToolAction::Allow
164        }
165    })
166}
167
168/// Run all matching PostToolUse hooks. Returns the (possibly modified) result.
169pub fn run_post_tool_hooks(tool_name: &str, result: &str) -> String {
170    TOOL_HOOKS.with(|hooks| {
171        let hooks = hooks.borrow();
172        let mut current = result.to_string();
173        for hook in hooks.iter() {
174            if !glob_match(&hook.pattern, tool_name) {
175                continue;
176            }
177            if let Some(ref post) = hook.post {
178                match post(tool_name, &current) {
179                    PostToolAction::Pass => {}
180                    PostToolAction::Modify(new_result) => {
181                        current = new_result;
182                    }
183                }
184            }
185        }
186        current
187    })
188}
189
190// ── Auto-compaction ───────────────────────────────────────────────────
191
/// How archived transcript messages are summarized during auto-compaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompactStrategy {
    /// Ask the model for a summary (see `llm_compaction_summary`).
    Llm,
    /// Deterministic truncation-based summary; no LLM call.
    Truncate,
    /// Delegate to a user-supplied Harn closure (`custom_compactor`).
    Custom,
}
198
199pub fn parse_compact_strategy(value: &str) -> Result<CompactStrategy, VmError> {
200    match value {
201        "llm" => Ok(CompactStrategy::Llm),
202        "truncate" => Ok(CompactStrategy::Truncate),
203        "custom" => Ok(CompactStrategy::Custom),
204        other => Err(VmError::Runtime(format!(
205            "unknown compact_strategy '{other}' (expected 'llm', 'truncate', or 'custom')"
206        ))),
207    }
208}
209
/// Configuration for automatic transcript compaction in agent loops.
///
/// See the `Default` impl for the tuned default values.
#[derive(Clone, Debug)]
pub struct AutoCompactConfig {
    /// Maximum estimated tokens before triggering auto-compaction.
    pub token_threshold: usize,
    /// Maximum character length for a single tool result before microcompaction.
    pub tool_output_max_chars: usize,
    /// Number of recent messages to keep during auto-compaction.
    pub keep_last: usize,
    /// Strategy used to summarize archived messages.
    pub compact_strategy: CompactStrategy,
    /// Optional Harn callback used when `compact_strategy` is `custom`.
    pub custom_compactor: Option<VmValue>,
}
224
225impl Default for AutoCompactConfig {
226    fn default() -> Self {
227        Self {
228            token_threshold: 80_000,
229            tool_output_max_chars: 20_000,
230            keep_last: 8,
231            compact_strategy: CompactStrategy::Llm,
232            custom_compactor: None,
233        }
234    }
235}
236
237/// Estimate token count from a list of JSON messages (chars / 4 heuristic).
238pub fn estimate_message_tokens(messages: &[serde_json::Value]) -> usize {
239    messages
240        .iter()
241        .map(|m| {
242            m.get("content")
243                .and_then(|c| c.as_str())
244                .map(|s| s.len())
245                .unwrap_or(0)
246        })
247        .sum::<usize>()
248        / 4
249}
250
/// Microcompact a tool result: if it exceeds `max_chars` bytes, keep the
/// first and last portions with a snip marker in between.
///
/// Inputs at or under the limit, or limits under 200, pass through unchanged.
/// Slice points are floored to UTF-8 char boundaries so multi-byte content
/// cannot panic (the previous version sliced at raw byte offsets and would
/// panic mid-codepoint).
pub fn microcompact_tool_output(output: &str, max_chars: usize) -> String {
    if output.len() <= max_chars || max_chars < 200 {
        return output.to_string();
    }
    let keep = max_chars / 2;
    let head_end = floor_char_boundary_at(output, keep);
    let tail_start = floor_char_boundary_at(output, output.len() - keep);
    let head = &output[..head_end];
    let tail = &output[tail_start..];
    // Report the bytes actually removed (identical to the old arithmetic
    // for ASCII input with an even max_chars).
    let snipped = output.len() - head.len() - tail.len();
    format!("{head}\n\n[... {snipped} characters snipped ...]\n\n{tail}")
}

/// Largest index `<= at` that lies on a UTF-8 char boundary of `s`.
/// (Hand-rolled because `str::floor_char_boundary` is unstable.)
fn floor_char_boundary_at(s: &str, at: usize) -> usize {
    let mut idx = at.min(s.len());
    while !s.is_char_boundary(idx) {
        idx -= 1;
    }
    idx
}
263
264fn format_compaction_messages(messages: &[serde_json::Value]) -> String {
265    messages
266        .iter()
267        .map(|msg| {
268            let role = msg
269                .get("role")
270                .and_then(|v| v.as_str())
271                .unwrap_or("user")
272                .to_uppercase();
273            let content = msg
274                .get("content")
275                .and_then(|v| v.as_str())
276                .unwrap_or_default();
277            format!("{role}: {content}")
278        })
279        .collect::<Vec<_>>()
280        .join("\n")
281}
282
283fn truncate_compaction_summary(
284    old_messages: &[serde_json::Value],
285    archived_count: usize,
286) -> String {
287    let summary_parts: Vec<String> = old_messages
288        .iter()
289        .filter_map(|m| {
290            let role = m.get("role")?.as_str()?;
291            let content = m.get("content")?.as_str()?;
292            if content.is_empty() {
293                return None;
294            }
295            let truncated = if content.len() > 500 {
296                format!("{}...", &content[..500])
297            } else {
298                content.to_string()
299            };
300            Some(format!("[{role}] {truncated}"))
301        })
302        .take(15)
303        .collect();
304    format!(
305        "[auto-compacted {archived_count} older messages]\n{}{}",
306        summary_parts.join("\n"),
307        if archived_count > 15 {
308            format!("\n... and {} more", archived_count - 15)
309        } else {
310            String::new()
311        }
312    )
313}
314
315fn compact_summary_text_from_value(value: &VmValue) -> Result<String, VmError> {
316    if let Some(map) = value.as_dict() {
317        if let Some(summary) = map.get("summary").or_else(|| map.get("text")) {
318            return Ok(summary.display());
319        }
320    }
321    match value {
322        VmValue::String(text) => Ok(text.to_string()),
323        VmValue::Nil => Ok(String::new()),
324        _ => serde_json::to_string_pretty(&vm_value_to_json(value))
325            .map_err(|e| VmError::Runtime(format!("custom compactor encode error: {e}"))),
326    }
327}
328
/// Summarize archived messages with a single standalone LLM call.
///
/// Inherits the caller's LLM options (provider/model/etc.) but scrubs every
/// conversation- and tool-specific field so the summarization call is a clean
/// one-shot completion. Falls back to `truncate_compaction_summary` when the
/// model returns an empty response; call errors propagate to the caller.
async fn llm_compaction_summary(
    old_messages: &[serde_json::Value],
    archived_count: usize,
    llm_opts: &crate::llm::api::LlmCallOptions,
) -> Result<String, VmError> {
    let mut compact_opts = llm_opts.clone();
    let formatted = format_compaction_messages(old_messages);
    // Strip inherited system prompt, transcript linkage, tool wiring, and
    // output-format constraints — none of them apply to the summary call.
    compact_opts.system = None;
    compact_opts.transcript_id = None;
    compact_opts.transcript_summary = None;
    compact_opts.transcript_metadata = None;
    compact_opts.native_tools = None;
    compact_opts.tool_choice = None;
    compact_opts.response_format = None;
    compact_opts.json_schema = None;
    compact_opts.messages = vec![serde_json::json!({
        "role": "user",
        "content": format!(
            "Summarize these archived conversation messages for a follow-on coding agent. Preserve goals, constraints, decisions, completed tool work, unresolved issues, and next actions. Output only the summary text.\n\nArchived message count: {archived_count}\n\nConversation:\n{formatted}"
        ),
    })];
    let result = vm_call_llm_full(&compact_opts).await?;
    let summary = result.text.trim();
    if summary.is_empty() {
        Ok(truncate_compaction_summary(old_messages, archived_count))
    } else {
        Ok(format!(
            "[auto-compacted {archived_count} older messages]\n{summary}"
        ))
    }
}
360
361async fn custom_compaction_summary(
362    old_messages: &[serde_json::Value],
363    archived_count: usize,
364    callback: &VmValue,
365) -> Result<String, VmError> {
366    let Some(VmValue::Closure(closure)) = Some(callback.clone()) else {
367        return Err(VmError::Runtime(
368            "compact_callback must be a closure when compact_strategy is 'custom'".to_string(),
369        ));
370    };
371    let mut vm = crate::vm::take_async_builtin_child_vm().ok_or_else(|| {
372        VmError::Runtime(
373            "custom transcript compaction requires an async builtin VM context".to_string(),
374        )
375    })?;
376    let messages_vm = VmValue::List(Rc::new(
377        old_messages
378            .iter()
379            .map(crate::stdlib::json_to_vm_value)
380            .collect(),
381    ));
382    let result = vm.call_closure_pub(&closure, &[messages_vm], &[]).await;
383    crate::vm::restore_async_builtin_child_vm(vm);
384    let summary = compact_summary_text_from_value(&result?)?;
385    if summary.trim().is_empty() {
386        Ok(truncate_compaction_summary(old_messages, archived_count))
387    } else {
388        Ok(format!(
389            "[auto-compacted {archived_count} older messages]\n{summary}"
390        ))
391    }
392}
393
394/// Auto-compact a message list in place: summarize older messages into a
395/// note and keep the most recent `keep_last` messages.
396pub(crate) async fn auto_compact_messages(
397    messages: &mut Vec<serde_json::Value>,
398    config: &AutoCompactConfig,
399    llm_opts: Option<&crate::llm::api::LlmCallOptions>,
400) -> Result<bool, VmError> {
401    if messages.len() <= config.keep_last {
402        return Ok(false);
403    }
404    let split_at = messages.len().saturating_sub(config.keep_last);
405    let old_messages: Vec<_> = messages.drain(..split_at).collect();
406    let archived_count = old_messages.len();
407    let summary = match config.compact_strategy {
408        CompactStrategy::Truncate => truncate_compaction_summary(&old_messages, archived_count),
409        CompactStrategy::Llm => {
410            llm_compaction_summary(
411                &old_messages,
412                archived_count,
413                llm_opts.ok_or_else(|| {
414                    VmError::Runtime(
415                        "LLM transcript compaction requires active LLM call options".to_string(),
416                    )
417                })?,
418            )
419            .await?
420        }
421        CompactStrategy::Custom => {
422            custom_compaction_summary(
423                &old_messages,
424                archived_count,
425                config.custom_compactor.as_ref().ok_or_else(|| {
426                    VmError::Runtime(
427                        "compact_callback is required when compact_strategy is 'custom'"
428                            .to_string(),
429                    )
430                })?,
431            )
432            .await?
433        }
434    };
435    messages.insert(
436        0,
437        serde_json::json!({
438            "role": "user",
439            "content": summary,
440        }),
441    );
442    Ok(true)
443}
444
445// ── Adaptive context assembly ─────────────────────────────────────────
446
447/// Snip an artifact's text to fit within a token budget.
448pub fn microcompact_artifact(artifact: &mut ArtifactRecord, max_tokens: usize) {
449    let max_chars = max_tokens * 4;
450    if let Some(ref text) = artifact.text {
451        if text.len() > max_chars && max_chars >= 200 {
452            artifact.text = Some(microcompact_tool_output(text, max_chars));
453            artifact.estimated_tokens = Some(max_tokens);
454        }
455    }
456}
457
458/// Deduplicate artifacts by removing those with identical text content,
459/// keeping the one with higher priority.
460pub fn dedup_artifacts(artifacts: &mut Vec<ArtifactRecord>) {
461    let mut seen_hashes: BTreeSet<u64> = BTreeSet::new();
462    artifacts.retain(|artifact| {
463        let text = artifact.text.as_deref().unwrap_or("");
464        if text.is_empty() {
465            return true;
466        }
467        // Simple hash for dedup
468        let hash = {
469            use std::hash::{Hash, Hasher};
470            let mut hasher = std::collections::hash_map::DefaultHasher::new();
471            text.hash(&mut hasher);
472            hasher.finish()
473        };
474        seen_hashes.insert(hash)
475    });
476}
477
478/// Enhanced artifact selection: dedup, microcompact oversized artifacts,
479/// then delegate to the standard `select_artifacts`.
480pub fn select_artifacts_adaptive(
481    mut artifacts: Vec<ArtifactRecord>,
482    policy: &ContextPolicy,
483) -> Vec<ArtifactRecord> {
484    // Phase 1: deduplicate
485    dedup_artifacts(&mut artifacts);
486
487    // Phase 2: microcompact oversized artifacts relative to budget.
488    // Cap individual artifacts to a fraction of the total budget, but don't
489    // let the per-artifact cap exceed the total budget (avoid overrun).
490    if let Some(max_tokens) = policy.max_tokens {
491        let count = artifacts.len().max(1);
492        let per_artifact_budget = max_tokens / count;
493        // Floor of 500 tokens, but never more than total budget
494        let cap = per_artifact_budget.max(500).min(max_tokens);
495        for artifact in &mut artifacts {
496            let est = artifact.estimated_tokens.unwrap_or(0);
497            if est > cap * 2 {
498                microcompact_artifact(artifact, cap);
499            }
500        }
501    }
502
503    // Phase 3: standard selection with budget
504    select_artifacts(artifacts, policy)
505}
506
507// ── Per-agent policy with argument patterns ───────────────────────────
508
/// Argument-level constraint attached to a `CapabilityPolicy`.
///
/// Enforced by `enforce_tool_arg_constraints`: for matching tools, the first
/// string-like argument must satisfy at least one glob pattern.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolArgConstraint {
    /// Tool name to constrain (glob pattern, matched with `glob_match`).
    pub tool: String,
    /// Glob patterns that the first string argument must match.
    /// If empty, no argument constraint is applied.
    pub arg_patterns: Vec<String>,
}
519
520/// Check if a tool call satisfies argument constraints in the policy.
521pub fn enforce_tool_arg_constraints(
522    policy: &CapabilityPolicy,
523    tool_name: &str,
524    args: &serde_json::Value,
525) -> Result<(), VmError> {
526    for constraint in &policy.tool_arg_constraints {
527        if !glob_match(&constraint.tool, tool_name) {
528            continue;
529        }
530        if constraint.arg_patterns.is_empty() {
531            continue;
532        }
533        // Extract the first string-like argument for pattern matching
534        let first_arg = args
535            .as_object()
536            .and_then(|o| o.values().next())
537            .and_then(|v| v.as_str())
538            .or_else(|| args.as_str())
539            .unwrap_or("");
540        let matches = constraint
541            .arg_patterns
542            .iter()
543            .any(|pattern| glob_match(pattern, first_arg));
544        if !matches {
545            return reject_policy(format!(
546                "tool '{tool_name}' argument '{first_arg}' does not match allowed patterns: {:?}",
547                constraint.arg_patterns
548            ));
549        }
550    }
551    Ok(())
552}
553
/// Map loose artifact-kind spellings onto canonical names.
///
/// Canonical kinds pass through untouched, a few short aliases are expanded,
/// blank/whitespace input falls back to "artifact", and any other label is
/// preserved as-is.
fn normalize_artifact_kind(kind: &str) -> String {
    const CANONICAL: &[&str] = &[
        "resource",
        "workspace_file",
        "editor_selection",
        "workspace_snapshot",
        "transcript_summary",
        "summary",
        "plan",
        "diff",
        "git_diff",
        "patch",
        "patch_set",
        "patch_proposal",
        "diff_review",
        "review_decision",
        "verification_bundle",
        "apply_intent",
        "verification_result",
        "test_result",
        "command_result",
        "provider_payload",
        "worker_result",
        "worker_notification",
        "artifact",
    ];
    if CANONICAL.contains(&kind) {
        return kind.to_string();
    }
    let mapped = match kind {
        "file" => "workspace_file",
        "transcript" => "transcript_summary",
        "verification" => "verification_result",
        "test" => "test_result",
        blank if blank.trim().is_empty() => "artifact",
        other => other,
    };
    mapped.to_string()
}
587
/// Default context-selection priority for an artifact kind.
/// Higher numbers surface earlier; unknown kinds get 40.
fn default_artifact_priority(kind: &str) -> i64 {
    if matches!(kind, "verification_result" | "test_result") {
        100
    } else if kind == "verification_bundle" {
        95
    } else if [
        "diff",
        "git_diff",
        "patch",
        "patch_set",
        "patch_proposal",
        "diff_review",
        "review_decision",
        "apply_intent",
    ]
    .contains(&kind)
    {
        90
    } else if kind == "plan" {
        80
    } else if ["workspace_file", "workspace_snapshot", "editor_selection", "resource"]
        .contains(&kind)
    {
        70
    } else if matches!(kind, "summary" | "transcript_summary") {
        60
    } else if kind == "command_result" {
        50
    } else {
        40
    }
}
601
/// Rank a freshness label for sorting: fresh/live > recent > unknown > stale.
/// `None` and unrecognized labels land between "recent" and "stale".
fn freshness_rank(value: Option<&str>) -> i64 {
    match value {
        Some("fresh") | Some("live") => 3,
        Some("recent") => 2,
        Some("stale") => 0,
        _ => 1,
    }
}
610
/// Capability ceiling/request for an execution: allowed tools, capability
/// operations, workspace roots, side-effect level, and recursion depth.
///
/// Empty collections mean "unrestricted" when intersecting (see `intersect`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct CapabilityPolicy {
    /// Allowed tool names; empty = no tool restriction.
    pub tools: Vec<String>,
    /// Allowed operations per capability name; empty = no restriction.
    pub capabilities: BTreeMap<String, Vec<String>>,
    /// Allowed workspace roots; empty = no restriction.
    pub workspace_roots: Vec<String>,
    /// Side-effect ceiling; intersect keeps the weaker level (see `min_side_effect`).
    pub side_effect_level: Option<String>,
    /// Maximum recursion depth; intersect keeps the smaller limit.
    pub recursion_limit: Option<usize>,
    /// Argument-level constraints for specific tools.
    #[serde(default)]
    pub tool_arg_constraints: Vec<ToolArgConstraint>,
}
623
impl CapabilityPolicy {
    /// Intersect `self` (the host ceiling) with a `requested` policy.
    ///
    /// Returns `Err` when the request asks for anything the ceiling does not
    /// allow (tools or capability operations outside the ceiling). Otherwise
    /// returns the element-wise intersection. Throughout, an empty collection
    /// on either side means "unrestricted" and yields the other side's value.
    /// Arg constraints are additive: both sides' constraints apply.
    pub fn intersect(&self, requested: &CapabilityPolicy) -> Result<CapabilityPolicy, String> {
        // Side-effect level: keep the weaker (lower-ranked) of the two.
        let side_effect_level = match (&self.side_effect_level, &requested.side_effect_level) {
            (Some(a), Some(b)) => Some(min_side_effect(a, b).to_string()),
            (Some(a), None) => Some(a.clone()),
            (None, Some(b)) => Some(b.clone()),
            (None, None) => None,
        };

        // Reject any requested tool outside a non-empty ceiling.
        if !self.tools.is_empty() {
            let denied: Vec<String> = requested
                .tools
                .iter()
                .filter(|tool| !self.tools.contains(*tool))
                .cloned()
                .collect();
            if !denied.is_empty() {
                return Err(format!(
                    "requested tools exceed host ceiling: {}",
                    denied.join(", ")
                ));
            }
        }

        // Reject requested capability operations outside the ceiling. An
        // unknown capability is only an error when the ceiling is non-empty.
        for (capability, requested_ops) in &requested.capabilities {
            if let Some(allowed_ops) = self.capabilities.get(capability) {
                let denied: Vec<String> = requested_ops
                    .iter()
                    .filter(|op| !allowed_ops.contains(*op))
                    .cloned()
                    .collect();
                if !denied.is_empty() {
                    return Err(format!(
                        "requested capability operations exceed host ceiling: {}.{}",
                        capability,
                        denied.join(",")
                    ));
                }
            } else if !self.capabilities.is_empty() {
                return Err(format!(
                    "requested capability exceeds host ceiling: {capability}"
                ));
            }
        }

        // Tools: empty side defers to the other; otherwise set intersection.
        let tools = if self.tools.is_empty() {
            requested.tools.clone()
        } else if requested.tools.is_empty() {
            self.tools.clone()
        } else {
            requested
                .tools
                .iter()
                .filter(|tool| self.tools.contains(*tool))
                .cloned()
                .collect()
        };

        // Capabilities: same empty-defers rule; otherwise intersect the
        // operation lists per capability present in both.
        let capabilities = if self.capabilities.is_empty() {
            requested.capabilities.clone()
        } else if requested.capabilities.is_empty() {
            self.capabilities.clone()
        } else {
            requested
                .capabilities
                .iter()
                .filter_map(|(capability, requested_ops)| {
                    self.capabilities.get(capability).map(|allowed_ops| {
                        (
                            capability.clone(),
                            requested_ops
                                .iter()
                                .filter(|op| allowed_ops.contains(*op))
                                .cloned()
                                .collect::<Vec<_>>(),
                        )
                    })
                })
                .collect()
        };

        // Workspace roots: empty-defers rule, then exact-string intersection.
        let workspace_roots = if self.workspace_roots.is_empty() {
            requested.workspace_roots.clone()
        } else if requested.workspace_roots.is_empty() {
            self.workspace_roots.clone()
        } else {
            requested
                .workspace_roots
                .iter()
                .filter(|root| self.workspace_roots.contains(*root))
                .cloned()
                .collect()
        };

        // Recursion limit: the smaller of the two when both present.
        let recursion_limit = match (self.recursion_limit, requested.recursion_limit) {
            (Some(a), Some(b)) => Some(a.min(b)),
            (Some(a), None) => Some(a),
            (None, Some(b)) => Some(b),
            (None, None) => None,
        };

        // Merge arg constraints from both sides (both sets are enforced).
        let mut tool_arg_constraints = self.tool_arg_constraints.clone();
        tool_arg_constraints.extend(requested.tool_arg_constraints.clone());

        Ok(CapabilityPolicy {
            tools,
            capabilities,
            workspace_roots,
            side_effect_level,
            recursion_limit,
            tool_arg_constraints,
        })
    }
}
739
/// Return the weaker of two side-effect levels.
/// Levels from least to most powerful: none < read_only < workspace_write
/// < process_exec < network; unknown labels rank above all known levels.
/// Ties return `a`.
fn min_side_effect<'a>(a: &'a str, b: &'a str) -> &'a str {
    fn rank(level: &str) -> usize {
        ["none", "read_only", "workspace_write", "process_exec", "network"]
            .iter()
            .position(|known| *known == level)
            .unwrap_or(5)
    }
    if rank(b) < rank(a) {
        b
    } else {
        a
    }
}
757
/// Model-selection knobs for a workflow node; all fields optional so a node
/// can inherit whatever the host does not override.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ModelPolicy {
    pub provider: Option<String>,
    pub model: Option<String>,
    pub model_tier: Option<String>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<i64>,
}
767
/// How a node's transcript is recorded, shared, and condensed.
/// NOTE(review): consumers of these fields are outside this chunk.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct TranscriptPolicy {
    pub mode: Option<String>,
    pub visibility: Option<String>,
    pub summarize: bool,
    pub compact: bool,
    pub keep_last: Option<usize>,
}
777
/// Budget and filtering knobs for context assembly, consumed by
/// `select_artifacts` / `select_artifacts_adaptive`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ContextPolicy {
    pub max_artifacts: Option<usize>,
    /// Total token budget; also drives per-artifact microcompaction caps
    /// in `select_artifacts_adaptive`.
    pub max_tokens: Option<usize>,
    pub reserve_tokens: Option<usize>,
    pub include_kinds: Vec<String>,
    pub exclude_kinds: Vec<String>,
    pub prioritize_kinds: Vec<String>,
    pub pinned_ids: Vec<String>,
    pub include_stages: Vec<String>,
    pub prefer_recent: bool,
    pub prefer_fresh: bool,
    pub render: Option<String>,
}
793
/// Retry behavior for a workflow node: attempt budget plus whether to
/// verify results and attempt repair between retries.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RetryPolicy {
    pub max_attempts: usize,
    pub verify: bool,
    pub repair: bool,
}
801
/// Input/output contract for a workflow stage: expected artifact kinds,
/// arity bounds, transcript requirement, and an optional JSON schema.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StageContract {
    pub input_kinds: Vec<String>,
    pub output_kinds: Vec<String>,
    pub min_inputs: Option<usize>,
    pub max_inputs: Option<usize>,
    pub require_transcript: bool,
    pub schema: Option<serde_json::Value>,
}
812
/// Named successor targets for each way a workflow node can branch
/// (success/failure, verification outcome, condition, loop, escalation).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BranchSemantics {
    pub success: Option<String>,
    pub failure: Option<String>,
    pub verify_pass: Option<String>,
    pub verify_fail: Option<String>,
    pub condition_true: Option<String>,
    pub condition_false: Option<String>,
    pub loop_continue: Option<String>,
    pub loop_exit: Option<String>,
    pub escalation: Option<String>,
}
826
/// Fan-out policy for map-style nodes: the items to map over, the artifact
/// kinds to wrap them in, and an optional item cap.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct MapPolicy {
    pub items: Vec<serde_json::Value>,
    pub item_artifact_kind: Option<String>,
    pub output_kind: Option<String>,
    pub max_items: Option<usize>,
}
835
/// Fan-in policy for join nodes: named strategy plus completion requirements.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct JoinPolicy {
    pub strategy: String,
    pub require_all_inputs: bool,
    pub min_completed: Option<usize>,
}
843
/// Reduction policy for combining mapped results into one output artifact.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReducePolicy {
    pub strategy: String,
    pub separator: Option<String>,
    pub output_kind: Option<String>,
}
851
/// Where and why a node escalates when it cannot proceed on its own.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EscalationPolicy {
    pub level: Option<String>,
    pub queue: Option<String>,
    pub reason: Option<String>,
}
859
/// A unit of context (file, diff, result, summary, …) flowing between
/// workflow stages. `normalize` fills ids, timestamps, canonical kind,
/// estimated tokens, and default priority.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ArtifactRecord {
    /// Serialized type tag; defaults to "artifact" on normalize.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Unique id; generated on normalize when empty.
    pub id: String,
    /// Canonical artifact kind (see `normalize_artifact_kind`).
    pub kind: String,
    pub title: Option<String>,
    /// Textual payload; token estimates and microcompaction operate on this.
    pub text: Option<String>,
    pub data: Option<serde_json::Value>,
    pub source: Option<String>,
    /// Set on normalize when empty.
    pub created_at: String,
    /// Freshness label ranked by `freshness_rank` ("fresh"/"live"/"recent"/"stale").
    pub freshness: Option<String>,
    /// Selection priority; defaults by kind via `default_artifact_priority`.
    pub priority: Option<i64>,
    pub lineage: Vec<String>,
    pub relevance: Option<f64>,
    /// ~len(text)/4, rounded up, when not provided.
    pub estimated_tokens: Option<usize>,
    pub stage: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
880
881impl ArtifactRecord {
882    pub fn normalize(mut self) -> Self {
883        if self.type_name.is_empty() {
884            self.type_name = "artifact".to_string();
885        }
886        if self.id.is_empty() {
887            self.id = new_id("artifact");
888        }
889        if self.created_at.is_empty() {
890            self.created_at = now_rfc3339();
891        }
892        if self.kind.is_empty() {
893            self.kind = "artifact".to_string();
894        }
895        self.kind = normalize_artifact_kind(&self.kind);
896        if self.estimated_tokens.is_none() {
897            self.estimated_tokens = self
898                .text
899                .as_ref()
900                .map(|text| ((text.len() as f64) / 4.0).ceil() as usize);
901        }
902        if self.priority.is_none() {
903            self.priority = Some(default_artifact_priority(&self.kind));
904        }
905        self
906    }
907}
908
/// A single node in a workflow graph.
///
/// Defaults are filled by `normalize_workflow_value` (id from the map key,
/// kind `"stage"`, join/reduce strategies, output contract kinds, and a
/// minimum of one retry attempt). Structural rules per `kind` are checked
/// in `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowNode {
    /// Node id; filled from the graph's node-map key when absent.
    pub id: Option<String>,
    /// Node kind, e.g. "stage", "verify", "condition", "fork", "join",
    /// "map", "reduce", "escalation" (see `validate_workflow`).
    pub kind: String,
    pub mode: Option<String>,
    pub prompt: Option<String>,
    pub system: Option<String>,
    pub task_label: Option<String>,
    /// Tool specification as raw JSON; names extracted by
    /// `workflow_tool_names`.
    pub tools: serde_json::Value,
    pub model_policy: ModelPolicy,
    pub transcript_policy: TranscriptPolicy,
    pub context_policy: ContextPolicy,
    pub retry_policy: RetryPolicy,
    pub capability_policy: CapabilityPolicy,
    pub input_contract: StageContract,
    pub output_contract: StageContract,
    pub branch_semantics: BranchSemantics,
    pub map_policy: MapPolicy,
    pub join_policy: JoinPolicy,
    pub reduce_policy: ReducePolicy,
    pub escalation_policy: EscalationPolicy,
    /// Verification spec (assert_text/command/expect_status/expect_text);
    /// may be synthesized from legacy shorthand fields.
    pub verify: Option<serde_json::Value>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
934
935pub fn workflow_tool_names(value: &serde_json::Value) -> Vec<String> {
936    match value {
937        serde_json::Value::Null => Vec::new(),
938        serde_json::Value::Array(items) => items
939            .iter()
940            .filter_map(|item| match item {
941                serde_json::Value::Object(map) => map
942                    .get("name")
943                    .and_then(|value| value.as_str())
944                    .filter(|name| !name.is_empty())
945                    .map(|name| name.to_string()),
946                _ => None,
947            })
948            .collect(),
949        serde_json::Value::Object(map) => {
950            if map.get("_type").and_then(|value| value.as_str()) == Some("tool_registry") {
951                return map
952                    .get("tools")
953                    .map(workflow_tool_names)
954                    .unwrap_or_default();
955            }
956            map.get("name")
957                .and_then(|value| value.as_str())
958                .filter(|name| !name.is_empty())
959                .map(|name| vec![name.to_string()])
960                .unwrap_or_default()
961        }
962        _ => Vec::new(),
963    }
964}
965
/// A directed edge between two workflow nodes.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowEdge {
    /// Source node id; must exist in the graph (`validate_workflow`).
    pub from: String,
    /// Target node id; must exist in the graph (`validate_workflow`).
    pub to: String,
    /// Branch selector, e.g. "true"/"false" for condition nodes.
    pub branch: Option<String>,
    pub label: Option<String>,
}
974
/// A complete workflow definition: nodes, edges, and graph-level policy.
///
/// Identity defaults (type name, id, version 1, entry node) are filled by
/// `normalize_workflow_value`; structural checks live in
/// `validate_workflow`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowGraph {
    /// Serialized as `"_type"`; defaults to `"workflow_graph"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Unique id; generated (`workflow_<uuid>`) when empty.
    pub id: String,
    pub name: Option<String>,
    /// Graph version; 0 is normalized to 1.
    pub version: usize,
    /// Id of the starting node; defaults to the first node key or "act".
    pub entry: String,
    pub nodes: BTreeMap<String, WorkflowNode>,
    pub edges: Vec<WorkflowEdge>,
    /// Graph-wide capability policy, checked against an optional ceiling
    /// in `validate_workflow`.
    pub capability_policy: CapabilityPolicy,
    pub metadata: BTreeMap<String, serde_json::Value>,
    pub audit_log: Vec<WorkflowAuditEntry>,
}
990
/// One entry in a workflow's audit log, recording an operation on the graph.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct WorkflowAuditEntry {
    pub id: String,
    /// Name of the operation performed (semantics defined by the writer).
    pub op: String,
    /// Node the operation applied to, if node-scoped.
    pub node_id: Option<String>,
    pub timestamp: String,
    pub reason: Option<String>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
1001
/// Aggregated LLM usage counters for a stage or a whole run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    pub call_count: i64,
    pub total_cost: f64,
    /// Models observed across the aggregated calls.
    pub models: Vec<String>,
}
1012
/// Record of one executed stage within a run, including its attempts,
/// artifacts, and optional transcript/verification payloads.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    /// Workflow node this stage executed.
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch taken out of this stage, if any.
    pub branch: Option<String>,
    pub started_at: String,
    /// None while the stage is still running.
    pub finished_at: Option<String>,
    /// User-visible output text.
    pub visible_text: Option<String>,
    /// Model reasoning kept out of the visible output.
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Per-attempt history (retries included).
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
1035
/// One attempt (including retries) of a stage execution.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// 0-based or 1-based attempt index — set by the runner, not visible
    /// here; confirm against the writer before relying on it.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    /// Error message when the attempt failed.
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
1048
/// A recorded transition between stages/nodes during a run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// Origin stage/node; `None` for the initial entry transition.
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    /// Branch label that selected this transition, if any.
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
1061
/// A persisted checkpoint of run progress, usable for resumption.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    /// Nodes ready to execute at checkpoint time.
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why this checkpoint was taken.
    pub reason: String,
}
1072
/// Expected-outcome fixture derived from (or supplied for) a run, used to
/// re-check a run via `evaluate_run_against_fixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Serialized as `"_type"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Run the fixture was derived from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    /// Run status the replay is expected to reach.
    pub expected_status: String,
    /// Per-stage expectations to assert against a replayed run.
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
1086
/// Expectations for a single stage when replaying a run against a fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    /// Artifact kinds the stage must have produced.
    pub required_artifact_kinds: Vec<String>,
    /// Substring that must appear in the stage's visible text.
    pub visible_text_contains: Option<String>,
}
1097
/// Result of evaluating one run against a replay fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    /// Human-readable descriptions of each failed assertion.
    pub failures: Vec<String>,
    pub stage_count: usize,
}
1105
/// Per-case result within an eval suite run (one run + fixture pair).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    /// Optional case label from the suite manifest.
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    /// Path the run was loaded from, when known.
    pub source_path: Option<String>,
    /// Diff against a comparison run, when the case requested one.
    pub comparison: Option<RunDiffReport>,
}
1118
/// Aggregate result for an entire eval suite.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    /// True only when every case passed.
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
1128
/// One stage-level difference found when diffing two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// Kind of change (semantics defined by the diffing code).
    pub change: String,
    pub details: Vec<String>,
}
1136
/// Structured comparison of two runs (left vs right).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    /// Count deltas are right-minus-left or left-minus-right depending on
    /// the diffing code (not visible here) — confirm before interpreting.
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
1151
/// Manifest describing a suite of replay-eval cases.
///
/// Identity defaults are filled by `normalize_eval_suite_manifest`; the
/// suite is executed by `evaluate_run_suite_manifest`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Serialized as `"_type"`; defaults to `"eval_suite_manifest"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Base directory relative paths in `cases` are resolved against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
1162
/// One case in an eval suite: a persisted run plus optional fixture and
/// comparison run. Paths are resolved via `resolve_manifest_path`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Path to the persisted run record (required).
    pub run_path: String,
    /// Path to a replay fixture; when absent the run's embedded fixture
    /// (or one derived from the run) is used.
    pub fixture_path: Option<String>,
    /// Path to another run to diff against, if any.
    pub compare_to: Option<String>,
}
1171
/// Full record of a workflow run: stages, transitions, checkpoints,
/// artifacts, child runs, and bookkeeping.
///
/// Defaults for identity fields are filled by `normalize_run_record`
/// (type name `"run_record"`, generated id, start timestamp, status
/// `"running"`, `root_run_id` = own id, derived replay fixture).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Serialized as `"_type"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    /// Task description the run was started with.
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Parent run for nested runs, if any.
    pub parent_run_id: Option<String>,
    /// Top of the run tree; defaults to this run's own id.
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Fixture for replay checks; synthesized when absent.
    pub replay_fixture: Option<ReplayFixture>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where this record was persisted, when known.
    pub persisted_path: Option<String>,
}
1201
/// Record of a child/worker run spawned from a parent run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    /// Stage in the parent run that spawned this child, if any.
    pub parent_stage_id: Option<String>,
    /// Mutation-session linkage (see `MutationSessionRecord`).
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_mode: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Child run id and persisted locations, when available.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
1221
/// Execution-environment details for a run (working directory, environment
/// variables, and source-control placement).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    /// Execution adapter name (semantics defined by the runner).
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup mode for the worktree/sandbox, if any.
    pub cleanup: Option<String>,
}
1235
/// Result of `validate_workflow`: hard errors, advisory warnings, and the
/// set of nodes reachable from the entry.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WorkflowValidationReport {
    /// True when `errors` is empty (warnings do not affect validity).
    pub valid: bool,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
    pub reachable_nodes: Vec<String>,
}
1244
1245fn parse_json_payload<T: for<'de> Deserialize<'de>>(
1246    json: serde_json::Value,
1247    label: &str,
1248) -> Result<T, VmError> {
1249    let payload = json.to_string();
1250    let mut deserializer = serde_json::Deserializer::from_str(&payload);
1251    let mut tracker = serde_path_to_error::Track::new();
1252    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1253    T::deserialize(path_deserializer).map_err(|error| {
1254        let snippet = if payload.len() > 600 {
1255            format!("{}...", &payload[..600])
1256        } else {
1257            payload.clone()
1258        };
1259        VmError::Runtime(format!(
1260            "{label} parse error at {}: {} | payload={}",
1261            tracker.path(),
1262            error,
1263            snippet
1264        ))
1265    })
1266}
1267
1268fn parse_json_value<T: for<'de> Deserialize<'de>>(value: &VmValue) -> Result<T, VmError> {
1269    parse_json_payload(vm_value_to_json(value), "orchestration")
1270}
1271
1272pub fn parse_workflow_node_value(value: &VmValue, label: &str) -> Result<WorkflowNode, VmError> {
1273    parse_json_payload(vm_value_to_json(value), label)
1274}
1275
1276pub fn parse_workflow_node_json(
1277    json: serde_json::Value,
1278    label: &str,
1279) -> Result<WorkflowNode, VmError> {
1280    parse_json_payload(json, label)
1281}
1282
1283pub fn parse_workflow_edge_json(
1284    json: serde_json::Value,
1285    label: &str,
1286) -> Result<WorkflowEdge, VmError> {
1287    parse_json_payload(json, label)
1288}
1289
/// Parses a VM value into a `WorkflowGraph` and fills structural defaults.
///
/// Two input shapes are accepted: a full graph (explicit `nodes`/`edges`)
/// and a legacy shorthand where top-level `act`/`verify`/`repair` entries
/// are promoted into nodes with implicit act -> verify, verify -(failed)->
/// repair, repair -(retry)-> verify edges. Afterwards, graph identity
/// fields (type name, id, version, entry) and per-node defaults (id, kind,
/// join/reduce strategies, output-contract kinds, retry attempts) are
/// filled in.
pub fn normalize_workflow_value(value: &VmValue) -> Result<WorkflowGraph, VmError> {
    let mut graph: WorkflowGraph = parse_json_value(value)?;
    let as_dict = value.as_dict().cloned().unwrap_or_default();

    // Legacy shorthand: no explicit nodes, so promote the top-level
    // act/verify/repair entries (in that order) into graph nodes.
    if graph.nodes.is_empty() {
        for key in ["act", "verify", "repair"] {
            if let Some(node_value) = as_dict.get(key) {
                let mut node: WorkflowNode = parse_json_value(node_value)?;
                let raw_node = node_value.as_dict().cloned().unwrap_or_default();
                node.id = Some(key.to_string());
                if node.kind.is_empty() {
                    node.kind = if key == "verify" {
                        "verify".to_string()
                    } else {
                        "stage".to_string()
                    };
                }
                // Model settings missing on the node fall back to the
                // top-level dict values (provider/model/tier/temperature/
                // max_tokens/mode), skipping empty strings.
                if node.model_policy.provider.is_none() {
                    node.model_policy.provider = as_dict
                        .get("provider")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model.is_none() {
                    node.model_policy.model = as_dict
                        .get("model")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.model_tier.is_none() {
                    node.model_policy.model_tier = as_dict
                        .get("model_tier")
                        .or_else(|| as_dict.get("tier"))
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                if node.model_policy.temperature.is_none() {
                    // Accept either a float or an int for temperature.
                    node.model_policy.temperature = as_dict.get("temperature").and_then(|value| {
                        if let VmValue::Float(number) = value {
                            Some(*number)
                        } else {
                            value.as_int().map(|number| number as f64)
                        }
                    });
                }
                if node.model_policy.max_tokens.is_none() {
                    node.model_policy.max_tokens =
                        as_dict.get("max_tokens").and_then(|value| value.as_int());
                }
                if node.mode.is_none() {
                    node.mode = as_dict
                        .get("mode")
                        .map(|value| value.display())
                        .filter(|value| !value.is_empty());
                }
                // Legacy verify shorthand: inline assert_text/command/
                // expect_status/expect_text fields become a verify spec.
                if key == "verify"
                    && node.verify.is_none()
                    && (raw_node.contains_key("assert_text")
                        || raw_node.contains_key("command")
                        || raw_node.contains_key("expect_status")
                        || raw_node.contains_key("expect_text"))
                {
                    node.verify = Some(serde_json::json!({
                        "assert_text": raw_node.get("assert_text").map(vm_value_to_json),
                        "command": raw_node.get("command").map(vm_value_to_json),
                        "expect_status": raw_node.get("expect_status").map(vm_value_to_json),
                        "expect_text": raw_node.get("expect_text").map(vm_value_to_json),
                    }));
                }
                graph.nodes.insert(key.to_string(), node);
            }
        }
        if graph.entry.is_empty() && graph.nodes.contains_key("act") {
            graph.entry = "act".to_string();
        }
        // Wire up the implicit act -> verify -> repair -> verify loop.
        if graph.edges.is_empty() && graph.nodes.contains_key("act") {
            if graph.nodes.contains_key("verify") {
                graph.edges.push(WorkflowEdge {
                    from: "act".to_string(),
                    to: "verify".to_string(),
                    branch: None,
                    label: None,
                });
            }
            if graph.nodes.contains_key("repair") {
                graph.edges.push(WorkflowEdge {
                    from: "verify".to_string(),
                    to: "repair".to_string(),
                    branch: Some("failed".to_string()),
                    label: None,
                });
                graph.edges.push(WorkflowEdge {
                    from: "repair".to_string(),
                    to: "verify".to_string(),
                    branch: Some("retry".to_string()),
                    label: None,
                });
            }
        }
    }

    // Graph-level identity defaults.
    if graph.type_name.is_empty() {
        graph.type_name = "workflow_graph".to_string();
    }
    if graph.id.is_empty() {
        graph.id = new_id("workflow");
    }
    if graph.version == 0 {
        graph.version = 1;
    }
    if graph.entry.is_empty() {
        // Fall back to the first node key (BTreeMap: lexicographically
        // smallest), or "act" for an empty graph.
        graph.entry = graph
            .nodes
            .keys()
            .next()
            .cloned()
            .unwrap_or_else(|| "act".to_string());
    }
    // Per-node defaults.
    for (node_id, node) in &mut graph.nodes {
        if node.id.is_none() {
            node.id = Some(node_id.clone());
        }
        if node.kind.is_empty() {
            node.kind = "stage".to_string();
        }
        if node.join_policy.strategy.is_empty() {
            node.join_policy.strategy = "all".to_string();
        }
        if node.reduce_policy.strategy.is_empty() {
            node.reduce_policy.strategy = "concat".to_string();
        }
        // Default output-contract kinds depend on the node kind.
        if node.output_contract.output_kinds.is_empty() {
            node.output_contract.output_kinds = vec![match node.kind.as_str() {
                "verify" => "verification_result".to_string(),
                "reduce" => node
                    .reduce_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "summary".to_string()),
                "map" => node
                    .map_policy
                    .output_kind
                    .clone()
                    .unwrap_or_else(|| "artifact".to_string()),
                "escalation" => "plan".to_string(),
                _ => "artifact".to_string(),
            }];
        }
        if node.retry_policy.max_attempts == 0 {
            node.retry_policy.max_attempts = 1;
        }
    }
    Ok(graph)
}
1444
/// Validates a workflow graph's structure and (optionally) its capability
/// policies against a ceiling.
///
/// Hard errors: missing entry node, dangling edge endpoints, inconsistent
/// input contracts, missing branches on condition/fork/map nodes, and
/// capability policies that exceed `ceiling`. Warnings (do not invalidate):
/// unreachable nodes, joins with fewer than two inputs, and reduce nodes
/// without input kinds.
pub fn validate_workflow(
    graph: &WorkflowGraph,
    ceiling: Option<&CapabilityPolicy>,
) -> WorkflowValidationReport {
    let mut errors = Vec::new();
    let mut warnings = Vec::new();

    if !graph.nodes.contains_key(&graph.entry) {
        errors.push(format!("entry node does not exist: {}", graph.entry));
    }

    // Every edge endpoint must reference an existing node.
    let node_ids: BTreeSet<String> = graph.nodes.keys().cloned().collect();
    for edge in &graph.edges {
        if !node_ids.contains(&edge.from) {
            errors.push(format!("edge.from references unknown node: {}", edge.from));
        }
        if !node_ids.contains(&edge.to) {
            errors.push(format!("edge.to references unknown node: {}", edge.to));
        }
    }

    // Unreachable nodes are only a warning: the graph still executes.
    let reachable_nodes = reachable_nodes(graph);
    for node_id in &node_ids {
        if !reachable_nodes.contains(node_id) {
            warnings.push(format!("node is unreachable: {node_id}"));
        }
    }

    // Per-node structural rules, driven by the node kind.
    for (node_id, node) in &graph.nodes {
        let incoming = graph
            .edges
            .iter()
            .filter(|edge| edge.to == *node_id)
            .count();
        let outgoing: Vec<&WorkflowEdge> = graph
            .edges
            .iter()
            .filter(|edge| edge.from == *node_id)
            .collect();
        if let Some(min_inputs) = node.input_contract.min_inputs {
            if let Some(max_inputs) = node.input_contract.max_inputs {
                if min_inputs > max_inputs {
                    errors.push(format!(
                        "node {node_id}: input contract min_inputs exceeds max_inputs"
                    ));
                }
            }
        }
        match node.kind.as_str() {
            "condition" => {
                // Condition nodes must be able to dispatch both outcomes.
                let has_true = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("true"));
                let has_false = outgoing
                    .iter()
                    .any(|edge| edge.branch.as_deref() == Some("false"));
                if !has_true || !has_false {
                    errors.push(format!(
                        "node {node_id}: condition nodes require both 'true' and 'false' branch edges"
                    ));
                }
            }
            "fork" => {
                if outgoing.len() < 2 {
                    errors.push(format!(
                        "node {node_id}: fork nodes require at least two outgoing edges"
                    ));
                }
            }
            "join" => {
                // A single-input join is suspicious but not fatal.
                if incoming < 2 {
                    warnings.push(format!(
                        "node {node_id}: join node has fewer than two incoming edges"
                    ));
                }
            }
            "map" => {
                // Map nodes need some source of items to iterate over.
                if node.map_policy.items.is_empty()
                    && node.map_policy.item_artifact_kind.is_none()
                    && node.input_contract.input_kinds.is_empty()
                {
                    errors.push(format!(
                        "node {node_id}: map nodes require items, item_artifact_kind, or input_contract.input_kinds"
                    ));
                }
            }
            "reduce" => {
                if node.input_contract.input_kinds.is_empty() {
                    warnings.push(format!(
                        "node {node_id}: reduce node has no input_contract.input_kinds; it will consume all available artifacts"
                    ));
                }
            }
            _ => {}
        }
    }

    // Both the graph policy and every node policy must fit under the
    // caller-supplied capability ceiling.
    if let Some(ceiling) = ceiling {
        if let Err(error) = ceiling.intersect(&graph.capability_policy) {
            errors.push(error);
        }
        for (node_id, node) in &graph.nodes {
            if let Err(error) = ceiling.intersect(&node.capability_policy) {
                errors.push(format!("node {node_id}: {error}"));
            }
        }
    }

    WorkflowValidationReport {
        valid: errors.is_empty(),
        errors,
        warnings,
        reachable_nodes: reachable_nodes.into_iter().collect(),
    }
}
1560
1561fn reachable_nodes(graph: &WorkflowGraph) -> BTreeSet<String> {
1562    let mut seen = BTreeSet::new();
1563    let mut stack = vec![graph.entry.clone()];
1564    while let Some(node_id) = stack.pop() {
1565        if !seen.insert(node_id.clone()) {
1566            continue;
1567        }
1568        for edge in graph.edges.iter().filter(|edge| edge.from == node_id) {
1569            stack.push(edge.to.clone());
1570        }
1571    }
1572    seen
1573}
1574
/// Filters, orders, and budget-limits artifacts according to `policy`.
///
/// Artifacts are filtered by include/exclude kind and stage, sorted so the
/// most desirable come first, then greedily packed under the policy's
/// artifact-count and token budgets.
pub fn select_artifacts(
    mut artifacts: Vec<ArtifactRecord>,
    policy: &ContextPolicy,
) -> Vec<ArtifactRecord> {
    // Keep only artifacts allowed by the kind and stage filters; an empty
    // include list means "include everything of that dimension".
    artifacts.retain(|artifact| {
        (policy.include_kinds.is_empty() || policy.include_kinds.contains(&artifact.kind))
            && !policy.exclude_kinds.contains(&artifact.kind)
            && (policy.include_stages.is_empty()
                || artifact
                    .stage
                    .as_ref()
                    .is_some_and(|stage| policy.include_stages.contains(stage)))
    });
    // Order best-first. Comparisons are written b-vs-a so "more desirable"
    // sorts earlier: pinned ids, prioritized kinds, higher priority,
    // fresher (optional), more recent (optional), higher relevance, and
    // finally smaller estimated token cost as the tie-breaker.
    artifacts.sort_by(|a, b| {
        let b_pinned = policy.pinned_ids.contains(&b.id);
        let a_pinned = policy.pinned_ids.contains(&a.id);
        b_pinned
            .cmp(&a_pinned)
            .then_with(|| {
                let b_prio_kind = policy.prioritize_kinds.contains(&b.kind);
                let a_prio_kind = policy.prioritize_kinds.contains(&a.kind);
                b_prio_kind.cmp(&a_prio_kind)
            })
            .then_with(|| {
                b.priority
                    .unwrap_or_default()
                    .cmp(&a.priority.unwrap_or_default())
            })
            .then_with(|| {
                if policy.prefer_fresh {
                    freshness_rank(b.freshness.as_deref())
                        .cmp(&freshness_rank(a.freshness.as_deref()))
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                if policy.prefer_recent {
                    // created_at strings compare lexicographically; see
                    // now_rfc3339 for the format actually written.
                    b.created_at.cmp(&a.created_at)
                } else {
                    std::cmp::Ordering::Equal
                }
            })
            .then_with(|| {
                // Floats have no total order; treat incomparable (NaN/None
                // mixes) as equal.
                b.relevance
                    .partial_cmp(&a.relevance)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .then_with(|| {
                a.estimated_tokens
                    .unwrap_or(usize::MAX)
                    .cmp(&b.estimated_tokens.unwrap_or(usize::MAX))
            })
    });

    // Greedy selection: stop outright at max_artifacts, but merely skip
    // (not stop at) artifacts that would blow the token budget, so smaller
    // artifacts later in the order can still fit.
    let mut selected = Vec::new();
    let mut used_tokens = 0usize;
    let reserve_tokens = policy.reserve_tokens.unwrap_or(0);
    let effective_max_tokens = policy
        .max_tokens
        .map(|max| max.saturating_sub(reserve_tokens));
    for artifact in artifacts {
        if let Some(max_artifacts) = policy.max_artifacts {
            if selected.len() >= max_artifacts {
                break;
            }
        }
        let next_tokens = artifact.estimated_tokens.unwrap_or(0);
        if let Some(max_tokens) = effective_max_tokens {
            if used_tokens + next_tokens > max_tokens {
                continue;
            }
        }
        used_tokens += next_tokens;
        selected.push(artifact);
    }
    selected
}
1653
1654pub fn render_artifacts_context(artifacts: &[ArtifactRecord], policy: &ContextPolicy) -> String {
1655    let mut parts = Vec::new();
1656    for artifact in artifacts {
1657        let title = artifact
1658            .title
1659            .clone()
1660            .unwrap_or_else(|| format!("{} {}", artifact.kind, artifact.id));
1661        let body = artifact
1662            .text
1663            .clone()
1664            .or_else(|| artifact.data.as_ref().map(|v| v.to_string()))
1665            .unwrap_or_default();
1666        match policy.render.as_deref() {
1667            Some("json") => {
1668                parts.push(
1669                    serde_json::json!({
1670                        "id": artifact.id,
1671                        "kind": artifact.kind,
1672                        "title": title,
1673                        "source": artifact.source,
1674                        "freshness": artifact.freshness,
1675                        "priority": artifact.priority,
1676                        "text": body,
1677                    })
1678                    .to_string(),
1679                );
1680            }
1681            _ => parts.push(format!(
1682                "[{title}] kind={} source={} freshness={} priority={}\n{}",
1683                artifact.kind,
1684                artifact
1685                    .source
1686                    .clone()
1687                    .unwrap_or_else(|| "unknown".to_string()),
1688                artifact
1689                    .freshness
1690                    .clone()
1691                    .unwrap_or_else(|| "normal".to_string()),
1692                artifact.priority.unwrap_or_default(),
1693                body
1694            )),
1695        }
1696    }
1697    parts.join("\n\n")
1698}
1699
1700pub fn normalize_artifact(value: &VmValue) -> Result<ArtifactRecord, VmError> {
1701    let artifact: ArtifactRecord = parse_json_value(value)?;
1702    Ok(artifact.normalize())
1703}
1704
1705pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
1706    let json = vm_value_to_json(value);
1707    let payload = json.to_string();
1708    let mut deserializer = serde_json::Deserializer::from_str(&payload);
1709    let mut tracker = serde_path_to_error::Track::new();
1710    let path_deserializer = serde_path_to_error::Deserializer::new(&mut deserializer, &mut tracker);
1711    let mut run: RunRecord = RunRecord::deserialize(path_deserializer).map_err(|error| {
1712        let snippet = if payload.len() > 600 {
1713            format!("{}...", &payload[..600])
1714        } else {
1715            payload.clone()
1716        };
1717        VmError::Runtime(format!(
1718            "orchestration parse error at {}: {} | payload={}",
1719            tracker.path(),
1720            error,
1721            snippet
1722        ))
1723    })?;
1724    if run.type_name.is_empty() {
1725        run.type_name = "run_record".to_string();
1726    }
1727    if run.id.is_empty() {
1728        run.id = new_id("run");
1729    }
1730    if run.started_at.is_empty() {
1731        run.started_at = now_rfc3339();
1732    }
1733    if run.status.is_empty() {
1734        run.status = "running".to_string();
1735    }
1736    if run.root_run_id.is_none() {
1737        run.root_run_id = Some(run.id.clone());
1738    }
1739    if run.replay_fixture.is_none() {
1740        run.replay_fixture = Some(replay_fixture_from_run(&run));
1741    }
1742    Ok(run)
1743}
1744
1745pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
1746    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
1747    if manifest.type_name.is_empty() {
1748        manifest.type_name = "eval_suite_manifest".to_string();
1749    }
1750    if manifest.id.is_empty() {
1751        manifest.id = new_id("eval_suite");
1752    }
1753    Ok(manifest)
1754}
1755
1756fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
1757    let content = std::fs::read_to_string(path)
1758        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
1759    serde_json::from_str(&content)
1760        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
1761}
1762
/// Resolve a manifest-relative path: absolute paths pass through unchanged,
/// relative paths are joined onto `base_dir` when one is provided.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if !candidate.is_absolute() => base.join(candidate),
        _ => candidate,
    }
}
1773
1774pub fn evaluate_run_suite_manifest(
1775    manifest: &EvalSuiteManifest,
1776) -> Result<ReplayEvalSuiteReport, VmError> {
1777    let base_dir = manifest.base_dir.as_deref().map(Path::new);
1778    let mut reports = Vec::new();
1779    for case in &manifest.cases {
1780        let run_path = resolve_manifest_path(base_dir, &case.run_path);
1781        let run = load_run_record(&run_path)?;
1782        let fixture = match &case.fixture_path {
1783            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
1784            None => run
1785                .replay_fixture
1786                .clone()
1787                .unwrap_or_else(|| replay_fixture_from_run(&run)),
1788        };
1789        let eval = evaluate_run_against_fixture(&run, &fixture);
1790        let mut pass = eval.pass;
1791        let mut failures = eval.failures;
1792        let comparison = match &case.compare_to {
1793            Some(path) => {
1794                let baseline_path = resolve_manifest_path(base_dir, path);
1795                let baseline = load_run_record(&baseline_path)?;
1796                let diff = diff_run_records(&baseline, &run);
1797                if !diff.identical {
1798                    pass = false;
1799                    failures.push(format!(
1800                        "run differs from baseline {} with {} stage changes",
1801                        baseline_path.display(),
1802                        diff.stage_diffs.len()
1803                    ));
1804                }
1805                Some(diff)
1806            }
1807            None => None,
1808        };
1809        reports.push(ReplayEvalCaseReport {
1810            run_id: run.id.clone(),
1811            workflow_id: run.workflow_id.clone(),
1812            label: case.label.clone(),
1813            pass,
1814            failures,
1815            stage_count: eval.stage_count,
1816            source_path: Some(run_path.display().to_string()),
1817            comparison,
1818        });
1819    }
1820    let total = reports.len();
1821    let passed = reports.iter().filter(|report| report.pass).count();
1822    let failed = total.saturating_sub(passed);
1823    Ok(ReplayEvalSuiteReport {
1824        pass: failed == 0,
1825        total,
1826        passed,
1827        failed,
1828        cases: reports,
1829    })
1830}
1831
/// Render a minimal unified-style diff (header plus ` `/`-`/`+` lines, no
/// `@@` hunk markers) between two texts using a longest-common-subsequence
/// walk. `path` labels the header; it defaults to "artifact".
pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
    let old: Vec<&str> = before.lines().collect();
    let new: Vec<&str> = after.lines().collect();
    // lcs[i][j] = LCS length of old[i..] vs new[j..], filled bottom-up.
    let mut lcs = vec![vec![0usize; new.len() + 1]; old.len() + 1];
    for i in (0..old.len()).rev() {
        for j in (0..new.len()).rev() {
            lcs[i][j] = if old[i] == new[j] {
                lcs[i + 1][j + 1] + 1
            } else {
                lcs[i + 1][j].max(lcs[i][j + 1])
            };
        }
    }

    let label = path.unwrap_or("artifact");
    let mut out = format!("--- a/{label}\n+++ b/{label}\n");
    let (mut i, mut j) = (0usize, 0usize);
    // Walk the table; ties between a deletion and an addition prefer the
    // deletion, matching classic diff ordering.
    loop {
        if i < old.len() && j < new.len() {
            if old[i] == new[j] {
                out.push(' ');
                out.push_str(old[i]);
                out.push('\n');
                i += 1;
                j += 1;
            } else if lcs[i + 1][j] >= lcs[i][j + 1] {
                out.push('-');
                out.push_str(old[i]);
                out.push('\n');
                i += 1;
            } else {
                out.push('+');
                out.push_str(new[j]);
                out.push('\n');
                j += 1;
            }
        } else if i < old.len() {
            out.push('-');
            out.push_str(old[i]);
            out.push('\n');
            i += 1;
        } else if j < new.len() {
            out.push('+');
            out.push_str(new[j]);
            out.push('\n');
            j += 1;
        } else {
            break;
        }
    }
    out
}
1874
1875pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
1876    let path = path
1877        .map(PathBuf::from)
1878        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
1879    if let Some(parent) = path.parent() {
1880        std::fs::create_dir_all(parent)
1881            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
1882    }
1883    let json = serde_json::to_string_pretty(run)
1884        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
1885    std::fs::write(&path, json)
1886        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
1887    Ok(path.to_string_lossy().to_string())
1888}
1889
1890pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
1891    let content = std::fs::read_to_string(path)
1892        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
1893    serde_json::from_str(&content)
1894        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
1895}
1896
1897pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
1898    ReplayFixture {
1899        type_name: "replay_fixture".to_string(),
1900        id: new_id("fixture"),
1901        source_run_id: run.id.clone(),
1902        workflow_id: run.workflow_id.clone(),
1903        workflow_name: run.workflow_name.clone(),
1904        created_at: now_rfc3339(),
1905        expected_status: run.status.clone(),
1906        stage_assertions: run
1907            .stages
1908            .iter()
1909            .map(|stage| ReplayStageAssertion {
1910                node_id: stage.node_id.clone(),
1911                expected_status: stage.status.clone(),
1912                expected_outcome: stage.outcome.clone(),
1913                expected_branch: stage.branch.clone(),
1914                required_artifact_kinds: stage
1915                    .artifacts
1916                    .iter()
1917                    .map(|artifact| artifact.kind.clone())
1918                    .collect(),
1919                visible_text_contains: stage
1920                    .visible_text
1921                    .as_ref()
1922                    .filter(|text| !text.is_empty())
1923                    .map(|text| text.chars().take(80).collect()),
1924            })
1925            .collect(),
1926    }
1927}
1928
1929pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
1930    let mut failures = Vec::new();
1931    if run.status != fixture.expected_status {
1932        failures.push(format!(
1933            "run status mismatch: expected {}, got {}",
1934            fixture.expected_status, run.status
1935        ));
1936    }
1937    for assertion in &fixture.stage_assertions {
1938        let Some(stage) = run
1939            .stages
1940            .iter()
1941            .find(|stage| stage.node_id == assertion.node_id)
1942        else {
1943            failures.push(format!("missing stage {}", assertion.node_id));
1944            continue;
1945        };
1946        if stage.status != assertion.expected_status {
1947            failures.push(format!(
1948                "stage {} status mismatch: expected {}, got {}",
1949                assertion.node_id, assertion.expected_status, stage.status
1950            ));
1951        }
1952        if stage.outcome != assertion.expected_outcome {
1953            failures.push(format!(
1954                "stage {} outcome mismatch: expected {}, got {}",
1955                assertion.node_id, assertion.expected_outcome, stage.outcome
1956            ));
1957        }
1958        if stage.branch != assertion.expected_branch {
1959            failures.push(format!(
1960                "stage {} branch mismatch: expected {:?}, got {:?}",
1961                assertion.node_id, assertion.expected_branch, stage.branch
1962            ));
1963        }
1964        for required_kind in &assertion.required_artifact_kinds {
1965            if !stage
1966                .artifacts
1967                .iter()
1968                .any(|artifact| &artifact.kind == required_kind)
1969            {
1970                failures.push(format!(
1971                    "stage {} missing artifact kind {}",
1972                    assertion.node_id, required_kind
1973                ));
1974            }
1975        }
1976        if let Some(snippet) = &assertion.visible_text_contains {
1977            let actual = stage.visible_text.clone().unwrap_or_default();
1978            if !actual.contains(snippet) {
1979                failures.push(format!(
1980                    "stage {} visible text does not contain expected snippet {:?}",
1981                    assertion.node_id, snippet
1982                ));
1983            }
1984        }
1985    }
1986
1987    ReplayEvalReport {
1988        pass: failures.is_empty(),
1989        failures,
1990        stage_count: run.stages.len(),
1991    }
1992}
1993
1994pub fn evaluate_run_suite(
1995    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
1996) -> ReplayEvalSuiteReport {
1997    let mut reports = Vec::new();
1998    for (run, fixture, source_path) in cases {
1999        let report = evaluate_run_against_fixture(&run, &fixture);
2000        reports.push(ReplayEvalCaseReport {
2001            run_id: run.id.clone(),
2002            workflow_id: run.workflow_id.clone(),
2003            label: None,
2004            pass: report.pass,
2005            failures: report.failures,
2006            stage_count: report.stage_count,
2007            source_path,
2008            comparison: None,
2009        });
2010    }
2011    let total = reports.len();
2012    let passed = reports.iter().filter(|report| report.pass).count();
2013    let failed = total.saturating_sub(passed);
2014    ReplayEvalSuiteReport {
2015        pass: failed == 0,
2016        total,
2017        passed,
2018        failed,
2019        cases: reports,
2020    }
2021}
2022
2023pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
2024    let mut stage_diffs = Vec::new();
2025    let mut all_node_ids = BTreeSet::new();
2026    all_node_ids.extend(left.stages.iter().map(|stage| stage.node_id.clone()));
2027    all_node_ids.extend(right.stages.iter().map(|stage| stage.node_id.clone()));
2028
2029    for node_id in all_node_ids {
2030        let left_stage = left.stages.iter().find(|stage| stage.node_id == node_id);
2031        let right_stage = right.stages.iter().find(|stage| stage.node_id == node_id);
2032        match (left_stage, right_stage) {
2033            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
2034                node_id,
2035                change: "removed".to_string(),
2036                details: vec!["stage missing from right run".to_string()],
2037            }),
2038            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
2039                node_id,
2040                change: "added".to_string(),
2041                details: vec!["stage missing from left run".to_string()],
2042            }),
2043            (Some(left_stage), Some(right_stage)) => {
2044                let mut details = Vec::new();
2045                if left_stage.status != right_stage.status {
2046                    details.push(format!(
2047                        "status: {} -> {}",
2048                        left_stage.status, right_stage.status
2049                    ));
2050                }
2051                if left_stage.outcome != right_stage.outcome {
2052                    details.push(format!(
2053                        "outcome: {} -> {}",
2054                        left_stage.outcome, right_stage.outcome
2055                    ));
2056                }
2057                if left_stage.branch != right_stage.branch {
2058                    details.push(format!(
2059                        "branch: {:?} -> {:?}",
2060                        left_stage.branch, right_stage.branch
2061                    ));
2062                }
2063                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
2064                {
2065                    details.push(format!(
2066                        "produced_artifacts: {} -> {}",
2067                        left_stage.produced_artifact_ids.len(),
2068                        right_stage.produced_artifact_ids.len()
2069                    ));
2070                }
2071                if left_stage.artifacts.len() != right_stage.artifacts.len() {
2072                    details.push(format!(
2073                        "artifact_records: {} -> {}",
2074                        left_stage.artifacts.len(),
2075                        right_stage.artifacts.len()
2076                    ));
2077                }
2078                if !details.is_empty() {
2079                    stage_diffs.push(RunStageDiffRecord {
2080                        node_id,
2081                        change: "changed".to_string(),
2082                        details,
2083                    });
2084                }
2085            }
2086            (None, None) => {}
2087        }
2088    }
2089
2090    let status_changed = left.status != right.status;
2091    let identical = !status_changed
2092        && stage_diffs.is_empty()
2093        && left.transitions.len() == right.transitions.len()
2094        && left.artifacts.len() == right.artifacts.len()
2095        && left.checkpoints.len() == right.checkpoints.len();
2096
2097    RunDiffReport {
2098        left_run_id: left.id.clone(),
2099        right_run_id: right.id.clone(),
2100        identical,
2101        status_changed,
2102        left_status: left.status.clone(),
2103        right_status: right.status.clone(),
2104        stage_diffs,
2105        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
2106        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
2107        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
2108    }
2109}
2110
2111pub fn push_execution_policy(policy: CapabilityPolicy) {
2112    EXECUTION_POLICY_STACK.with(|stack| stack.borrow_mut().push(policy));
2113}
2114
2115pub fn pop_execution_policy() {
2116    EXECUTION_POLICY_STACK.with(|stack| {
2117        stack.borrow_mut().pop();
2118    });
2119}
2120
2121pub fn current_execution_policy() -> Option<CapabilityPolicy> {
2122    EXECUTION_POLICY_STACK.with(|stack| stack.borrow().last().cloned())
2123}
2124
2125fn policy_allows_tool(policy: &CapabilityPolicy, tool: &str) -> bool {
2126    policy.tools.is_empty() || policy.tools.iter().any(|allowed| allowed == tool)
2127}
2128
2129fn policy_allows_capability(policy: &CapabilityPolicy, capability: &str, op: &str) -> bool {
2130    policy.capabilities.is_empty()
2131        || policy
2132            .capabilities
2133            .get(capability)
2134            .is_some_and(|ops| ops.is_empty() || ops.iter().any(|allowed| allowed == op))
2135}
2136
2137fn policy_allows_side_effect(policy: &CapabilityPolicy, requested: &str) -> bool {
2138    fn rank(v: &str) -> usize {
2139        match v {
2140            "none" => 0,
2141            "read_only" => 1,
2142            "workspace_write" => 2,
2143            "process_exec" => 3,
2144            "network" => 4,
2145            _ => 5,
2146        }
2147    }
2148    policy
2149        .side_effect_level
2150        .as_ref()
2151        .map(|allowed| rank(allowed) >= rank(requested))
2152        .unwrap_or(true)
2153}
2154
2155fn reject_policy(reason: String) -> Result<(), VmError> {
2156    Err(VmError::CategorizedError {
2157        message: reason,
2158        category: crate::value::ErrorCategory::ToolRejected,
2159    })
2160}
2161
/// Gate a builtin call against the innermost execution policy on this thread.
///
/// With no active policy everything is allowed. Otherwise each known builtin
/// is mapped onto the tool / capability / side-effect ceilings it requires
/// and rejected (via `reject_policy`, i.e. a `ToolRejected` error) when any
/// check fails. Builtins not matched below pass through unchecked.
pub fn enforce_current_policy_for_builtin(name: &str, args: &[VmValue]) -> Result<(), VmError> {
    let Some(policy) = current_execution_policy() else {
        return Ok(());
    };
    match name {
        // Read-only file access: needs the tool itself plus workspace.read_text.
        "read" | "read_file" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "read_text")
            {
                return reject_policy(format!(
                    "builtin '{name}' exceeds workspace.read_text ceiling"
                ));
            }
        }
        // Directory enumeration: tool plus workspace.list.
        "search" | "list_dir" => {
            if !policy_allows_tool(&policy, name)
                || !policy_allows_capability(&policy, "workspace", "list")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace.list ceiling"));
            }
        }
        // Existence/metadata probes: capability-only check, no tool gate.
        "file_exists" | "stat" => {
            if !policy_allows_capability(&policy, "workspace", "exists") {
                return reject_policy(format!("builtin '{name}' exceeds workspace.exists ceiling"));
            }
        }
        // Mutating file operations are all gated behind the 'edit' tool,
        // workspace.write_text, and the workspace_write side-effect level.
        "edit" | "write_file" | "append_file" | "mkdir" | "copy_file" => {
            if !policy_allows_tool(&policy, "edit")
                || !policy_allows_capability(&policy, "workspace", "write_text")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(format!("builtin '{name}' exceeds workspace write ceiling"));
            }
        }
        "delete_file" => {
            if !policy_allows_capability(&policy, "workspace", "delete")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'delete_file' exceeds workspace.delete ceiling".to_string(),
                );
            }
        }
        "apply_edit" => {
            if !policy_allows_capability(&policy, "workspace", "apply_edit")
                || !policy_allows_side_effect(&policy, "workspace_write")
            {
                return reject_policy(
                    "builtin 'apply_edit' exceeds workspace.apply_edit ceiling".to_string(),
                );
            }
        }
        // Process execution: 'run' tool, process.exec capability, and the
        // process_exec side-effect level.
        "exec" | "exec_at" | "shell" | "shell_at" | "run_command" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // HTTP builtins only check the network side-effect ceiling.
        "http_get" | "http_post" | "http_put" | "http_patch" | "http_delete" | "http_request" => {
            if !policy_allows_side_effect(&policy, "network") {
                return reject_policy(format!("builtin '{name}' exceeds network ceiling"));
            }
        }
        // MCP builtins are treated like process execution (they spawn/talk to
        // an external server process).
        "mcp_connect"
        | "mcp_call"
        | "mcp_list_tools"
        | "mcp_list_resources"
        | "mcp_list_resource_templates"
        | "mcp_read_resource"
        | "mcp_list_prompts"
        | "mcp_get_prompt"
        | "mcp_server_info"
        | "mcp_disconnect" => {
            if !policy_allows_tool(&policy, "run")
                || !policy_allows_capability(&policy, "process", "exec")
                || !policy_allows_side_effect(&policy, "process_exec")
            {
                return reject_policy(format!("builtin '{name}' exceeds process.exec ceiling"));
            }
        }
        // host_invoke: the capability and op come from the first two call
        // arguments; the side-effect level is inferred from that pair, with
        // anything unrecognized treated as read_only.
        "host_invoke" => {
            let capability = args.first().map(|v| v.display()).unwrap_or_default();
            let op = args.get(1).map(|v| v.display()).unwrap_or_default();
            if !policy_allows_capability(&policy, &capability, &op) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds capability ceiling"
                ));
            }
            let requested_side_effect = match (capability.as_str(), op.as_str()) {
                ("workspace", "write_text" | "apply_edit" | "delete") => "workspace_write",
                ("process", "exec") => "process_exec",
                _ => "read_only",
            };
            if !policy_allows_side_effect(&policy, requested_side_effect) {
                return reject_policy(format!(
                    "host_invoke {capability}.{op} exceeds side-effect ceiling"
                ));
            }
        }
        _ => {}
    }
    Ok(())
}
2267
2268pub fn enforce_current_policy_for_bridge_builtin(name: &str) -> Result<(), VmError> {
2269    if current_execution_policy().is_some() {
2270        return reject_policy(format!(
2271            "bridged builtin '{name}' exceeds execution policy; declare an explicit capability/tool surface instead"
2272        ));
2273    }
2274    Ok(())
2275}
2276
2277pub fn enforce_current_policy_for_tool(tool_name: &str) -> Result<(), VmError> {
2278    let Some(policy) = current_execution_policy() else {
2279        return Ok(());
2280    };
2281    if !policy_allows_tool(&policy, tool_name) {
2282        return reject_policy(format!("tool '{tool_name}' exceeds tool ceiling"));
2283    }
2284    Ok(())
2285}
2286
2287fn compact_transcript(transcript: &VmValue, keep_last: usize) -> Option<VmValue> {
2288    let dict = transcript.as_dict()?;
2289    let messages = match dict.get("messages") {
2290        Some(VmValue::List(list)) => list.iter().cloned().collect::<Vec<_>>(),
2291        _ => Vec::new(),
2292    };
2293    let retained = messages
2294        .into_iter()
2295        .rev()
2296        .take(keep_last)
2297        .collect::<Vec<_>>()
2298        .into_iter()
2299        .rev()
2300        .collect::<Vec<_>>();
2301    let mut compacted = dict.clone();
2302    compacted.insert(
2303        "messages".to_string(),
2304        VmValue::List(Rc::new(retained.clone())),
2305    );
2306    compacted.insert(
2307        "events".to_string(),
2308        VmValue::List(Rc::new(
2309            crate::llm::helpers::transcript_events_from_messages(&retained),
2310        )),
2311    );
2312    Some(VmValue::Dict(Rc::new(compacted)))
2313}
2314
2315fn redact_transcript_visibility(transcript: &VmValue, visibility: Option<&str>) -> Option<VmValue> {
2316    let Some(visibility) = visibility else {
2317        return Some(transcript.clone());
2318    };
2319    if visibility != "public" && visibility != "public_only" {
2320        return Some(transcript.clone());
2321    }
2322    let dict = transcript.as_dict()?;
2323    let public_messages = match dict.get("messages") {
2324        Some(VmValue::List(list)) => list
2325            .iter()
2326            .filter(|message| {
2327                message
2328                    .as_dict()
2329                    .and_then(|d| d.get("role"))
2330                    .map(|v| v.display())
2331                    .map(|role| role != "tool_result")
2332                    .unwrap_or(true)
2333            })
2334            .cloned()
2335            .collect::<Vec<_>>(),
2336        _ => Vec::new(),
2337    };
2338    let public_events = match dict.get("events") {
2339        Some(VmValue::List(list)) => list
2340            .iter()
2341            .filter(|event| {
2342                event
2343                    .as_dict()
2344                    .and_then(|d| d.get("visibility"))
2345                    .map(|v| v.display())
2346                    .map(|value| value == "public")
2347                    .unwrap_or(true)
2348            })
2349            .cloned()
2350            .collect::<Vec<_>>(),
2351        _ => Vec::new(),
2352    };
2353    let mut redacted = dict.clone();
2354    redacted.insert(
2355        "messages".to_string(),
2356        VmValue::List(Rc::new(public_messages)),
2357    );
2358    redacted.insert("events".to_string(), VmValue::List(Rc::new(public_events)));
2359    Some(VmValue::Dict(Rc::new(redacted)))
2360}
2361
2362pub(crate) fn apply_input_transcript_policy(
2363    transcript: Option<VmValue>,
2364    policy: &TranscriptPolicy,
2365) -> Option<VmValue> {
2366    let mut transcript = transcript;
2367    match policy.mode.as_deref() {
2368        Some("reset") => return None,
2369        Some("fork") => {
2370            if let Some(VmValue::Dict(dict)) = transcript.as_ref() {
2371                let mut forked = dict.as_ref().clone();
2372                forked.insert(
2373                    "id".to_string(),
2374                    VmValue::String(Rc::from(new_id("transcript"))),
2375                );
2376                transcript = Some(VmValue::Dict(Rc::new(forked)));
2377            }
2378        }
2379        _ => {}
2380    }
2381    if policy.compact {
2382        let keep_last = policy.keep_last.unwrap_or(6);
2383        transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2384    }
2385    transcript
2386}
2387
2388fn apply_output_transcript_policy(
2389    transcript: Option<VmValue>,
2390    policy: &TranscriptPolicy,
2391) -> Option<VmValue> {
2392    let mut transcript = transcript;
2393    if policy.compact {
2394        let keep_last = policy.keep_last.unwrap_or(6);
2395        transcript = transcript.and_then(|value| compact_transcript(&value, keep_last));
2396    }
2397    transcript.and_then(|value| redact_transcript_visibility(&value, policy.visibility.as_deref()))
2398}
2399
/// Execute a single workflow stage: select context artifacts, validate the
/// node's input contract, build the LLM prompt/options from the node's
/// policies, run either a plain LLM call or an agent loop, and package the
/// response as a new artifact.
///
/// Returns the raw LLM/agent JSON result, the produced artifacts (currently
/// exactly one), and the post-processed transcript, if any.
pub async fn execute_stage_node(
    node_id: &str,
    node: &WorkflowNode,
    task: &str,
    artifacts: &[ArtifactRecord],
    transcript: Option<VmValue>,
) -> Result<(serde_json::Value, Vec<ArtifactRecord>, Option<VmValue>), VmError> {
    // When the context policy names no artifact kinds, fall back to the kinds
    // declared by the node's input contract.
    let mut selection_policy = node.context_policy.clone();
    if selection_policy.include_kinds.is_empty() && !node.input_contract.input_kinds.is_empty() {
        selection_policy.include_kinds = node.input_contract.input_kinds.clone();
    }
    let selected = select_artifacts_adaptive(artifacts.to_vec(), &selection_policy);
    let rendered_context = render_artifacts_context(&selected, &node.context_policy);
    let transcript = apply_input_transcript_policy(transcript, &node.transcript_policy);
    // Input-contract validation: transcript presence plus min/max artifact counts.
    if node.input_contract.require_transcript && transcript.is_none() {
        return Err(VmError::Runtime(format!(
            "workflow stage {node_id} requires transcript input"
        )));
    }
    if let Some(min_inputs) = node.input_contract.min_inputs {
        if selected.len() < min_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} requires at least {min_inputs} input artifacts"
            )));
        }
    }
    if let Some(max_inputs) = node.input_contract.max_inputs {
        if selected.len() > max_inputs {
            return Err(VmError::Runtime(format!(
                "workflow stage {node_id} accepts at most {max_inputs} input artifacts"
            )));
        }
    }
    // Prefix the task with rendered artifact context (labelled via the node's
    // task_label, default "Task") unless no context was selected.
    let prompt = if rendered_context.is_empty() {
        task.to_string()
    } else {
        format!(
            "{rendered_context}\n\n{}:\n{task}",
            node.task_label
                .clone()
                .unwrap_or_else(|| "Task".to_string())
        )
    };

    // Translate the node's model policy into LLM call options; only fields
    // the policy actually sets are forwarded.
    let mut options = BTreeMap::new();
    if let Some(provider) = &node.model_policy.provider {
        options.insert(
            "provider".to_string(),
            VmValue::String(Rc::from(provider.clone())),
        );
    }
    if let Some(model) = &node.model_policy.model {
        options.insert(
            "model".to_string(),
            VmValue::String(Rc::from(model.clone())),
        );
    }
    if let Some(model_tier) = &node.model_policy.model_tier {
        options.insert(
            "model_tier".to_string(),
            VmValue::String(Rc::from(model_tier.clone())),
        );
    }
    if let Some(temperature) = node.model_policy.temperature {
        options.insert("temperature".to_string(), VmValue::Float(temperature));
    }
    if let Some(max_tokens) = node.model_policy.max_tokens {
        options.insert("max_tokens".to_string(), VmValue::Int(max_tokens));
    }
    // Tools are only forwarded when the node declares a non-null tool spec
    // that resolves to at least one tool name.
    let tool_names = workflow_tool_names(&node.tools);
    if !matches!(node.tools, serde_json::Value::Null) && !tool_names.is_empty() {
        options.insert(
            "tools".to_string(),
            crate::stdlib::json_to_vm_value(&node.tools),
        );
    }
    if let Some(transcript) = transcript.clone() {
        options.insert("transcript".to_string(), transcript);
    }

    // Assemble positional args in the shape extract_llm_options expects:
    // (prompt, optional system prompt, options dict).
    let args = vec![
        VmValue::String(Rc::from(prompt)),
        node.system
            .clone()
            .map(|s| VmValue::String(Rc::from(s)))
            .unwrap_or(VmValue::Nil),
        VmValue::Dict(Rc::new(options)),
    ];
    let mut opts = extract_llm_options(&args)?;

    // Agent mode (explicit, or implied by having tools) runs the iterative
    // agent loop; otherwise a single LLM call is made and adapted into the
    // same result shape.
    let llm_result = if node.mode.as_deref() == Some("agent") || !tool_names.is_empty() {
        crate::llm::run_agent_loop_internal(
            &mut opts,
            crate::llm::AgentLoopConfig {
                persistent: true,
                max_iterations: 12,
                max_nudges: 3,
                nudge: None,
                tool_retries: 0,
                tool_backoff_ms: 1000,
                tool_format: "text".to_string(),
                auto_compact: None,
                policy: None,
                daemon: false,
            },
        )
        .await?
    } else {
        let result = vm_call_llm_full(&opts).await?;
        crate::llm::agent_loop_result_from_llm(&result, opts)
    };

    let visible_text = llm_result["text"].as_str().unwrap_or_default().to_string();
    let transcript = llm_result
        .get("transcript")
        .cloned()
        .map(|value| crate::stdlib::json_to_vm_value(&value));
    let transcript = apply_output_transcript_policy(transcript, &node.transcript_policy);
    // Output artifact kind: first kind from the output contract, else
    // "verification_result" for verify nodes, else the generic "artifact".
    let output_kind = node
        .output_contract
        .output_kinds
        .first()
        .cloned()
        .unwrap_or_else(|| {
            if node.kind == "verify" {
                "verification_result".to_string()
            } else {
                "artifact".to_string()
            }
        });
    // Record which inputs fed this stage so lineage can be traced later.
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "input_artifact_ids".to_string(),
        serde_json::json!(selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect::<Vec<_>>()),
    );
    metadata.insert("node_kind".to_string(), serde_json::json!(node.kind));
    let artifact = ArtifactRecord {
        type_name: "artifact".to_string(),
        id: new_id("artifact"),
        kind: output_kind,
        title: Some(format!("stage {node_id} output")),
        text: Some(visible_text),
        data: Some(llm_result.clone()),
        source: Some(node_id.to_string()),
        created_at: now_rfc3339(),
        freshness: Some("fresh".to_string()),
        priority: None,
        lineage: selected
            .iter()
            .map(|artifact| artifact.id.clone())
            .collect(),
        relevance: Some(1.0),
        estimated_tokens: None,
        stage: Some(node_id.to_string()),
        metadata,
    }
    .normalize();

    Ok((llm_result, vec![artifact], transcript))
}
2563
2564pub fn next_nodes_for(
2565    graph: &WorkflowGraph,
2566    current: &str,
2567    branch: Option<&str>,
2568) -> Vec<WorkflowEdge> {
2569    let mut matching: Vec<WorkflowEdge> = graph
2570        .edges
2571        .iter()
2572        .filter(|edge| edge.from == current && edge.branch.as_deref() == branch)
2573        .cloned()
2574        .collect();
2575    if matching.is_empty() {
2576        matching = graph
2577            .edges
2578            .iter()
2579            .filter(|edge| edge.from == current && edge.branch.is_none())
2580            .cloned()
2581            .collect();
2582    }
2583    matching
2584}
2585
2586pub fn next_node_for(graph: &WorkflowGraph, current: &str, branch: &str) -> Option<String> {
2587    next_nodes_for(graph, current, Some(branch))
2588        .into_iter()
2589        .next()
2590        .map(|edge| edge.to)
2591}
2592
2593pub fn append_audit_entry(
2594    graph: &mut WorkflowGraph,
2595    op: &str,
2596    node_id: Option<String>,
2597    reason: Option<String>,
2598    metadata: BTreeMap<String, serde_json::Value>,
2599) {
2600    graph.audit_log.push(WorkflowAuditEntry {
2601        id: new_id("audit"),
2602        op: op.to_string(),
2603        node_id,
2604        timestamp: now_rfc3339(),
2605        reason,
2606        metadata,
2607    });
2608}
2609
2610pub fn builtin_ceiling() -> CapabilityPolicy {
2611    CapabilityPolicy {
2612        tools: vec![
2613            "read".to_string(),
2614            "read_file".to_string(),
2615            "search".to_string(),
2616            "edit".to_string(),
2617            "run".to_string(),
2618            "exec".to_string(),
2619            "outline".to_string(),
2620            "list_directory".to_string(),
2621            "lsp_hover".to_string(),
2622            "lsp_definition".to_string(),
2623            "lsp_references".to_string(),
2624            "web_search".to_string(),
2625            "web_fetch".to_string(),
2626        ],
2627        capabilities: BTreeMap::from([
2628            (
2629                "workspace".to_string(),
2630                vec![
2631                    "read_text".to_string(),
2632                    "write_text".to_string(),
2633                    "apply_edit".to_string(),
2634                    "delete".to_string(),
2635                    "exists".to_string(),
2636                    "list".to_string(),
2637                ],
2638            ),
2639            ("process".to_string(), vec!["exec".to_string()]),
2640        ]),
2641        workspace_roots: Vec::new(),
2642        side_effect_level: Some("network".to_string()),
2643        recursion_limit: Some(8),
2644        tool_arg_constraints: Vec::new(),
2645    }
2646}
2647
2648#[cfg(test)]
2649mod tests {
2650    use super::*;
2651
    /// Intersecting a request that asks for more tools than the ceiling
    /// grants must fail rather than silently widen privileges.
    #[test]
    fn capability_intersection_rejects_privilege_expansion() {
        let ceiling = CapabilityPolicy {
            tools: vec!["read".to_string()],
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(2),
            ..Default::default()
        };
        // "edit" is not in the ceiling's tool list, so this is an expansion.
        let requested = CapabilityPolicy {
            tools: vec!["read".to_string(), "edit".to_string()],
            ..Default::default()
        };
        let error = ceiling.intersect(&requested).unwrap_err();
        assert!(error.contains("host ceiling"));
    }
2667
2668    #[test]
2669    fn mutation_session_normalize_fills_defaults() {
2670        let normalized = MutationSessionRecord::default().normalize();
2671        assert!(normalized.session_id.starts_with("session_"));
2672        assert_eq!(normalized.mutation_scope, "read_only");
2673        assert_eq!(normalized.approval_mode, "host_enforced");
2674    }
2675
    /// Installing a session makes it visible via `current_mutation_session`;
    /// installing `None` clears it again (thread-local round trip).
    #[test]
    fn install_current_mutation_session_round_trips() {
        install_current_mutation_session(Some(MutationSessionRecord {
            session_id: "session_test".to_string(),
            mutation_scope: "apply_workspace".to_string(),
            approval_mode: "explicit".to_string(),
            ..Default::default()
        }));
        let current = current_mutation_session().expect("session installed");
        assert_eq!(current.session_id, "session_test");
        assert_eq!(current.mutation_scope, "apply_workspace");
        assert_eq!(current.approval_mode, "explicit");

        // Clearing must leave no residual session in the thread-local slot.
        install_current_mutation_session(None);
        assert!(current_mutation_session().is_none());
    }
2692
    /// A bridge builtin not covered by the active policy must be rejected
    /// with a `ToolRejected` categorized error.
    #[test]
    fn active_execution_policy_rejects_unknown_bridge_builtin() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_bridge_builtin("custom_host_builtin").unwrap_err();
        // Pop before asserting so a failed assert doesn't leak the policy
        // into other tests on this thread.
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2715
    /// `mcp_connect` must not be usable to escape a restrictive policy: the
    /// builtin is rejected with a `ToolRejected` categorized error.
    #[test]
    fn active_execution_policy_rejects_mcp_escape_hatch() {
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            capabilities: BTreeMap::from([(
                "workspace".to_string(),
                vec!["read_text".to_string()],
            )]),
            side_effect_level: Some("read_only".to_string()),
            recursion_limit: Some(1),
            ..Default::default()
        });
        let error = enforce_current_policy_for_builtin("mcp_connect", &[]).unwrap_err();
        // Pop immediately so the thread-local policy stack stays balanced.
        pop_execution_policy();
        assert!(matches!(
            error,
            VmError::CategorizedError {
                category: crate::value::ErrorCategory::ToolRejected,
                ..
            }
        ));
    }
2738
    /// The legacy flat {act, verify, repair} workflow shape is upgraded into
    /// a full graph with those three nodes and "act" as the entry point.
    #[test]
    fn workflow_normalization_upgrades_legacy_act_verify_repair_shape() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "legacy",
            "act": {"mode": "llm"},
            "verify": {"kind": "verify"},
            "repair": {"mode": "agent"},
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        assert_eq!(graph.type_name, "workflow_graph");
        assert!(graph.nodes.contains_key("act"));
        assert!(graph.nodes.contains_key("verify"));
        assert!(graph.nodes.contains_key("repair"));
        assert_eq!(graph.entry, "act");
    }
2754
    /// A node's `tools` field may be a tagged tool_registry object; the tool
    /// names must survive normalization and be extractable in order.
    #[test]
    fn workflow_normalization_accepts_tool_registry_nodes() {
        let value = crate::stdlib::json_to_vm_value(&serde_json::json!({
            "name": "registry_tools",
            "entry": "implement",
            "nodes": {
                "implement": {
                    "kind": "stage",
                    "mode": "agent",
                    "tools": {
                        "_type": "tool_registry",
                        "tools": [
                            {"name": "read", "description": "Read files"},
                            {"name": "run", "description": "Run commands"}
                        ]
                    }
                }
            },
            "edges": []
        }));
        let graph = normalize_workflow_value(&value).unwrap();
        let node = graph.nodes.get("implement").unwrap();
        assert_eq!(workflow_tool_names(&node.tools), vec!["read", "run"]);
    }
2779
    /// Selection must respect `max_artifacts`/`max_tokens` budgets: of three
    /// candidates, only two survive. NOTE(review): `prioritize_kinds` names
    /// "verification_result" but no artifact has that kind, so this test only
    /// exercises the budget path — presumably ranking then falls back to
    /// relevance/recency; confirm against `select_artifacts`.
    #[test]
    fn artifact_selection_honors_budget_and_priority() {
        let policy = ContextPolicy {
            max_artifacts: Some(2),
            max_tokens: Some(30),
            prefer_recent: true,
            prefer_fresh: true,
            prioritize_kinds: vec!["verification_result".to_string()],
            ..Default::default()
        };
        let artifacts = vec![
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "a".to_string(),
                kind: "summary".to_string(),
                text: Some("short".to_string()),
                relevance: Some(0.9),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "b".to_string(),
                kind: "summary".to_string(),
                // Larger body so the token budget can exclude it.
                text: Some("this is a much larger artifact body".to_string()),
                relevance: Some(1.0),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
            ArtifactRecord {
                type_name: "artifact".to_string(),
                id: "c".to_string(),
                kind: "summary".to_string(),
                text: Some("tiny".to_string()),
                relevance: Some(0.5),
                created_at: now_rfc3339(),
                ..Default::default()
            }
            .normalize(),
        ];
        let selected = select_artifacts(artifacts, &policy);
        assert_eq!(selected.len(), 2);
        assert!(selected.iter().all(|artifact| artifact.kind == "summary"));
    }
2826
    /// A condition node must have both a "true" and a "false" outgoing edge;
    /// a graph supplying only the "true" branch fails validation with an
    /// error mentioning both branch names.
    #[test]
    fn workflow_validation_rejects_condition_without_true_false_edges() {
        let graph = WorkflowGraph {
            entry: "gate".to_string(),
            nodes: BTreeMap::from([(
                "gate".to_string(),
                WorkflowNode {
                    id: Some("gate".to_string()),
                    kind: "condition".to_string(),
                    ..Default::default()
                },
            )]),
            // Only the "true" branch is wired up; "false" is missing.
            edges: vec![WorkflowEdge {
                from: "gate".to_string(),
                to: "next".to_string(),
                branch: Some("true".to_string()),
                label: None,
            }],
            ..Default::default()
        };
        let report = validate_workflow(&graph, None);
        assert!(!report.valid);
        assert!(report
            .errors
            .iter()
            .any(|error| error.contains("true") && error.contains("false")));
    }
2854
    /// A fixture derived from a run must evaluate cleanly against that same
    /// run (self round trip: no failures, pass == true).
    #[test]
    fn replay_fixture_round_trip_passes() {
        let run = RunRecord {
            type_name: "run_record".to_string(),
            id: "run_1".to_string(),
            workflow_id: "wf".to_string(),
            workflow_name: Some("demo".to_string()),
            task: "demo".to_string(),
            status: "completed".to_string(),
            started_at: "1".to_string(),
            finished_at: Some("2".to_string()),
            parent_run_id: None,
            root_run_id: Some("run_1".to_string()),
            // Single completed stage producing one artifact.
            stages: vec![RunStageRecord {
                id: "stage_1".to_string(),
                node_id: "act".to_string(),
                kind: "stage".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                branch: Some("success".to_string()),
                started_at: "1".to_string(),
                finished_at: Some("2".to_string()),
                visible_text: Some("done".to_string()),
                private_reasoning: None,
                transcript: None,
                verification: None,
                usage: None,
                artifacts: vec![ArtifactRecord {
                    type_name: "artifact".to_string(),
                    id: "a1".to_string(),
                    kind: "summary".to_string(),
                    text: Some("done".to_string()),
                    created_at: "1".to_string(),
                    ..Default::default()
                }
                .normalize()],
                consumed_artifact_ids: vec![],
                produced_artifact_ids: vec!["a1".to_string()],
                attempts: vec![],
                metadata: BTreeMap::new(),
            }],
            transitions: vec![],
            checkpoints: vec![],
            pending_nodes: vec![],
            completed_nodes: vec!["act".to_string()],
            child_runs: vec![],
            artifacts: vec![],
            policy: CapabilityPolicy::default(),
            execution: None,
            transcript: None,
            usage: None,
            replay_fixture: None,
            metadata: BTreeMap::new(),
            persisted_path: None,
        };
        let fixture = replay_fixture_from_run(&run);
        let report = evaluate_run_against_fixture(&run, &fixture);
        assert!(report.pass);
        assert!(report.failures.is_empty());
    }
2915
    /// A suite with one passing and one failing case reports the aggregate
    /// failure: the "bad" run is deliberately evaluated against the fixture
    /// derived from the "good" run so it cannot match.
    #[test]
    fn replay_eval_suite_reports_failed_case() {
        let good = RunRecord {
            id: "run_good".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let bad = RunRecord {
            id: "run_bad".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite(vec![
            (
                good.clone(),
                replay_fixture_from_run(&good),
                Some("good.json".to_string()),
            ),
            (
                bad.clone(),
                // Mismatched fixture on purpose: this case must fail.
                replay_fixture_from_run(&good),
                Some("bad.json".to_string()),
            ),
        ]);
        assert!(!suite.pass);
        assert_eq!(suite.total, 2);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases.iter().any(|case| !case.pass));
    }
2959
    /// Diffing two runs that differ in status and stage outcome reports the
    /// status change, non-identity, and exactly one stage diff.
    #[test]
    fn run_diff_reports_changed_stage() {
        let left = RunRecord {
            id: "left".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let right = RunRecord {
            id: "right".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let diff = diff_run_records(&left, &right);
        assert!(diff.status_changed);
        assert!(!diff.identical);
        assert_eq!(diff.stage_diffs.len(), 1);
    }
2991
    /// A manifest case with `compare_to` pointing at a baseline run must fail
    /// when the candidate diverges, and the failure message mentions the
    /// baseline. NOTE(review): the temp directory is never removed; consider
    /// cleanup so repeated test runs don't accumulate directories.
    #[test]
    fn eval_suite_manifest_can_fail_on_baseline_diff() {
        // Unique temp dir per invocation via a v7 UUID.
        let temp_dir =
            std::env::temp_dir().join(format!("harn-eval-suite-{}", uuid::Uuid::now_v7()));
        std::fs::create_dir_all(&temp_dir).unwrap();
        let baseline_path = temp_dir.join("baseline.json");
        let candidate_path = temp_dir.join("candidate.json");

        let baseline = RunRecord {
            id: "baseline".to_string(),
            workflow_id: "wf".to_string(),
            status: "completed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "completed".to_string(),
                outcome: "success".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let candidate = RunRecord {
            id: "candidate".to_string(),
            workflow_id: "wf".to_string(),
            status: "failed".to_string(),
            stages: vec![RunStageRecord {
                node_id: "act".to_string(),
                status: "failed".to_string(),
                outcome: "error".to_string(),
                ..Default::default()
            }],
            ..Default::default()
        };

        save_run_record(&baseline, Some(baseline_path.to_str().unwrap())).unwrap();
        save_run_record(&candidate, Some(candidate_path.to_str().unwrap())).unwrap();

        // Paths in the manifest are relative to `base_dir`.
        let manifest = EvalSuiteManifest {
            base_dir: Some(temp_dir.display().to_string()),
            cases: vec![EvalSuiteCase {
                label: Some("candidate".to_string()),
                run_path: "candidate.json".to_string(),
                fixture_path: None,
                compare_to: Some("baseline.json".to_string()),
            }],
            ..Default::default()
        };
        let suite = evaluate_run_suite_manifest(&manifest).unwrap();
        assert!(!suite.pass);
        assert_eq!(suite.failed, 1);
        assert!(suite.cases[0].comparison.is_some());
        assert!(suite.cases[0]
            .failures
            .iter()
            .any(|failure| failure.contains("baseline")));
    }
3047
3048    #[test]
3049    fn render_unified_diff_marks_removed_and_added_lines() {
3050        let diff = render_unified_diff(Some("src/main.rs"), "old\nsame", "new\nsame");
3051        assert!(diff.contains("--- a/src/main.rs"));
3052        assert!(diff.contains("+++ b/src/main.rs"));
3053        assert!(diff.contains("-old"));
3054        assert!(diff.contains("+new"));
3055        assert!(diff.contains(" same"));
3056    }
3057
    /// Even when the "process" capability grants "exec", a read_only
    /// side-effect level must still block the exec builtin.
    #[test]
    fn execution_policy_rejects_process_exec_when_read_only() {
        push_execution_policy(CapabilityPolicy {
            side_effect_level: Some("read_only".to_string()),
            capabilities: BTreeMap::from([("process".to_string(), vec!["exec".to_string()])]),
            ..Default::default()
        });
        let result = enforce_current_policy_for_builtin("exec", &[]);
        // Pop before asserting to keep the thread-local stack balanced.
        pop_execution_policy();
        assert!(result.is_err());
    }
3069
3070    #[test]
3071    fn execution_policy_rejects_unlisted_tool() {
3072        push_execution_policy(CapabilityPolicy {
3073            tools: vec!["read".to_string()],
3074            ..Default::default()
3075        });
3076        let result = enforce_current_policy_for_tool("edit");
3077        pop_execution_policy();
3078        assert!(result.is_err());
3079    }
3080
3081    // ── Tool hook tests ──────────────────────────────────────────────
3082
    /// A pre-hook returning Deny for a matching glob pattern must surface as
    /// a Deny action for that tool invocation.
    #[test]
    fn pre_tool_hook_deny_blocks_execution() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "dangerous_*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Deny("blocked by policy".to_string())
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("dangerous_delete", &serde_json::json!({}));
        // Clear hooks before asserting so other tests see a clean registry.
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Deny(_)));
    }
3097
    /// A pre-hook that explicitly allows a matching tool yields Allow.
    #[test]
    fn pre_tool_hook_allow_passes_through() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "safe_*".to_string(),
            pre: Some(Rc::new(|_name, _args| PreToolAction::Allow)),
            post: None,
        });
        let result = run_pre_tool_hooks("safe_read", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Allow));
    }
3110
    /// A pre-hook may rewrite the tool arguments: the Modify action carries
    /// the sanitized argument object in place of the original.
    #[test]
    fn pre_tool_hook_modify_rewrites_args() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            // "*" matches every tool name.
            pattern: "*".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Modify(serde_json::json!({"path": "/sanitized"}))
            })),
            post: None,
        });
        let result = run_pre_tool_hooks("read_file", &serde_json::json!({"path": "/etc/passwd"}));
        clear_tool_hooks();
        match result {
            PreToolAction::Modify(args) => assert_eq!(args["path"], "/sanitized"),
            _ => panic!("expected Modify"),
        }
    }
3128
    /// A post-hook can rewrite a tool's output (redaction here) while passing
    /// clean output through unchanged.
    #[test]
    fn post_tool_hook_modifies_result() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "exec".to_string(),
            pre: None,
            post: Some(Rc::new(|_name, result| {
                if result.contains("SECRET") {
                    PostToolAction::Modify("[REDACTED]".to_string())
                } else {
                    PostToolAction::Pass
                }
            })),
        });
        let result = run_post_tool_hooks("exec", "output with SECRET data");
        let clean = run_post_tool_hooks("exec", "clean output");
        clear_tool_hooks();
        assert_eq!(result, "[REDACTED]");
        assert_eq!(clean, "clean output");
    }
3149
    /// A hook whose pattern does not match the tool name must not fire; the
    /// default action is Allow.
    #[test]
    fn unmatched_hook_pattern_does_not_fire() {
        clear_tool_hooks();
        register_tool_hook(ToolHook {
            pattern: "exec".to_string(),
            pre: Some(Rc::new(|_name, _args| {
                PreToolAction::Deny("should not match".to_string())
            })),
            post: None,
        });
        // "read_file" does not match the "exec" pattern.
        let result = run_pre_tool_hooks("read_file", &serde_json::json!({}));
        clear_tool_hooks();
        assert!(matches!(result, PreToolAction::Allow));
    }
3164
3165    #[test]
3166    fn glob_match_patterns() {
3167        assert!(glob_match("*", "anything"));
3168        assert!(glob_match("exec*", "exec_at"));
3169        assert!(glob_match("*_file", "read_file"));
3170        assert!(!glob_match("exec*", "read_file"));
3171        assert!(glob_match("read_file", "read_file"));
3172        assert!(!glob_match("read_file", "write_file"));
3173    }
3174
3175    // ── Auto-compaction tests ────────────────────────────────────────
3176
3177    #[test]
3178    fn microcompact_snips_large_output() {
3179        let large = "x".repeat(50_000);
3180        let result = microcompact_tool_output(&large, 10_000);
3181        assert!(result.len() < 15_000);
3182        assert!(result.contains("snipped"));
3183    }
3184
3185    #[test]
3186    fn microcompact_preserves_small_output() {
3187        let small = "hello world";
3188        let result = microcompact_tool_output(small, 10_000);
3189        assert_eq!(result, small);
3190    }
3191
    /// With 20 messages and keep_last = 6, truncate-compaction must shrink
    /// the transcript to at most 7 entries (6 kept + 1 summary) whose leading
    /// entry announces the compaction.
    #[test]
    fn auto_compact_messages_reduces_count() {
        let mut messages: Vec<serde_json::Value> = (0..20)
            .map(|i| serde_json::json!({"role": "user", "content": format!("message {i}")}))
            .collect();
        // auto_compact_messages is async; drive it on a throwaway
        // single-threaded tokio runtime.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let compacted = runtime.block_on(auto_compact_messages(
            &mut messages,
            &AutoCompactConfig {
                compact_strategy: CompactStrategy::Truncate,
                keep_last: 6,
                ..Default::default()
            },
            None,
        ));
        assert!(compacted.unwrap());
        assert!(messages.len() <= 7); // 6 kept + 1 summary
        assert!(messages[0]["content"]
            .as_str()
            .unwrap()
            .contains("auto-compacted"));
    }
3217
    /// With only 4 messages (below keep_last = 6), compaction must be a
    /// no-op: it reports false and leaves the transcript untouched.
    #[test]
    fn auto_compact_noop_when_under_threshold() {
        let mut messages: Vec<serde_json::Value> = (0..4)
            .map(|i| serde_json::json!({"role": "user", "content": format!("msg {i}")}))
            .collect();
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let compacted = runtime.block_on(auto_compact_messages(
            &mut messages,
            &AutoCompactConfig {
                compact_strategy: CompactStrategy::Truncate,
                keep_last: 6,
                ..Default::default()
            },
            None,
        ));
        assert!(!compacted.unwrap());
        assert_eq!(messages.len(), 4);
    }
3239
3240    #[test]
3241    fn estimate_message_tokens_basic() {
3242        let messages = vec![
3243            serde_json::json!({"role": "user", "content": "a".repeat(400)}),
3244            serde_json::json!({"role": "assistant", "content": "b".repeat(400)}),
3245        ];
3246        let tokens = estimate_message_tokens(&messages);
3247        assert_eq!(tokens, 200); // 800 chars / 4
3248    }
3249
3250    // ── Artifact dedup and microcompaction tests ─────────────────────
3251
    /// Two artifacts with identical text bodies collapse to one; the unique
    /// third artifact survives, leaving two entries.
    #[test]
    fn dedup_artifacts_removes_duplicates() {
        let mut artifacts = vec![
            ArtifactRecord {
                id: "a1".to_string(),
                kind: "test".to_string(),
                text: Some("duplicate content".to_string()),
                ..Default::default()
            },
            // Same text as a1 — expected to be deduplicated away.
            ArtifactRecord {
                id: "a2".to_string(),
                kind: "test".to_string(),
                text: Some("duplicate content".to_string()),
                ..Default::default()
            },
            ArtifactRecord {
                id: "a3".to_string(),
                kind: "test".to_string(),
                text: Some("unique content".to_string()),
                ..Default::default()
            },
        ];
        dedup_artifacts(&mut artifacts);
        assert_eq!(artifacts.len(), 2);
    }
3277
    /// Compacting an artifact to a 500-token budget must shrink its text
    /// well below the original and update the token estimate to the budget.
    #[test]
    fn microcompact_artifact_snips_oversized() {
        let mut artifact = ArtifactRecord {
            id: "a1".to_string(),
            kind: "test".to_string(),
            text: Some("x".repeat(10_000)),
            estimated_tokens: Some(2_500),
            ..Default::default()
        };
        microcompact_artifact(&mut artifact, 500);
        assert!(artifact.text.as_ref().unwrap().len() < 5_000);
        assert_eq!(artifact.estimated_tokens, Some(500));
    }
3291
3292    // ── Tool argument constraint tests ───────────────────────────────
3293
    /// An exec invocation whose command matches the constraint pattern
    /// ("cargo *") is allowed.
    #[test]
    fn arg_constraint_allows_matching_pattern() {
        let policy = CapabilityPolicy {
            tool_arg_constraints: vec![ToolArgConstraint {
                tool: "exec".to_string(),
                arg_patterns: vec!["cargo *".to_string()],
            }],
            ..Default::default()
        };
        let result = enforce_tool_arg_constraints(
            &policy,
            "exec",
            &serde_json::json!({"command": "cargo test"}),
        );
        assert!(result.is_ok());
    }
3310
    /// An exec invocation whose command falls outside the allowed pattern
    /// ("cargo *") is rejected.
    #[test]
    fn arg_constraint_rejects_non_matching_pattern() {
        let policy = CapabilityPolicy {
            tool_arg_constraints: vec![ToolArgConstraint {
                tool: "exec".to_string(),
                arg_patterns: vec!["cargo *".to_string()],
            }],
            ..Default::default()
        };
        let result = enforce_tool_arg_constraints(
            &policy,
            "exec",
            &serde_json::json!({"command": "rm -rf /"}),
        );
        assert!(result.is_err());
    }
3327
    /// Constraints are scoped per tool: a constraint on "exec" must not
    /// restrict a different tool such as "read_file".
    #[test]
    fn arg_constraint_ignores_unmatched_tool() {
        let policy = CapabilityPolicy {
            tool_arg_constraints: vec![ToolArgConstraint {
                tool: "exec".to_string(),
                arg_patterns: vec!["cargo *".to_string()],
            }],
            ..Default::default()
        };
        let result = enforce_tool_arg_constraints(
            &policy,
            "read_file",
            &serde_json::json!({"path": "/etc/passwd"}),
        );
        assert!(result.is_ok());
    }
3344}