// harn_vm/orchestration/records.rs
1//! Run records, replay fixtures, eval reports, and diff utilities.
2
3use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use serde::{Deserialize, Serialize};
7
8use super::{
9    default_run_dir, new_id, now_rfc3339, parse_json_payload, parse_json_value, ArtifactRecord,
10    CapabilityPolicy,
11};
12use crate::llm::vm_value_to_json;
13use crate::value::{VmError, VmValue};
14
/// Aggregated LLM usage counters (tokens, timing, cost) for a stage or a run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct LlmUsageRecord {
    pub input_tokens: i64,
    pub output_tokens: i64,
    pub total_duration_ms: i64,
    /// Number of LLM calls folded into these totals.
    pub call_count: i64,
    pub total_cost: f64,
    /// Model identifiers seen across the aggregated calls.
    // NOTE(review): presumably deduplicated — confirm with the producer.
    pub models: Vec<String>,
}
25
/// Record of a single executed stage (workflow node) within a run.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageRecord {
    pub id: String,
    /// Workflow node this stage executed; join key for fixture assertions
    /// (`evaluate_run_against_fixture`) and run diffs (`diff_run_records`).
    pub node_id: String,
    pub kind: String,
    pub status: String,
    pub outcome: String,
    /// Branch taken out of this stage, if any.
    pub branch: Option<String>,
    /// RFC 3339 timestamps (produced via `now_rfc3339` elsewhere in this module).
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Visible output text; the first 80 chars are snapshotted into replay
    /// fixtures by `replay_fixture_from_run`.
    pub visible_text: Option<String>,
    pub private_reasoning: Option<String>,
    pub transcript: Option<serde_json::Value>,
    pub verification: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
    /// Per-attempt history when the stage was retried.
    pub attempts: Vec<RunStageAttemptRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
