// harn_vm/orchestration/records.rs
1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10    CapabilityPolicy,
11};
12use crate::llm::vm_value_to_json;
13use crate::value::{VmError, VmValue};
14
/// Aggregated LLM usage totals (tokens, wall-clock time, call count, cost).
///
/// Appears both per-stage (`RunStageRecord::usage`) and run-wide
/// (`RunRecord::usage`). With `#[serde(default)]`, any field missing from
/// the JSON deserializes to its `Default` value, keeping old records loadable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    /// Number of LLM calls folded into these totals.
    pub call_count: i64,
    /// Accumulated cost — units/currency are set by the producer; not defined here.
    pub total_cost: f64,
    /// Model identifiers observed while accumulating this usage.
    pub models: Vec<String>,
}
25
/// Execution record for one workflow node within a run.
///
/// `node_id` is the join key used by the replay and diff utilities in this
/// module (`evaluate_run_against_fixture`, `diff_run_records`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    /// Unique id of this stage execution.
    pub id: String,
    /// Workflow node this stage executed.
    pub node_id: String,
    /// Stage kind label (free-form; set by the orchestrator).
    pub kind: String,
    /// Final status string; compared verbatim by replay assertions.
    pub status: String,
    /// Final outcome string; compared verbatim by replay assertions.
    pub outcome: String,
    /// Branch taken by a branching node, if any.
    pub branch: Option<String>,
    pub started_at: String,
    /// `None` while the stage is still running.
    pub finished_at: Option<String>,
    /// User-visible output; generated replay fixtures probe its first 80 chars.
    pub visible_text: Option<String>,
    /// Reasoning text kept separate from `visible_text`.
    pub private_reasoning: Option<String>,
    /// Raw transcript payload; shape defined by the producer.
    pub transcript: Option<serde_json::Value>,
    /// Verification payload; shape defined by the producer.
    pub verification: Option<serde_json::Value>,
    /// LLM usage for this stage, when recorded.
    pub usage: Option<LlmUsageRecord>,
    /// Artifacts produced here; their `kind`s feed fixture `required_artifact_kinds`.
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Per-attempt history when the stage was retried.
    pub attempts: Vec<RunStageAttemptRecord>,
    /// Free-form extra data.
    pub metadata: BTreeMap<String, serde_json::Value>,
}
48
/// One retry attempt of a stage (see `RunStageRecord::attempts`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// Attempt index within the stage.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    /// Error message for a failed attempt, if any.
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
61
/// Edge taken between workflow nodes during a run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// Stage/node the transition left from; `None` for the initial transition.
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    /// Branch label that selected this transition, if the node branched.
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
74
/// Snapshot of scheduler state persisted mid-run (for resume/inspection).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    /// Nodes ready to execute at checkpoint time.
    pub ready_nodes: Vec<String>,
    /// Nodes already finished at checkpoint time.
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Why the checkpoint was taken (free-form).
    pub reason: String,
}
85
/// Expected-outcome snapshot derived from a source run, used to validate a
/// replay of the same workflow (see `evaluate_run_against_fixture` and
/// `replay_fixture_from_run`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Serialized as `"_type"`; set to `"replay_fixture"` when generated here.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Id of the run this fixture was captured from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    /// Run-level status a replay must reproduce.
    pub expected_status: String,
    /// Per-stage expectations, one per stage of the source run.
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
99
/// Expectations for a single stage, checked by `evaluate_run_against_fixture`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    /// Node the assertion applies to; a missing stage is itself a failure.
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    /// Artifact kinds that must each appear at least once on the stage.
    pub required_artifact_kinds: Vec<String>,
    /// Substring the stage's visible text must contain (fixtures generated
    /// from a run store the first 80 chars of non-empty visible text).
    pub visible_text_contains: Option<String>,
}
110
/// Result of checking one run against one fixture.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    /// True iff `failures` is empty.
    pub pass: bool,
    /// Human-readable mismatch descriptions.
    pub failures: Vec<String>,
    /// Number of stages in the evaluated run (not the fixture).
    pub stage_count: usize,
}
118
/// Per-case entry in a suite report (see `ReplayEvalSuiteReport`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    /// Case label from the manifest, when run via a manifest.
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    /// Path the run record was loaded from, when known.
    pub source_path: Option<String>,
    /// Baseline diff, populated only when the manifest case had `compare_to`.
    pub comparison: Option<RunDiffReport>,
}
131
/// Aggregate over a set of replay-eval cases; `pass` iff every case passed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
141
/// One stage-level difference reported by `diff_run_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// One of `"added"`, `"removed"`, or `"changed"` (as emitted by `diff_run_records`).
    pub change: String,
    /// Human-readable field-level change descriptions.
    pub details: Vec<String>,
}
149
/// Tool-call difference between two runs, keyed by `(tool_name, args_hash)`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    /// True when results differ or the call exists on only one side.
    pub result_changed: bool,
    /// Result on the left/right run; `None` when absent on that side.
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
159
/// Structured comparison of two runs, produced by `diff_run_records`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    /// True when status, stages, tool calls, and the transition/artifact/
    /// checkpoint counts all match.
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    /// Count deltas computed as right minus left.
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
175
/// On-disk description of an eval suite, consumed by
/// `evaluate_run_suite_manifest`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Serialized as `"_type"`; normalized to `"eval_suite_manifest"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory that relative case paths are resolved against.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
186
/// One case in an eval suite manifest.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Run record to evaluate (relative paths resolve against `base_dir`).
    pub run_path: String,
    /// Fixture to check against; falls back to the run's embedded fixture,
    /// or one derived from the run itself.
    pub fixture_path: Option<String>,
    /// Optional baseline run record; when set, the case also fails if the
    /// run is not identical to the baseline per `diff_run_records`.
    pub compare_to: Option<String>,
}
195
/// Top-level persisted record of one workflow run: stages, transitions,
/// checkpoints, artifacts, child runs, and optional replay/trace data.
/// Serialized to JSON by `save_run_record` and read back by `load_run_record`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Serialized as `"_type"`; `normalize_run_record` defaults it to `"run_record"`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    /// Task description the run was started with.
    pub task: String,
    /// Run status string; `normalize_run_record` defaults it to `"running"`.
    pub status: String,
    /// Start timestamp; defaulted to `now_rfc3339()` when empty.
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Parent run when spawned as a sub-run; `None` for top-level runs.
    pub parent_run_id: Option<String>,
    /// Root of the run tree; `normalize_run_record` sets it to `id` when absent.
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    /// Run-level artifact records.
    pub artifacts: Vec<ArtifactRecord>,
    /// Capability policy the run executed under.
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Embedded fixture; `normalize_run_record` derives one when missing.
    pub replay_fixture: Option<ReplayFixture>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    /// Recorded tool calls; compared by `(tool_name, args_hash)` in diffs.
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Path the record was last saved to, if persisted.
    pub persisted_path: Option<String>,
}
227
/// One recorded tool invocation (used for replay/diffing).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Hash of the invocation — presumably produced by `tool_fixture_hash`;
    /// serves as the diff key together with `tool_name`.
    pub args_hash: String,
    /// Serialized tool result; compared verbatim by `diff_run_records`.
    pub result: String,
    /// True when the call was rejected (e.g. by policy/approval) rather than run.
    pub is_rejected: bool,
    pub duration_ms: u64,
    /// Agent-loop iteration the call occurred in.
    pub iteration: usize,
    pub timestamp: String,
}
240
241/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
242pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
243    use std::hash::{Hash, Hasher};
244    let mut hasher = std::collections::hash_map::DefaultHasher::new();
245    tool_name.hash(&mut hasher);
246    let args_str = serde_json::to_string(args).unwrap_or_default();
247    args_str.hash(&mut hasher);
248    format!("{:016x}", hasher.finish())
249}
250
/// Timing span captured during a run (parent/child spans form a tree).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// Parent span, `None` for a root span.
    pub parent_id: Option<u64>,
    /// Span kind label (free-form; set by the tracer).
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds — origin is defined by the tracer, not here.
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
262
/// Child (worker) run spawned from a parent run's stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    /// Stage in the parent run that spawned this child, if known.
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    /// Mutation scope granted to the child (semantics defined by the orchestrator).
    pub mutation_scope: Option<String>,
    /// Tool-approval policy the child ran under, when one was set.
    pub approval_policy: Option<super::ToolApprovalPolicy>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Id/path of the child's own persisted `RunRecord`, when available.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
282
/// Where and how a run executed: working directory, environment, and
/// (when applicable) git worktree details.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    /// Environment variables recorded for the execution.
    pub env: BTreeMap<String, String>,
    /// Execution adapter name (semantics defined by the orchestrator).
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    /// Base ref the worktree/branch was created from, if any.
    pub base_ref: Option<String>,
    /// Cleanup mode/marker for the execution area (free-form).
    pub cleanup: Option<String>,
}
296
297pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
298    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
299    if run.type_name.is_empty() {
300        run.type_name = "run_record".to_string();
301    }
302    if run.id.is_empty() {
303        run.id = new_id("run");
304    }
305    if run.started_at.is_empty() {
306        run.started_at = now_rfc3339();
307    }
308    if run.status.is_empty() {
309        run.status = "running".to_string();
310    }
311    if run.root_run_id.is_none() {
312        run.root_run_id = Some(run.id.clone());
313    }
314    if run.replay_fixture.is_none() {
315        run.replay_fixture = Some(replay_fixture_from_run(&run));
316    }
317    Ok(run)
318}
319
320pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
321    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
322    if manifest.type_name.is_empty() {
323        manifest.type_name = "eval_suite_manifest".to_string();
324    }
325    if manifest.id.is_empty() {
326        manifest.id = new_id("eval_suite");
327    }
328    Ok(manifest)
329}
330
331fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
332    let content = std::fs::read_to_string(path)
333        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
334    serde_json::from_str(&content)
335        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
336}
337
/// Resolve a manifest-relative path: absolute paths pass through unchanged;
/// relative paths are joined onto `base_dir` when one is provided.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    match base_dir {
        Some(base) if candidate.is_relative() => base.join(candidate),
        _ => candidate,
    }
}
348
349pub fn evaluate_run_suite_manifest(
350    manifest: &EvalSuiteManifest,
351) -> Result<ReplayEvalSuiteReport, VmError> {
352    let base_dir = manifest.base_dir.as_deref().map(Path::new);
353    let mut reports = Vec::new();
354    for case in &manifest.cases {
355        let run_path = resolve_manifest_path(base_dir, &case.run_path);
356        let run = load_run_record(&run_path)?;
357        let fixture = match &case.fixture_path {
358            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
359            None => run
360                .replay_fixture
361                .clone()
362                .unwrap_or_else(|| replay_fixture_from_run(&run)),
363        };
364        let eval = evaluate_run_against_fixture(&run, &fixture);
365        let mut pass = eval.pass;
366        let mut failures = eval.failures;
367        let comparison = match &case.compare_to {
368            Some(path) => {
369                let baseline_path = resolve_manifest_path(base_dir, path);
370                let baseline = load_run_record(&baseline_path)?;
371                let diff = diff_run_records(&baseline, &run);
372                if !diff.identical {
373                    pass = false;
374                    failures.push(format!(
375                        "run differs from baseline {} with {} stage changes",
376                        baseline_path.display(),
377                        diff.stage_diffs.len()
378                    ));
379                }
380                Some(diff)
381            }
382            None => None,
383        };
384        reports.push(ReplayEvalCaseReport {
385            run_id: run.id.clone(),
386            workflow_id: run.workflow_id.clone(),
387            label: case.label.clone(),
388            pass,
389            failures,
390            stage_count: eval.stage_count,
391            source_path: Some(run_path.display().to_string()),
392            comparison,
393        });
394    }
395    let total = reports.len();
396    let passed = reports.iter().filter(|report| report.pass).count();
397    let failed = total.saturating_sub(passed);
398    Ok(ReplayEvalSuiteReport {
399        pass: failed == 0,
400        total,
401        passed,
402        failed,
403        cases: reports,
404    })
405}
406
/// Edit operation in a diff sequence.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// Line unchanged; the paired index refers to the "before" slice.
    Equal,
    /// Line removed; the paired index refers to the "before" slice.
    Delete,
    /// Line added; the paired index refers to the "after" slice.
    Insert,
}