48
/// One attempt of a (possibly retried) stage execution.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunStageAttemptRecord {
    /// Attempt ordinal within the stage.
    pub attempt: usize,
    pub status: String,
    pub outcome: String,
    pub branch: Option<String>,
    /// Error text when the attempt failed.
    pub error: Option<String>,
    pub verification: Option<serde_json::Value>,
    pub started_at: String,
    pub finished_at: Option<String>,
}
61
/// Edge taken between workflow nodes during a run, with the artifacts
/// flowing across it.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTransitionRecord {
    pub id: String,
    /// Source stage/node; `None` for the initial transition into the workflow.
    // NOTE(review): "None = entry" is inferred from the Option types — confirm with the writer.
    pub from_stage_id: Option<String>,
    pub from_node_id: Option<String>,
    pub to_node_id: String,
    pub branch: Option<String>,
    pub timestamp: String,
    pub consumed_artifact_ids: Vec<String>,
    pub produced_artifact_ids: Vec<String>,
}
74
/// Scheduler checkpoint: which nodes were ready/completed when the run
/// state was persisted, and why.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunCheckpointRecord {
    pub id: String,
    pub ready_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub last_stage_id: Option<String>,
    pub persisted_at: String,
    /// Free-text reason the checkpoint was taken.
    pub reason: String,
}
85
/// Expected-outcome snapshot of a run, used to re-check later runs of the
/// same workflow (see `evaluate_run_against_fixture`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayFixture {
    /// Type tag; `"replay_fixture"` when built by `replay_fixture_from_run`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    /// Run this fixture was derived from.
    pub source_run_id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub created_at: String,
    pub expected_status: String,
    /// One assertion per stage of the source run.
    pub stage_assertions: Vec<ReplayStageAssertion>,
}
99
/// Per-stage expectation inside a `ReplayFixture`, keyed by `node_id`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayStageAssertion {
    pub node_id: String,
    pub expected_status: String,
    pub expected_outcome: String,
    pub expected_branch: Option<String>,
    /// Artifact kinds that must each appear at least once on the stage.
    pub required_artifact_kinds: Vec<String>,
    /// Substring the stage's visible text must contain (80-char prefix when
    /// derived from a run).
    pub visible_text_contains: Option<String>,
}
110
/// Result of checking one run against one fixture: overall pass flag plus
/// human-readable failure messages.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalReport {
    pub pass: bool,
    pub failures: Vec<String>,
    /// Number of stages in the evaluated run (not the fixture).
    pub stage_count: usize,
}
118
/// One case's result inside a suite evaluation, optionally including a
/// baseline comparison diff.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalCaseReport {
    pub run_id: String,
    pub workflow_id: String,
    /// Case label from the manifest, if any.
    pub label: Option<String>,
    pub pass: bool,
    pub failures: Vec<String>,
    pub stage_count: usize,
    /// Path the run record was loaded from, when known.
    pub source_path: Option<String>,
    /// Diff against a baseline run, when the case requested `compare_to`.
    pub comparison: Option<RunDiffReport>,
}
131
/// Suite-level rollup of case reports; `pass` iff no case failed.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ReplayEvalSuiteReport {
    pub pass: bool,
    pub total: usize,
    pub passed: usize,
    pub failed: usize,
    pub cases: Vec<ReplayEvalCaseReport>,
}
141
/// Stage-level difference between two runs.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunStageDiffRecord {
    pub node_id: String,
    /// One of "added", "removed", or "changed" (see `diff_run_records`).
    pub change: String,
    /// Human-readable field-by-field change descriptions.
    pub details: Vec<String>,
}
149
/// Difference in recorded tool-call results between two runs, keyed by
/// (tool name, args hash).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallDiffRecord {
    pub tool_name: String,
    pub args_hash: String,
    pub result_changed: bool,
    /// Result on each side; `None` when that side has no matching recording.
    pub left_result: Option<String>,
    pub right_result: Option<String>,
}
159
/// Full structural comparison of two runs (see `diff_run_records`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunDiffReport {
    pub left_run_id: String,
    pub right_run_id: String,
    /// True when status, stages, tool results, and the transition/artifact/
    /// checkpoint counts all match.
    pub identical: bool,
    pub status_changed: bool,
    pub left_status: String,
    pub right_status: String,
    pub stage_diffs: Vec<RunStageDiffRecord>,
    pub tool_diffs: Vec<ToolCallDiffRecord>,
    /// Count deltas are right minus left (may be negative).
    pub transition_count_delta: isize,
    pub artifact_count_delta: isize,
    pub checkpoint_count_delta: isize,
}
175
/// On-disk manifest describing a batch of eval cases
/// (consumed by `evaluate_run_suite_manifest`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteManifest {
    /// Type tag; backfilled to `"eval_suite_manifest"` by
    /// `normalize_eval_suite_manifest`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub name: Option<String>,
    /// Directory against which relative case paths are resolved.
    pub base_dir: Option<String>,
    pub cases: Vec<EvalSuiteCase>,
}
186
/// One case of an eval suite: a run to load, an optional fixture to check it
/// against, and an optional baseline run to diff against.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct EvalSuiteCase {
    pub label: Option<String>,
    /// Path to the run record (absolute, or relative to the manifest base dir).
    pub run_path: String,
    /// Explicit fixture path; falls back to the run's embedded fixture.
    pub fixture_path: Option<String>,
    /// Baseline run path; any diff against it fails the case.
    pub compare_to: Option<String>,
}
195
/// Top-level persisted record of a workflow run: stages, transitions,
/// checkpoints, artifacts, child runs, and the optional replay fixture /
/// tool recordings used for deterministic re-evaluation.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct RunRecord {
    /// Type tag; backfilled to `"run_record"` by `normalize_run_record`.
    #[serde(rename = "_type")]
    pub type_name: String,
    pub id: String,
    pub workflow_id: String,
    pub workflow_name: Option<String>,
    pub task: String,
    /// Backfilled to `"running"` when absent.
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    pub parent_run_id: Option<String>,
    /// Root of the run tree; a run without a parent is its own root.
    pub root_run_id: Option<String>,
    pub stages: Vec<RunStageRecord>,
    pub transitions: Vec<RunTransitionRecord>,
    pub checkpoints: Vec<RunCheckpointRecord>,
    pub pending_nodes: Vec<String>,
    pub completed_nodes: Vec<String>,
    pub child_runs: Vec<RunChildRecord>,
    pub artifacts: Vec<ArtifactRecord>,
    pub policy: CapabilityPolicy,
    pub execution: Option<RunExecutionRecord>,
    pub transcript: Option<serde_json::Value>,
    pub usage: Option<LlmUsageRecord>,
    /// Expected-outcome snapshot; derived from the run itself when missing.
    pub replay_fixture: Option<ReplayFixture>,
    pub trace_spans: Vec<RunTraceSpanRecord>,
    /// Recorded tool invocations, keyed during diffing by (name, args hash).
    pub tool_recordings: Vec<ToolCallRecord>,
    pub metadata: BTreeMap<String, serde_json::Value>,
    /// Where this record was last saved, if it has been persisted.
    pub persisted_path: Option<String>,
}
227
/// Recording of a single tool invocation, replayable via the
/// (tool name, `args_hash`) pair (see `tool_fixture_hash`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ToolCallRecord {
    pub tool_name: String,
    pub tool_use_id: String,
    /// Hash of the canonical args JSON (see `tool_fixture_hash`).
    pub args_hash: String,
    pub result: String,
    /// True when the call was rejected rather than executed.
    pub is_rejected: bool,
    pub duration_ms: u64,
    /// Agent-loop iteration the call occurred in.
    pub iteration: usize,
    pub timestamp: String,
}
240
241/// Hash a tool invocation for fixture lookup (name + canonical args JSON).
242pub fn tool_fixture_hash(tool_name: &str, args: &serde_json::Value) -> String {
243    use std::hash::{Hash, Hasher};
244    let mut hasher = std::collections::hash_map::DefaultHasher::new();
245    tool_name.hash(&mut hasher);
246    let args_str = serde_json::to_string(args).unwrap_or_default();
247    args_str.hash(&mut hasher);
248    format!("{:016x}", hasher.finish())
249}
250
/// Timing span captured during a run, forming a tree via `parent_id`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunTraceSpanRecord {
    pub span_id: u64,
    /// Parent span; `None` for root spans.
    pub parent_id: Option<u64>,
    pub kind: String,
    pub name: String,
    /// Start offset in milliseconds.
    // NOTE(review): offset origin (run start vs epoch) is not visible here — confirm.
    pub start_ms: u64,
    pub duration_ms: u64,
    pub metadata: BTreeMap<String, serde_json::Value>,
}
262
/// Record of a child (worker) run spawned by a parent stage.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunChildRecord {
    pub worker_id: String,
    pub worker_name: String,
    /// Stage in the parent run that spawned this child.
    pub parent_stage_id: Option<String>,
    pub session_id: Option<String>,
    pub parent_session_id: Option<String>,
    pub mutation_scope: Option<String>,
    pub approval_mode: Option<String>,
    pub task: String,
    pub status: String,
    pub started_at: String,
    pub finished_at: Option<String>,
    /// Child's own run id/record path, once it has one.
    pub run_id: Option<String>,
    pub run_path: Option<String>,
    pub snapshot_path: Option<String>,
    pub execution: Option<RunExecutionRecord>,
}
282
/// Execution environment a run (or child run) operated in: working
/// directory, env vars, and git worktree/branch details when applicable.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct RunExecutionRecord {
    pub cwd: Option<String>,
    pub source_dir: Option<String>,
    pub env: BTreeMap<String, String>,
    pub adapter: Option<String>,
    pub repo_path: Option<String>,
    pub worktree_path: Option<String>,
    pub branch: Option<String>,
    pub base_ref: Option<String>,
    /// Cleanup policy for the worktree/environment.
    // NOTE(review): allowed values not visible in this file — confirm with the consumer.
    pub cleanup: Option<String>,
}
296
/// Parse a `VmValue` into a `RunRecord`, backfilling required bookkeeping
/// fields so downstream code can rely on them being present.
///
/// Defaults applied when missing/empty: `_type` tag, generated run id,
/// start timestamp, `"running"` status, self-referential root run id, and a
/// replay fixture derived from the run's own stages.
///
/// # Errors
/// Returns an error if the payload does not deserialize as a run record.
pub fn normalize_run_record(value: &VmValue) -> Result<RunRecord, VmError> {
    let mut run: RunRecord = parse_json_payload(vm_value_to_json(value), "run_record")?;
    if run.type_name.is_empty() {
        run.type_name = "run_record".to_string();
    }
    if run.id.is_empty() {
        run.id = new_id("run");
    }
    if run.started_at.is_empty() {
        run.started_at = now_rfc3339();
    }
    if run.status.is_empty() {
        run.status = "running".to_string();
    }
    // A run with no parent is its own root. Must happen after the id backfill
    // so a generated id propagates into root_run_id.
    if run.root_run_id.is_none() {
        run.root_run_id = Some(run.id.clone());
    }
    if run.replay_fixture.is_none() {
        run.replay_fixture = Some(replay_fixture_from_run(&run));
    }
    Ok(run)
}
319
320pub fn normalize_eval_suite_manifest(value: &VmValue) -> Result<EvalSuiteManifest, VmError> {
321    let mut manifest: EvalSuiteManifest = parse_json_value(value)?;
322    if manifest.type_name.is_empty() {
323        manifest.type_name = "eval_suite_manifest".to_string();
324    }
325    if manifest.id.is_empty() {
326        manifest.id = new_id("eval_suite");
327    }
328    Ok(manifest)
329}
330
331fn load_replay_fixture(path: &Path) -> Result<ReplayFixture, VmError> {
332    let content = std::fs::read_to_string(path)
333        .map_err(|e| VmError::Runtime(format!("failed to read replay fixture: {e}")))?;
334    serde_json::from_str(&content)
335        .map_err(|e| VmError::Runtime(format!("failed to parse replay fixture: {e}")))
336}
337
/// Resolve a manifest-relative path: absolute paths pass through unchanged;
/// relative paths are joined onto `base_dir` when one is provided, and are
/// returned as-is otherwise.
fn resolve_manifest_path(base_dir: Option<&Path>, path: &str) -> PathBuf {
    let candidate = PathBuf::from(path);
    if candidate.is_absolute() {
        return candidate;
    }
    match base_dir {
        Some(dir) => dir.join(candidate),
        None => candidate,
    }
}
348
/// Evaluate every case in an eval-suite manifest and roll the results up
/// into a suite report.
///
/// For each case: load the run record, pick a fixture (explicit path, else
/// the run's embedded fixture, else one freshly derived from the run), check
/// the run against it, and — when `compare_to` is set — additionally diff the
/// run against a baseline run, failing the case on any difference.
///
/// Relative paths are resolved against `manifest.base_dir`.
///
/// # Errors
/// Returns an error if any referenced run record or fixture file cannot be
/// read or parsed; evaluation mismatches are reported as case failures, not
/// errors.
pub fn evaluate_run_suite_manifest(
    manifest: &EvalSuiteManifest,
) -> Result<ReplayEvalSuiteReport, VmError> {
    let base_dir = manifest.base_dir.as_deref().map(Path::new);
    let mut reports = Vec::new();
    for case in &manifest.cases {
        let run_path = resolve_manifest_path(base_dir, &case.run_path);
        let run = load_run_record(&run_path)?;
        // Fixture priority: explicit path > embedded in the run > derived
        // from the run itself (the last trivially passes the fixture check).
        let fixture = match &case.fixture_path {
            Some(path) => load_replay_fixture(&resolve_manifest_path(base_dir, path))?,
            None => run
                .replay_fixture
                .clone()
                .unwrap_or_else(|| replay_fixture_from_run(&run)),
        };
        let eval = evaluate_run_against_fixture(&run, &fixture);
        let mut pass = eval.pass;
        let mut failures = eval.failures;
        // Optional baseline comparison: any structural diff fails the case.
        let comparison = match &case.compare_to {
            Some(path) => {
                let baseline_path = resolve_manifest_path(base_dir, path);
                let baseline = load_run_record(&baseline_path)?;
                let diff = diff_run_records(&baseline, &run);
                if !diff.identical {
                    pass = false;
                    failures.push(format!(
                        "run differs from baseline {} with {} stage changes",
                        baseline_path.display(),
                        diff.stage_diffs.len()
                    ));
                }
                Some(diff)
            }
            None => None,
        };
        reports.push(ReplayEvalCaseReport {
            run_id: run.id.clone(),
            workflow_id: run.workflow_id.clone(),
            label: case.label.clone(),
            pass,
            failures,
            stage_count: eval.stage_count,
            source_path: Some(run_path.display().to_string()),
            comparison,
        });
    }
    let total = reports.len();
    let passed = reports.iter().filter(|report| report.pass).count();
    let failed = total.saturating_sub(passed);
    Ok(ReplayEvalSuiteReport {
        pass: failed == 0,
        total,
        passed,
        failed,
        cases: reports,
    })
}
406
/// Edit operation in a diff sequence.
///
/// The paired index in `myers_diff`'s output refers to the `before` slice
/// for `Equal`/`Delete` and to the `after` slice for `Insert`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub(crate) enum DiffOp {
    /// Line present on both sides.
    Equal,
    /// Line removed from the `before` side.
    Delete,
    /// Line added on the `after` side.
    Insert,
}
414
/// Compute the shortest edit script using Myers' O(nd) algorithm.
/// Returns a sequence of (DiffOp, line_index_in_before_or_after).
/// Time: O(nd) where d = edit distance. Space: O(d * n).
///
/// `Equal`/`Delete` indices refer to `a`; `Insert` indices refer to `b`.
pub(crate) fn myers_diff(a: &[&str], b: &[&str]) -> Vec<(DiffOp, usize)> {
    let n = a.len() as isize;
    let m = b.len() as isize;
    // Degenerate cases: one side empty means pure inserts or pure deletes.
    if n == 0 && m == 0 {
        return Vec::new();
    }
    if n == 0 {
        return (0..m as usize).map(|j| (DiffOp::Insert, j)).collect();
    }
    if m == 0 {
        return (0..n as usize).map(|i| (DiffOp::Delete, i)).collect();
    }

    // v[k + offset] = furthest x reached on diagonal k (where k = x - y).
    // Diagonals range over [-(n+m), n+m], hence the offset shift.
    let max_d = (n + m) as usize;
    let offset = max_d as isize;
    let v_size = 2 * max_d + 1;
    let mut v = vec![0isize; v_size];
    // trace[d] stores v snapshot BEFORE step d was applied.
    let mut trace: Vec<Vec<isize>> = Vec::new();

    'outer: for d in 0..=max_d as isize {
        trace.push(v.clone());
        // Writes go to new_v so reads of v[ki +- 1] always see step d-1
        // values; k steps by 2, so neighbours are never written this round
        // anyway — the two schemes are equivalent.
        let mut new_v = v.clone();
        for k in (-d..=d).step_by(2) {
            let ki = (k + offset) as usize;
            // Pick the predecessor diagonal that reached further. The k == -d
            // short-circuit also guards the ki - 1 index from underflow.
            let mut x = if k == -d || (k != d && v[ki - 1] < v[ki + 1]) {
                v[ki + 1] // insert (move down)
            } else {
                v[ki - 1] + 1 // delete (move right)
            };
            let mut y = x - k;
            // Follow the free diagonal while lines match.
            while x < n && y < m && a[x as usize] == b[y as usize] {
                x += 1;
                y += 1;
            }
            new_v[ki] = x;
            if x >= n && y >= m {
                let _ = new_v;
                break 'outer;
            }
        }
        v = new_v;
    }

    // Backtrack from (n, m) to (0, 0).
    let mut ops: Vec<(DiffOp, usize)> = Vec::new();
    let mut x = n;
    let mut y = m;
    for d in (1..trace.len() as isize).rev() {
        let k = x - y;
        // trace[d] holds v after step d-1 — exactly the state the forward
        // pass consulted when choosing the predecessor at step d.
        let v_prev = &trace[d as usize];
        let prev_k = if k == -d
            || (k != d && v_prev[(k - 1 + offset) as usize] < v_prev[(k + 1 + offset) as usize])
        {
            k + 1 // came from insert
        } else {
            k - 1 // came from delete
        };
        let prev_x = v_prev[(prev_k + offset) as usize];
        let prev_y = prev_x - prev_k;

        // Diagonal (equal) moves
        while x > prev_x && y > prev_y {
            x -= 1;
            y -= 1;
            ops.push((DiffOp::Equal, x as usize));
        }
        // Edit move (exactly one per d step; lands on (prev_x, prev_y))
        if prev_k < k {
            x -= 1;
            ops.push((DiffOp::Delete, x as usize));
        } else {
            y -= 1;
            ops.push((DiffOp::Insert, y as usize));
        }
    }
    // Initial diagonal at d=0
    while x > 0 && y > 0 {
        x -= 1;
        y -= 1;
        ops.push((DiffOp::Equal, x as usize));
    }
    // Moves were collected back-to-front; restore forward order.
    ops.reverse();
    ops
}
503
504pub fn render_unified_diff(path: Option<&str>, before: &str, after: &str) -> String {
505    let before_lines: Vec<&str> = before.lines().collect();
506    let after_lines: Vec<&str> = after.lines().collect();
507    let ops = myers_diff(&before_lines, &after_lines);
508
509    let mut diff = String::new();
510    let file = path.unwrap_or("artifact");
511    diff.push_str(&format!("--- a/{file}\n+++ b/{file}\n"));
512    for &(op, idx) in &ops {
513        match op {
514            DiffOp::Equal => diff.push_str(&format!(" {}\n", before_lines[idx])),
515            DiffOp::Delete => diff.push_str(&format!("-{}\n", before_lines[idx])),
516            DiffOp::Insert => diff.push_str(&format!("+{}\n", after_lines[idx])),
517        }
518    }
519    diff
520}
521
522pub fn save_run_record(run: &RunRecord, path: Option<&str>) -> Result<String, VmError> {
523    let path = path
524        .map(PathBuf::from)
525        .unwrap_or_else(|| default_run_dir().join(format!("{}.json", run.id)));
526    if let Some(parent) = path.parent() {
527        std::fs::create_dir_all(parent)
528            .map_err(|e| VmError::Runtime(format!("failed to create run directory: {e}")))?;
529    }
530    let json = serde_json::to_string_pretty(run)
531        .map_err(|e| VmError::Runtime(format!("failed to encode run record: {e}")))?;
532    // Atomic write: write to .tmp then rename to prevent corruption on kill.
533    let tmp_path = path.with_extension("json.tmp");
534    std::fs::write(&tmp_path, &json)
535        .map_err(|e| VmError::Runtime(format!("failed to persist run record: {e}")))?;
536    std::fs::rename(&tmp_path, &path).map_err(|e| {
537        // Fallback: if rename fails (cross-device), write directly.
538        let _ = std::fs::write(&path, &json);
539        VmError::Runtime(format!("failed to finalize run record: {e}"))
540    })?;
541    Ok(path.to_string_lossy().to_string())
542}
543
544pub fn load_run_record(path: &Path) -> Result<RunRecord, VmError> {
545    let content = std::fs::read_to_string(path)
546        .map_err(|e| VmError::Runtime(format!("failed to read run record: {e}")))?;
547    serde_json::from_str(&content)
548        .map_err(|e| VmError::Runtime(format!("failed to parse run record: {e}")))
549}
550
551pub fn replay_fixture_from_run(run: &RunRecord) -> ReplayFixture {
552    ReplayFixture {
553        type_name: "replay_fixture".to_string(),
554        id: new_id("fixture"),
555        source_run_id: run.id.clone(),
556        workflow_id: run.workflow_id.clone(),
557        workflow_name: run.workflow_name.clone(),
558        created_at: now_rfc3339(),
559        expected_status: run.status.clone(),
560        stage_assertions: run
561            .stages
562            .iter()
563            .map(|stage| ReplayStageAssertion {
564                node_id: stage.node_id.clone(),
565                expected_status: stage.status.clone(),
566                expected_outcome: stage.outcome.clone(),
567                expected_branch: stage.branch.clone(),
568                required_artifact_kinds: stage
569                    .artifacts
570                    .iter()
571                    .map(|artifact| artifact.kind.clone())
572                    .collect(),
573                visible_text_contains: stage
574                    .visible_text
575                    .as_ref()
576                    .filter(|text| !text.is_empty())
577                    .map(|text| text.chars().take(80).collect()),
578            })
579            .collect(),
580    }
581}
582
/// Check a run against a replay fixture and collect all mismatches.
///
/// Verifies the overall run status, then each stage assertion (matched by
/// `node_id`): status, outcome, branch, required artifact kinds, and an
/// optional visible-text substring. Every mismatch becomes one entry in
/// `failures`; the report passes only when `failures` is empty.
///
/// Stages present in the run but absent from the fixture are not flagged.
pub fn evaluate_run_against_fixture(run: &RunRecord, fixture: &ReplayFixture) -> ReplayEvalReport {
    let mut failures = Vec::new();
    if run.status != fixture.expected_status {
        failures.push(format!(
            "run status mismatch: expected {}, got {}",
            fixture.expected_status, run.status
        ));
    }
    // Index stages by node id for O(log n) assertion lookup.
    // NOTE(review): with duplicate node_ids only the last stage is kept —
    // confirm node ids are unique per run.
    let stages_by_id: BTreeMap<&str, &RunStageRecord> =
        run.stages.iter().map(|s| (s.node_id.as_str(), s)).collect();
    for assertion in &fixture.stage_assertions {
        let Some(stage) = stages_by_id.get(assertion.node_id.as_str()) else {
            failures.push(format!("missing stage {}", assertion.node_id));
            continue;
        };
        if stage.status != assertion.expected_status {
            failures.push(format!(
                "stage {} status mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_status, stage.status
            ));
        }
        if stage.outcome != assertion.expected_outcome {
            failures.push(format!(
                "stage {} outcome mismatch: expected {}, got {}",
                assertion.node_id, assertion.expected_outcome, stage.outcome
            ));
        }
        if stage.branch != assertion.expected_branch {
            failures.push(format!(
                "stage {} branch mismatch: expected {:?}, got {:?}",
                assertion.node_id, assertion.expected_branch, stage.branch
            ));
        }
        // Each required kind must appear on at least one stage artifact.
        for required_kind in &assertion.required_artifact_kinds {
            if !stage
                .artifacts
                .iter()
                .any(|artifact| &artifact.kind == required_kind)
            {
                failures.push(format!(
                    "stage {} missing artifact kind {}",
                    assertion.node_id, required_kind
                ));
            }
        }
        // Substring check; a stage with no visible text fails any non-empty snippet.
        if let Some(snippet) = &assertion.visible_text_contains {
            let actual = stage.visible_text.clone().unwrap_or_default();
            if !actual.contains(snippet) {
                failures.push(format!(
                    "stage {} visible text does not contain expected snippet {:?}",
                    assertion.node_id, snippet
                ));
            }
        }
    }

    ReplayEvalReport {
        pass: failures.is_empty(),
        failures,
        stage_count: run.stages.len(),
    }
}
645
646pub fn evaluate_run_suite(
647    cases: Vec<(RunRecord, ReplayFixture, Option<String>)>,
648) -> ReplayEvalSuiteReport {
649    let mut reports = Vec::new();
650    for (run, fixture, source_path) in cases {
651        let report = evaluate_run_against_fixture(&run, &fixture);
652        reports.push(ReplayEvalCaseReport {
653            run_id: run.id.clone(),
654            workflow_id: run.workflow_id.clone(),
655            label: None,
656            pass: report.pass,
657            failures: report.failures,
658            stage_count: report.stage_count,
659            source_path,
660            comparison: None,
661        });
662    }
663    let total = reports.len();
664    let passed = reports.iter().filter(|report| report.pass).count();
665    let failed = total.saturating_sub(passed);
666    ReplayEvalSuiteReport {
667        pass: failed == 0,
668        total,
669        passed,
670        failed,
671        cases: reports,
672    }
673}
674
/// Structurally compare two runs and produce a `RunDiffReport`.
///
/// Stage comparison is keyed by `node_id` over the union of both runs:
/// stages only on the left are "removed", only on the right "added", and
/// shared stages are "changed" when status, outcome, branch, or the
/// produced-artifact / artifact-record counts differ. Tool recordings are
/// keyed by (tool name, args hash); a diff entry is emitted when a key is
/// missing on either side or the recorded results differ. `identical`
/// additionally requires equal transition/artifact/checkpoint counts.
pub fn diff_run_records(left: &RunRecord, right: &RunRecord) -> RunDiffReport {
    let mut stage_diffs = Vec::new();
    let mut all_node_ids = BTreeSet::new();
    let left_by_id: BTreeMap<&str, &RunStageRecord> = left
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    let right_by_id: BTreeMap<&str, &RunStageRecord> = right
        .stages
        .iter()
        .map(|s| (s.node_id.as_str(), s))
        .collect();
    // Union of node ids; BTreeSet keeps the diff order deterministic.
    all_node_ids.extend(left_by_id.keys().copied());
    all_node_ids.extend(right_by_id.keys().copied());

    for node_id in all_node_ids {
        let left_stage = left_by_id.get(node_id).copied();
        let right_stage = right_by_id.get(node_id).copied();
        match (left_stage, right_stage) {
            (Some(_), None) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "removed".to_string(),
                details: vec!["stage missing from right run".to_string()],
            }),
            (None, Some(_)) => stage_diffs.push(RunStageDiffRecord {
                node_id: node_id.to_string(),
                change: "added".to_string(),
                details: vec!["stage missing from left run".to_string()],
            }),
            (Some(left_stage), Some(right_stage)) => {
                // Field-by-field comparison; only emit a record when
                // something actually changed.
                let mut details = Vec::new();
                if left_stage.status != right_stage.status {
                    details.push(format!(
                        "status: {} -> {}",
                        left_stage.status, right_stage.status
                    ));
                }
                if left_stage.outcome != right_stage.outcome {
                    details.push(format!(
                        "outcome: {} -> {}",
                        left_stage.outcome, right_stage.outcome
                    ));
                }
                if left_stage.branch != right_stage.branch {
                    details.push(format!(
                        "branch: {:?} -> {:?}",
                        left_stage.branch, right_stage.branch
                    ));
                }
                // Artifact comparison is count-based only — identical counts
                // with different contents are not flagged here.
                if left_stage.produced_artifact_ids.len() != right_stage.produced_artifact_ids.len()
                {
                    details.push(format!(
                        "produced_artifacts: {} -> {}",
                        left_stage.produced_artifact_ids.len(),
                        right_stage.produced_artifact_ids.len()
                    ));
                }
                if left_stage.artifacts.len() != right_stage.artifacts.len() {
                    details.push(format!(
                        "artifact_records: {} -> {}",
                        left_stage.artifacts.len(),
                        right_stage.artifacts.len()
                    ));
                }
                if !details.is_empty() {
                    stage_diffs.push(RunStageDiffRecord {
                        node_id: node_id.to_string(),
                        change: "changed".to_string(),
                        details,
                    });
                }
            }
            (None, None) => {}
        }
    }

    // Tool recording diffs
    let left_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = left
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let right_tools: std::collections::BTreeMap<(String, String), &ToolCallRecord> = right
        .tool_recordings
        .iter()
        .map(|r| ((r.tool_name.clone(), r.args_hash.clone()), r))
        .collect();
    let mut tool_diffs = Vec::new();
    let all_tool_keys: std::collections::BTreeSet<_> = left_tools
        .keys()
        .chain(right_tools.keys())
        .cloned()
        .collect();
    for key in &all_tool_keys {
        let l = left_tools.get(key);
        let r = right_tools.get(key);
        // A key present on only one side always counts as changed.
        let result_changed = match (l, r) {
            (Some(a), Some(b)) => a.result != b.result,
            _ => true,
        };
        if result_changed {
            tool_diffs.push(ToolCallDiffRecord {
                tool_name: key.0.clone(),
                args_hash: key.1.clone(),
                result_changed,
                left_result: l.map(|t| t.result.clone()),
                right_result: r.map(|t| t.result.clone()),
            });
        }
    }

    let status_changed = left.status != right.status;
    let identical = !status_changed
        && stage_diffs.is_empty()
        && tool_diffs.is_empty()
        && left.transitions.len() == right.transitions.len()
        && left.artifacts.len() == right.artifacts.len()
        && left.checkpoints.len() == right.checkpoints.len();

    RunDiffReport {
        left_run_id: left.id.clone(),
        right_run_id: right.id.clone(),
        identical,
        status_changed,
        left_status: left.status.clone(),
        right_status: right.status.clone(),
        stage_diffs,
        tool_diffs,
        // Deltas are right minus left, so growth is positive.
        transition_count_delta: right.transitions.len() as isize - left.transitions.len() as isize,
        artifact_count_delta: right.artifacts.len() as isize - left.artifacts.len() as isize,
        checkpoint_count_delta: right.checkpoints.len() as isize - left.checkpoints.len() as isize,
    }
}