/// Compute the shortest edit script using Myers' O(nd) algorithm.
/// Returns a sequence of (DiffOp, line_index_in_before_or_after):
/// `Equal`/`Delete` indices point into `a`, `Insert` indices into `b`.
/// Time: O(nd) where d = edit distance. Space: O(d * n) for the backtrack trace.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Trivial cases: one side empty means pure inserts/deletes (or nothing).
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // `v[k + offset]` = furthest x reached on diagonal k; offset keeps
    // indices non-negative for k in [-(n+m), n+m].
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // trace[d] holds the `v` snapshot BEFORE step d ran — required for backtrack.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Writes go to `new_v`; reads of k-1/k+1 must see the previous round.
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Step down (from k+1) or right (from k-1), whichever reaches further.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1]
            } else {
                v[ki - 1] + 1
            };
            let mut y = x - k;
            // Follow the diagonal (matching lines) as far as possible.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                // Reached the end; `trace` already has everything backtrack needs.
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backtrack from (n, m) through the snapshots, emitting ops in reverse.
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        let v_prev = &trace[d as usize];
        // Mirror the forward-step choice to find the diagonal we came from.
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1
        } else {
            k - 1
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Emit the diagonal (equal) run that followed the edit at depth d.
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        if prev_k < k {
            // Came from below: a deletion from `a`.
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            // Came from the left: an insertion from `b`.
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Remaining depth-0 diagonal: a common prefix before the first edit.
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    ops.reverse();
    ops
}
499
500pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
501    let before_lines: Vec<&str> = before.lines().collect();
502    let after_lines: Vec<&str> = after.lines().collect();
503    let ops = myers_diff(&before_lines, &after_lines);
504
505    let mut diff = String::new();
506    let file = path.unwrap_or("artifact");
507    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
508    for &(op, idx) in &ops {
509        match op {
510            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
511            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
512            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
513        }
514    }
515    diff
516}
517
518pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
519    let path = path
520        .map(PathBuf::from)
521        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
522    if let Some(parent) = path.parent() {
523        std::fs::create_dir_all(parent)
524            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
525    }
526    let json = serde_json::to_string_pretty(run)
527        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
528    // Atomic write: .tmp then rename guards against partial writes on kill.
529    let tmp_path = path.with_extension("json.tmp");
530    std::fs::write(&tmp_path, &json)
531        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
532    std::fs::rename(&tmp_path, &path).map_err(|e| {
533        // Cross-device renames fail on some filesystems; best-effort direct write.
534        let _ = std::fs::write(&path, &json);
535        VmError::Runtime(format!("failed to finalize run record: {e}"))
536    })?;
537    Ok(path.to_string_lossy().into_owned())
538}
539
540pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
541    let content = std::fs::read_to_string(path)
542        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
543    serde_json::from_str(&content)
544        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
545}
546
547pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
548    ReplayFixture {
549        type_name: "replay_fixture".to_string(),
550        id: new_id("fixture"),
551        source_run_id: run.id.clone(),
552        workflow_id: run.workflow_id.clone(),
553        workflow_name: run.workflow_name.clone(),
554        created_at: now_rfc3339(),
555        expected_status: run.status.clone(),
556        stage_assertions: run
557            .stages
558            .iter()
559            .map(|stage| ReplayStageAssertion {
560                node_id: stage.node_id.clone(),
561                expected_status: stage.status.clone(),
562                expected_outcome: stage.outcome.clone(),
563                expected_branch: stage.branch.clone(),
564                required_artifact_kinds: stage
565                    .artifacts
566                    .iter()
567                    .map(|artifact| artifact.kind.clone())
568                    .collect(),
569                visible_text_contains: stage
570                    .visible_text
571                    .as_ref()
572                    .filter(|text| !text.is_empty())
573                    .map(|text| text.chars().take(80).collect()),
574            })
575            .collect(),
576    }
577}
578
579pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
580    let mut failures = Vec::new();
581    if run.status != fixture.expected_status {
582        failures.push(format!(
583            "run status mismatch: expected {}, got {}",
584            fixture.expected_status, run.status
585        ));
586    }
587    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
588        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
589    for assertion in &fixture.stage_assertions {
590        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
591            failures.push(format!("missing stage {}", assertion.node_id));
592            continue;
593        };
594        if stage.status != assertion.expected_status {
595            failures.push(format!(
596                "stage {} status mismatch: expected {}, got {}",
597                assertion.node_id, assertion.expected_status, stage.status
598            ));
599        }
600        if stage.outcome != assertion.expected_outcome {
601            failures.push(format!(
602                "stage {} outcome mismatch: expected {}, got {}",
603                assertion.node_id, assertion.expected_outcome, stage.outcome
604            ));
605        }
606        if stage.branch != assertion.expected_branch {
607            failures.push(format!(
608                "stage {} branch mismatch: expected {:?}, got {:?}",
609                assertion.node_id, assertion.expected_branch, stage.branch
610            ));
611        }
612        for required_kind in &assertion.required_artifact_kinds {
613            if !stage
614                .artifacts
615                .iter()
616                .any(|artifact| &artifact.kind == required_kind)
617            {
618                failures.push(format!(
619                    "stage {} missing artifact kind {}",
620                    assertion.node_id, required_kind
621                ));
622            }
623        }
624        if let Some(snippet) = &assertion.visible_text_contains {
625            let actual = stage.visible_text.clone().unwrap_or_default();
626            if !actual.contains(snippet) {
627                failures.push(format!(
628                    "stage {} visible text does not contain expected snippet {:?}",
629                    assertion.node_id, snippet
630                ));
631            }
632        }
633    }
634
635    ReplayEvalReport {
636        pass: failures.is_empty(),
637        failures,
638        stage_count: run.stages.len(),
639    }
640}
641
642pub fn evaluate_run_suite(
643    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
644) -> ReplayEvalSuiteReport {
645    let mut reports = Vec::new();
646    for (run, fixture, source_path) in cases {
647        let report = evaluate_run_against_fixture(&run, &fixture);
648        reports.push(ReplayEvalCaseReport {
649            run_id: run.id.clone(),
650            workflow_id: run.workflow_id.clone(),
651            label: None,
652            pass: report.pass,
653            failures: report.failures,
654            stage_count: report.stage_count,
655            source_path,
656            comparison: None,
657        });
658    }
659    let total = reports.len();
660    let passed = reports.iter().filter(|report| report.pass).count();
661    let failed = total.saturating_sub(passed);
662    ReplayEvalSuiteReport {
663        pass: failed == 0,
664        total,
665        passed,
666        failed,
667        cases: reports,
668    }
669}
670
671pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
672    let mut stage_diffs = Vec::new();
673    let mut all_node_ids = BTreeSet::new();
674    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
675        .stages
676        .iter()
677        .map(|s| (s.node_id.as_str(), s))
678        .collect();
679    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
680        .stages
681        .iter()
682        .map(|s| (s.node_id.as_str(), s))
683        .collect();
684    all_node_ids.extend(left_by_id.keys().copied());
685    all_node_ids.extend(right_by_id.keys().copied());
686
687    for node_id in all_node_ids {
688        let left_stage = left_by_id.get(node_id).copied();
689        let right_stage = right_by_id.get(node_id).copied();
690        match (left_stage, right_stage) {
691            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
692                node_id: node_id.to_string(),
693                change: "removed".to_string(),
694                details: vec!["stage missing from right run".to_string()],
695            }),
696            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
697                node_id: node_id.to_string(),
698                change: "added".to_string(),
699                details: vec!["stage missing from left run".to_string()],
700            }),
701            (Some(left_stage), Some(right_stage)) => {
702                let mut details = Vec::new();
703                if left_stage.status != right_stage.status {
704                    details.push(format!(
705                        "status: {} -> {}",
706                        left_stage.status, right_stage.status
707                    ));
708                }
709                if left_stage.outcome != right_stage.outcome {
710                    details.push(format!(
711                        "outcome: {} -> {}",
712                        left_stage.outcome, right_stage.outcome
713                    ));
714                }
715                if left_stage.branch != right_stage.branch {
716                    details.push(format!(
717                        "branch: {:?} -> {:?}",
718                        left_stage.branch, right_stage.branch
719                    ));
720                }
721                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
722                {
723                    details.push(format!(
724                        "produced_artifacts: {} -> {}",
725                        left_stage.produced_artifact_ids.len(),
726                        right_stage.produced_artifact_ids.len()
727                    ));
728                }
729                if left_stage.artifacts.len() != right_stage.artifacts.len() {
730                    details.push(format!(
731                        "artifact_records: {} -> {}",
732                        left_stage.artifacts.len(),
733                        right_stage.artifacts.len()
734                    ));
735                }
736                if !details.is_empty() {
737                    stage_diffs.push(RunStageDiffRecord {
738                        node_id: node_id.to_string(),
739                        change: "changed".to_string(),
740                        details,
741                    });
742                }
743            }
744            (None, None) => {}
745        }
746    }
747
748    let mut tool_diffs = Vec::new();
749    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
750        .tool_recordings
751        .iter()
752        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
753        .collect();
754    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
755        .tool_recordings
756        .iter()
757        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
758        .collect();
759    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
760        .keys()
761        .chain(right_tools.keys())
762        .cloned()
763        .collect();
764    for key in &all_tool_keys {
765        let l = left_tools.get(key);
766        let r = right_tools.get(key);
767        let result_changed = match (l, r) {
768            (Some(a), Some(b)) => a.result != b.result,
769            _ => true,
770        };
771        if result_changed {
772            tool_diffs.push(ToolCallDiffRecord {
773                tool_name: key.0.clone(),
774                args_hash: key.1.clone(),
775                result_changed,
776                left_result: l.map(|t| t.result.clone()),
777                right_result: r.map(|t| t.result.clone()),
778            });
779        }
780    }
781
782    let status_changed = left.status != right.status;
783    let identical = !status_changed
784        && stage_diffs.is_empty()
785        && tool_diffs.is_empty()
786        && left.transitions.len() == right.transitions.len()
787        && left.artifacts.len() == right.artifacts.len()
788        && left.checkpoints.len() == right.checkpoints.len();
789
790    RunDiffReport {
791        left_run_id: left.id.clone(),
792        right_run_id: right.id.clone(),
793        identical,
794        status_changed,
795        left_status: left.status.clone(),
796        right_status: right.status.clone(),
797        stage_diffs,
798        tool_diffs,
799        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
800        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
801        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
802    }
803}